diff options
Diffstat (limited to 'components/script/body.rs')
-rw-r--r-- | components/script/body.rs | 127 |
1 files changed, 85 insertions, 42 deletions
diff --git a/components/script/body.rs b/components/script/body.rs index ce7113f9807..b7436c3e5e0 100644 --- a/components/script/body.rs +++ b/components/script/body.rs @@ -40,7 +40,9 @@ use js::rust::wrappers::JS_ParseJSON; use js::rust::HandleValue; use js::typedarray::{ArrayBuffer, CreateWith}; use mime::{self, Mime}; -use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody}; +use net_traits::request::{ + BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody, +}; use script_traits::serializable::BlobImpl; use std::ptr; use std::rc::Rc; @@ -49,7 +51,7 @@ use url::form_urlencoded; /// The Dom object, or ReadableStream, that is the source of a body. /// <https://fetch.spec.whatwg.org/#concept-body-source> -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum BodySource { /// A ReadableStream comes with a null-source. Null, @@ -59,6 +61,14 @@ pub enum BodySource { Object, } +/// The reason to stop reading from the body. +enum StopReading { + /// The stream has errored. + Error, + /// The stream is done. + Done, +} + /// The IPC route handler /// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>. /// This route runs in the script process, @@ -69,7 +79,7 @@ struct TransmitBodyConnectHandler { stream: Trusted<ReadableStream>, task_source: NetworkingTaskSource, canceller: TaskCanceller, - bytes_sender: Option<IpcSender<Vec<u8>>>, + bytes_sender: Option<IpcSender<BodyChunkResponse>>, control_sender: IpcSender<BodyChunkRequest>, in_memory: Option<Vec<u8>>, in_memory_done: bool, @@ -123,7 +133,14 @@ impl TransmitBodyConnectHandler { BodyChunkRequest::Chunk => body_handler.transmit_source(), // Note: this is actually sent from this process // by the TransmitBodyPromiseHandler when reading stops. - BodyChunkRequest::Done => body_handler.stop_reading(), + BodyChunkRequest::Done => { + body_handler.stop_reading(StopReading::Done); + }, + // Note: this is actually sent from this process + // by the TransmitBodyPromiseHandler when the stream errors. + BodyChunkRequest::Error => { + body_handler.stop_reading(StopReading::Error); + }, } }), ); @@ -138,7 +155,7 @@ impl TransmitBodyConnectHandler { fn transmit_source(&mut self) { if self.in_memory_done { // Step 5.1.3 - self.stop_reading(); + self.stop_reading(StopReading::Done); return; } @@ -153,29 +170,58 @@ impl TransmitBodyConnectHandler { .bytes_sender .as_ref() .expect("No bytes sender to transmit source.") - .send(bytes.clone()); + .send(BodyChunkResponse::Chunk(bytes.clone())); return; } warn!("Re-directs for file-based Blobs not supported yet."); } /// Take the IPC sender sent by `net`, so we can send body chunks with it. - fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) { + /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body> + fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) { self.bytes_sender = Some(sender); + + // If we're using an actual ReadableStream, acquire a reader for it. + if self.source == BodySource::Null { + let stream = self.stream.clone(); + let _ = self.task_source.queue_with_canceller( + task!(start_reading_request_body_stream: move || { + // Step 1, Let body be request’s body. + let rooted_stream = stream.root(); + + // TODO: Step 2, If body is null. + + // Step 3, get a reader for stream. + rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream."); + + // Note: this algorithm continues when the first chunk is requested by `net`. + }), + &self.canceller, + ); + } } /// Drop the IPC sender sent by `net` - fn stop_reading(&mut self) { - // Note: this should close the corresponding receiver, - // and terminate the request stream in `net`. - self.bytes_sender = None; + fn stop_reading(&mut self, reason: StopReading) { + let bytes_sender = self + .bytes_sender + .take() + .expect("Stop reading called multiple times on TransmitBodyConnectHandler."); + match reason { + StopReading::Error => { + let _ = bytes_sender.send(BodyChunkResponse::Error); + }, + StopReading::Done => { + let _ = bytes_sender.send(BodyChunkResponse::Done); + }, + } } - /// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body> + /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body> fn transmit_body_chunk(&mut self) { if self.in_memory_done { // Step 5.1.3 - self.stop_reading(); + self.stop_reading(StopReading::Done); return; } @@ -188,7 +234,7 @@ impl TransmitBodyConnectHandler { // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey. if let Some(bytes) = self.in_memory.clone() { - let _ = bytes_sender.send(bytes); + let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes)); // Mark this body as `done` so that we can stop reading in the next tick, // matching the behavior of the promise-based flow self.in_memory_done = true; @@ -197,27 +243,9 @@ impl TransmitBodyConnectHandler { let _ = self.task_source.queue_with_canceller( task!(setup_native_body_promise_handler: move || { - // Step 1, Let body be request’s body. - // - // TODO: We need the handle the body null case, - // here assuming body is something and we have the corresponding stream. let rooted_stream = stream.root(); let global = rooted_stream.global(); - // TODO: Step 2, If body is null, - // then queue a fetch task on request to process request end-of-body - // for request and abort these steps. - - // TODO: queuing those "process request ..." tasks means we also need a handle on Request here. - - // Step 3, get a reader for stream. - if rooted_stream.start_reading().is_err() { - // Note: this can happen if script starts consuming request body - // before fetch starts transmitting it. - // Not in the spec. - return; - } - // Step 4, the result of reading a chunk from body’s stream with reader. let promise = rooted_stream.read_a_chunk(); @@ -225,14 +253,19 @@ impl TransmitBodyConnectHandler { // are a combination of the promise native handler here, // and the corresponding IPC route in `component::net::http_loader`. let promise_handler = Box::new(TransmitBodyPromiseHandler { - bytes_sender, + bytes_sender: bytes_sender.clone(), stream: rooted_stream.clone(), - control_sender, + control_sender: control_sender.clone(), }); - let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream}); + let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler { + bytes_sender, + stream: rooted_stream, + control_sender, + }); - let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler)); + let handler = + PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler)); let realm = enter_realm(&*global); let comp = InRealm::Entered(&realm); @@ -248,7 +281,7 @@ impl TransmitBodyConnectHandler { #[derive(Clone, JSTraceable, MallocSizeOf)] struct TransmitBodyPromiseHandler { #[ignore_malloc_size_of = "Channels are hard"] - bytes_sender: IpcSender<Vec<u8>>, + bytes_sender: IpcSender<BodyChunkResponse>, stream: DomRoot<ReadableStream>, #[ignore_malloc_size_of = "Channels are hard"] control_sender: IpcSender<BodyChunkRequest>, @@ -278,8 +311,7 @@ impl Callback for TransmitBodyPromiseHandler { Ok(chunk) => chunk, Err(_) => { // Step 5.5, the "otherwise" steps. - // TODO: terminate fetch. - let _ = self.control_sender.send(BodyChunkRequest::Done); + let _ = self.control_sender.send(BodyChunkRequest::Error); return self.stream.stop_reading(); }, }; @@ -287,7 +319,7 @@ impl Callback for TransmitBodyPromiseHandler { // Step 5.1 and 5.2, transmit chunk. // Send the chunk to the body transmitter in net::http_loader::obtain_response. // TODO: queue a fetch task on request to process request body for request. - let _ = self.bytes_sender.send(chunk); + let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk)); } } @@ -295,14 +327,18 @@ impl Callback for TransmitBodyPromiseHandler { /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>. #[derive(Clone, JSTraceable, MallocSizeOf)] struct TransmitBodyPromiseRejectionHandler { + #[ignore_malloc_size_of = "Channels are hard"] + bytes_sender: IpcSender<BodyChunkResponse>, stream: DomRoot<ReadableStream>, + #[ignore_malloc_size_of = "Channels are hard"] + control_sender: IpcSender<BodyChunkRequest>, } impl Callback for TransmitBodyPromiseRejectionHandler { /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body> fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) { // Step 5.4, the "rejection" steps. - // TODO: terminate fetch. + let _ = self.control_sender.send(BodyChunkRequest::Error); return self.stream.stop_reading(); } } @@ -376,7 +412,14 @@ impl ExtractedBody { BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(), // Note: this is actually sent from this process // by the TransmitBodyPromiseHandler when reading stops. - BodyChunkRequest::Done => body_handler.stop_reading(), + BodyChunkRequest::Done => { + body_handler.stop_reading(StopReading::Done); + }, + // Note: this is actually sent from this process + // by the TransmitBodyPromiseHandler when the stream errors. + BodyChunkRequest::Error => { + body_handler.stop_reading(StopReading::Error); + }, } }), ); |