aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/body.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/script/body.rs')
-rw-r--r--components/script/body.rs127
1 files changed, 85 insertions, 42 deletions
diff --git a/components/script/body.rs b/components/script/body.rs
index ce7113f9807..b7436c3e5e0 100644
--- a/components/script/body.rs
+++ b/components/script/body.rs
@@ -40,7 +40,9 @@ use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
-use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody};
+use net_traits::request::{
+ BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
+};
use script_traits::serializable::BlobImpl;
use std::ptr;
use std::rc::Rc;
@@ -49,7 +51,7 @@ use url::form_urlencoded;
/// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
pub enum BodySource {
/// A ReadableStream comes with a null-source.
Null,
@@ -59,6 +61,14 @@ pub enum BodySource {
Object,
}
+/// The reason to stop reading from the body.
+enum StopReading {
+ /// The stream has errored.
+ Error,
+ /// The stream is done.
+ Done,
+}
+
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
@@ -69,7 +79,7 @@ struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
canceller: TaskCanceller,
- bytes_sender: Option<IpcSender<Vec<u8>>>,
+ bytes_sender: Option<IpcSender<BodyChunkResponse>>,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
in_memory_done: bool,
@@ -123,7 +133,14 @@ impl TransmitBodyConnectHandler {
BodyChunkRequest::Chunk => body_handler.transmit_source(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
- BodyChunkRequest::Done => body_handler.stop_reading(),
+ BodyChunkRequest::Done => {
+ body_handler.stop_reading(StopReading::Done);
+ },
+ // Note: this is actually sent from this process
+ // by the TransmitBodyPromiseHandler when the stream errors.
+ BodyChunkRequest::Error => {
+ body_handler.stop_reading(StopReading::Error);
+ },
}
}),
);
@@ -138,7 +155,7 @@ impl TransmitBodyConnectHandler {
fn transmit_source(&mut self) {
if self.in_memory_done {
// Step 5.1.3
- self.stop_reading();
+ self.stop_reading(StopReading::Done);
return;
}
@@ -153,29 +170,58 @@ impl TransmitBodyConnectHandler {
.bytes_sender
.as_ref()
.expect("No bytes sender to transmit source.")
- .send(bytes.clone());
+ .send(BodyChunkResponse::Chunk(bytes.clone()));
return;
}
warn!("Re-directs for file-based Blobs not supported yet.");
}
/// Take the IPC sender sent by `net`, so we can send body chunks with it.
- fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
+ /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+ fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
self.bytes_sender = Some(sender);
+
+ // If we're using an actual ReadableStream, acquire a reader for it.
+ if self.source == BodySource::Null {
+ let stream = self.stream.clone();
+ let _ = self.task_source.queue_with_canceller(
+ task!(start_reading_request_body_stream: move || {
+ // Step 1, Let body be request’s body.
+ let rooted_stream = stream.root();
+
+ // TODO: Step 2, If body is null.
+
+ // Step 3, get a reader for stream.
+ rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream.");
+
+ // Note: this algorithm continues when the first chunk is requested by `net`.
+ }),
+ &self.canceller,
+ );
+ }
}
/// Drop the IPC sender sent by `net`
- fn stop_reading(&mut self) {
- // Note: this should close the corresponding receiver,
- // and terminate the request stream in `net`.
- self.bytes_sender = None;
+ fn stop_reading(&mut self, reason: StopReading) {
+ let bytes_sender = self
+ .bytes_sender
+ .take()
+ .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
+ match reason {
+ StopReading::Error => {
+ let _ = bytes_sender.send(BodyChunkResponse::Error);
+ },
+ StopReading::Done => {
+ let _ = bytes_sender.send(BodyChunkResponse::Done);
+ },
+ }
}
- /// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+ /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn transmit_body_chunk(&mut self) {
if self.in_memory_done {
// Step 5.1.3
- self.stop_reading();
+ self.stop_reading(StopReading::Done);
return;
}
@@ -188,7 +234,7 @@ impl TransmitBodyConnectHandler {
// In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
if let Some(bytes) = self.in_memory.clone() {
- let _ = bytes_sender.send(bytes);
+ let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
// Mark this body as `done` so that we can stop reading in the next tick,
// matching the behavior of the promise-based flow
self.in_memory_done = true;
@@ -197,27 +243,9 @@ impl TransmitBodyConnectHandler {
let _ = self.task_source.queue_with_canceller(
task!(setup_native_body_promise_handler: move || {
- // Step 1, Let body be request’s body.
- //
- // TODO: We need the handle the body null case,
- // here assuming body is something and we have the corresponding stream.
let rooted_stream = stream.root();
let global = rooted_stream.global();
- // TODO: Step 2, If body is null,
- // then queue a fetch task on request to process request end-of-body
- // for request and abort these steps.
-
- // TODO: queuing those "process request ..." tasks means we also need a handle on Request here.
-
- // Step 3, get a reader for stream.
- if rooted_stream.start_reading().is_err() {
- // Note: this can happen if script starts consuming request body
- // before fetch starts transmitting it.
- // Not in the spec.
- return;
- }
-
// Step 4, the result of reading a chunk from body’s stream with reader.
let promise = rooted_stream.read_a_chunk();
@@ -225,14 +253,19 @@ impl TransmitBodyConnectHandler {
// are a combination of the promise native handler here,
// and the corresponding IPC route in `component::net::http_loader`.
let promise_handler = Box::new(TransmitBodyPromiseHandler {
- bytes_sender,
+ bytes_sender: bytes_sender.clone(),
stream: rooted_stream.clone(),
- control_sender,
+ control_sender: control_sender.clone(),
});
- let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream});
+ let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {
+ bytes_sender,
+ stream: rooted_stream,
+ control_sender,
+ });
- let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
+ let handler =
+ PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
@@ -248,7 +281,7 @@ impl TransmitBodyConnectHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseHandler {
#[ignore_malloc_size_of = "Channels are hard"]
- bytes_sender: IpcSender<Vec<u8>>,
+ bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>,
#[ignore_malloc_size_of = "Channels are hard"]
control_sender: IpcSender<BodyChunkRequest>,
@@ -278,8 +311,7 @@ impl Callback for TransmitBodyPromiseHandler {
Ok(chunk) => chunk,
Err(_) => {
// Step 5.5, the "otherwise" steps.
- // TODO: terminate fetch.
- let _ = self.control_sender.send(BodyChunkRequest::Done);
+ let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading();
},
};
@@ -287,7 +319,7 @@ impl Callback for TransmitBodyPromiseHandler {
// Step 5.1 and 5.2, transmit chunk.
// Send the chunk to the body transmitter in net::http_loader::obtain_response.
// TODO: queue a fetch task on request to process request body for request.
- let _ = self.bytes_sender.send(chunk);
+ let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk));
}
}
@@ -295,14 +327,18 @@ impl Callback for TransmitBodyPromiseHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseRejectionHandler {
+ #[ignore_malloc_size_of = "Channels are hard"]
+ bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>,
+ #[ignore_malloc_size_of = "Channels are hard"]
+ control_sender: IpcSender<BodyChunkRequest>,
}
impl Callback for TransmitBodyPromiseRejectionHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
// Step 5.4, the "rejection" steps.
- // TODO: terminate fetch.
+ let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading();
}
}
@@ -376,7 +412,14 @@ impl ExtractedBody {
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
- BodyChunkRequest::Done => body_handler.stop_reading(),
+ BodyChunkRequest::Done => {
+ body_handler.stop_reading(StopReading::Done);
+ },
+ // Note: this is actually sent from this process
+ // by the TransmitBodyPromiseHandler when the stream errors.
+ BodyChunkRequest::Error => {
+ body_handler.stop_reading(StopReading::Error);
+ },
}
}),
);