aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--components/net/fetch/methods.rs8
-rw-r--r--components/net/http_loader.rs194
-rw-r--r--components/net/tests/http_loader.rs7
-rw-r--r--components/net_traits/request.rs23
-rw-r--r--components/script/body.rs127
-rw-r--r--tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini32
6 files changed, 282 insertions, 109 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs
index c25ab10cbe6..1aabb545354 100644
--- a/components/net/fetch/methods.rs
+++ b/components/net/fetch/methods.rs
@@ -23,7 +23,8 @@ use net_traits::request::{
is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window,
};
use net_traits::request::{
- BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode,
+ BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request,
+ RequestMode,
};
use net_traits::response::{Response, ResponseBody, ResponseType};
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming};
@@ -641,7 +642,10 @@ fn scheme_fetch(
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = stream.send(BodyChunkRequest::Connect(body_chan));
let _ = stream.send(BodyChunkRequest::Chunk);
- body_port.recv().ok()
+ match body_port.recv().ok() {
+ Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes),
+ _ => panic!("cert should be sent in a single chunk."),
+ }
});
let data = data.as_ref().and_then(|b| {
let idx = b.iter().position(|b| *b == b'&')?;
diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs
index a8dfaecc422..33a6e4415e3 100644
--- a/components/net/http_loader.rs
+++ b/components/net/http_loader.rs
@@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
-use crossbeam_channel::{unbounded, Sender};
+use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
@@ -30,6 +30,7 @@ use http::header::{
CONTENT_TYPE,
};
use http::{HeaderMap, Request as HyperRequest};
+use hyper::header::TRANSFER_ENCODING;
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
@@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem};
use net_traits::request::Origin::Origin as SpecificOrigin;
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
use net_traits::request::{
- BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode,
+ BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder,
+ RequestMode,
};
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
use net_traits::request::{ResponseTainting, ServiceWorkersMode};
@@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
-use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
@@ -405,16 +407,89 @@ fn auth_from_cache(
}
}
+/// Messages from the IPC route to the fetch worker,
+/// used to fill the body with bytes coming-in over IPC.
+enum BodyChunk {
+ /// A chunk of bytes.
+ Chunk(Vec<u8>),
+ /// Body is done.
+ Done,
+}
+
+/// The stream side of the body passed to hyper.
+enum BodyStream {
+ /// A receiver that can be used in Body::wrap_stream,
+ /// for streaming the request over the network.
+ Chunked(TokioReceiver<Vec<u8>>),
+ /// A body whose bytes are buffered
+ /// and sent in one chunk over the network.
+ Buffered(Receiver<BodyChunk>),
+}
+
+/// The sink side of the body passed to hyper,
+/// used to enqueue chunks.
+enum BodySink {
+ /// A Tokio sender used to feed chunks to the network stream.
+ Chunked(TokioSender<Vec<u8>>),
+ /// A Crossbeam sender used to send chunks to the fetch worker,
+ /// where they will be buffered
+ /// in order to ensure they are not streamed them over the network.
+ Buffered(Sender<BodyChunk>),
+}
+
+impl BodySink {
+ pub fn transmit_bytes(&self, bytes: Vec<u8>) {
+ match self {
+ BodySink::Chunked(ref sender) => {
+ let sender = sender.clone();
+ HANDLE
+ .lock()
+ .unwrap()
+ .as_mut()
+ .unwrap()
+ .spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
+ },
+ BodySink::Buffered(ref sender) => {
+ let _ = sender.send(BodyChunk::Chunk(bytes));
+ },
+ }
+ }
+
+ pub fn close(&self) {
+ match self {
+ BodySink::Chunked(ref sender) => {
+ let mut sender = sender.clone();
+ HANDLE
+ .lock()
+ .unwrap()
+ .as_mut()
+ .unwrap()
+ .spawn(future::lazy(move || {
+ if sender.close().is_err() {
+ warn!("Failed to close network request sink.");
+ }
+ Ok(())
+ }));
+ },
+ BodySink::Buffered(ref sender) => {
+ let _ = sender.send(BodyChunk::Done);
+ },
+ }
+ }
+}
+
fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
- request_headers: &HeaderMap,
+ request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
+ source_is_null: bool,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
+ fetch_terminated: Sender<bool>,
) -> Box<
dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
@@ -435,12 +510,25 @@ fn obtain_response(
.replace("}", "%7D");
let request = if let Some(chunk_requester) = body {
- // TODO: If body is a stream, append `Transfer-Encoding`/`chunked`,
- // see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
+ let (sink, stream) = if source_is_null {
+ // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
+ // TODO: this should not be set for HTTP/2(currently not supported?).
+ request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
- let (body_chan, body_port) = ipc::channel().unwrap();
+ let (sender, receiver) = channel(1);
+ (BodySink::Chunked(sender), BodyStream::Chunked(receiver))
+ } else {
+ // Note: Hyper seems to already buffer bytes when the request appears not stream-able,
+ // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
+ //
+ // However since this doesn't appear documented, and we're using an ancient version,
+ // for now we buffer manually to ensure we don't stream requests
+ // to servers that might not know how to handle them.
+ let (sender, receiver) = unbounded();
+ (BodySink::Buffered(sender), BodyStream::Buffered(receiver))
+ };
- let (sender, receiver) = channel(1);
+ let (body_chan, body_port) = ipc::channel().unwrap();
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
@@ -453,32 +541,58 @@ fn obtain_response(
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
- let bytes: Vec<u8> = message.to().unwrap();
- let chunk_requester = chunk_requester.clone();
- let sender = sender.clone();
+ let bytes: Vec<u8> = match message.to().unwrap() {
+ BodyChunkResponse::Chunk(bytes) => bytes,
+ BodyChunkResponse::Done => {
+ // Step 3, abort these parallel steps.
+ let _ = fetch_terminated.send(false);
+ sink.close();
+ return;
+ },
+ BodyChunkResponse::Error => {
+ // Step 4 and/or 5.
+ // TODO: differentiate between the two steps,
+ // where step 5 requires setting an `aborted` flag on the fetch.
+ let _ = fetch_terminated.send(true);
+ sink.close();
+ return;
+ },
+ };
devtools_bytes.lock().unwrap().append(&mut bytes.clone());
- HANDLE.lock().unwrap().as_mut().unwrap().spawn(
- // Step 5.1.2.2
- // Transmit a chunk over the network(and blocking until this is done).
- sender
- .send(bytes)
- .map(move |_| {
- // Step 5.1.2.3
- // Request the next chunk.
- let _ = chunk_requester.send(BodyChunkRequest::Chunk);
- ()
- })
- .map_err(|_| ()),
- );
+ // Step 5.1.2.2, transmit chunk over the network,
+ // currently implemented by sending the bytes to the fetch worker.
+ sink.transmit_bytes(bytes);
+
+ // Step 5.1.2.3
+ // Request the next chunk.
+ let _ = chunk_requester.send(BodyChunkRequest::Chunk);
}),
);
+ let body = match stream {
+ BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
+ BodyStream::Buffered(receiver) => {
+ // Accumulate bytes received over IPC into a vector.
+ let mut body = vec![];
+ loop {
+ match receiver.recv() {
+ Ok(BodyChunk::Chunk(mut bytes)) => {
+ body.append(&mut bytes);
+ },
+ Ok(BodyChunk::Done) => break,
+ Err(_) => warn!("Failed to read all chunks from request body."),
+ }
+ }
+ body.into()
+ },
+ };
+
HyperRequest::builder()
.method(method)
.uri(encoded_url)
- .body(Body::wrap_stream(receiver))
+ .body(body)
} else {
HyperRequest::builder()
.method(method)
@@ -1566,16 +1680,26 @@ fn http_network_fetch(
// do not. Once we support other kinds of fetches we'll need to be more fine grained here
// since things like image fetches are classified differently by devtools
let is_xhr = request.destination == Destination::None;
+
+ // The receiver will receive true if there has been an error streaming the request body.
+ let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
+
let response_future = obtain_response(
&context.state.client,
&url,
&request.method,
- &request.headers,
- request.body.as_mut().map(|body| body.take_stream()),
+ &mut request.headers,
+ request.body.as_ref().map(|body| body.take_stream()),
+ request
+ .body
+ .as_ref()
+ .map(|body| body.source_is_null())
+ .unwrap_or(false),
&request.pipeline_id,
request_id.as_ref().map(Deref::deref),
is_xhr,
context,
+ fetch_terminated_sender,
);
let pipeline_id = request.pipeline_id;
@@ -1585,6 +1709,22 @@ fn http_network_fetch(
Err(error) => return Response::network_error(error),
};
+ // Check if there was an error while streaming the request body.
+ //
+ // It's ok to block on the receiver,
+ // since we're already blocking on the response future above,
+ // so we can be sure that the request has already been processed,
+ // and a message is in the channel(or soon will be).
+ match fetch_terminated_receiver.recv() {
+ Ok(true) => {
+ return Response::network_error(NetworkError::Internal(
+ "Request body streaming failed.".into(),
+ ));
+ },
+ Ok(false) => {},
+ Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
+ }
+
if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs
index 0e4c401d3dc..48f18560d54 100644
--- a/components/net/tests/http_loader.rs
+++ b/components/net/tests/http_loader.rs
@@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer;
use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table;
use net_traits::request::{
- BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder,
- RequestMode,
+ BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody,
+ RequestBuilder, RequestMode,
};
use net_traits::response::{HttpsState, ResponseBody};
use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
@@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
Box::new(move |message| {
let request = message.to().unwrap();
if let BodyChunkRequest::Connect(sender) = request {
- let _ = sender.send(content.clone());
+ let _ = sender.send(BodyChunkResponse::Chunk(content.clone()));
+ let _ = sender.send(BodyChunkResponse::Done);
}
}),
);
diff --git a/components/net_traits/request.rs b/components/net_traits/request.rs
index f80b82b0fca..c15063c7a70 100644
--- a/components/net_traits/request.rs
+++ b/components/net_traits/request.rs
@@ -124,16 +124,33 @@ pub enum BodySource {
}
/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+/// which are sent from script to net.
+#[derive(Debug, Deserialize, Serialize)]
+pub enum BodyChunkResponse {
+ /// A chunk of bytes.
+ Chunk(Vec<u8>),
+ /// The body is done.
+ Done,
+ /// There was an error streaming the body,
+ /// terminate fetch.
+ Error,
+}
+
+/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+/// which are sent from net to script
+/// (with the exception of Done, which is sent from script to script).
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`.
- Connect(IpcSender<Vec<u8>>),
+ Connect(IpcSender<BodyChunkResponse>),
/// Re-extract a new stream from the source, following a redirect.
Extract(IpcReceiver<BodyChunkRequest>),
/// Ask for another chunk.
Chunk,
- /// Signal the stream is done.
+ /// Signal the stream is done(sent from script to script).
Done,
+ /// Signal the stream has errored(sent from script to script).
+ Error,
}
/// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
@@ -173,7 +190,7 @@ impl RequestBody {
}
}
- pub fn take_stream(&mut self) -> IpcSender<BodyChunkRequest> {
+ pub fn take_stream(&self) -> IpcSender<BodyChunkRequest> {
self.chan.clone()
}
diff --git a/components/script/body.rs b/components/script/body.rs
index ce7113f9807..b7436c3e5e0 100644
--- a/components/script/body.rs
+++ b/components/script/body.rs
@@ -40,7 +40,9 @@ use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
-use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody};
+use net_traits::request::{
+ BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
+};
use script_traits::serializable::BlobImpl;
use std::ptr;
use std::rc::Rc;
@@ -49,7 +51,7 @@ use url::form_urlencoded;
/// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
pub enum BodySource {
/// A ReadableStream comes with a null-source.
Null,
@@ -59,6 +61,14 @@ pub enum BodySource {
Object,
}
+/// The reason to stop reading from the body.
+enum StopReading {
+ /// The stream has errored.
+ Error,
+ /// The stream is done.
+ Done,
+}
+
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
@@ -69,7 +79,7 @@ struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
canceller: TaskCanceller,
- bytes_sender: Option<IpcSender<Vec<u8>>>,
+ bytes_sender: Option<IpcSender<BodyChunkResponse>>,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
in_memory_done: bool,
@@ -123,7 +133,14 @@ impl TransmitBodyConnectHandler {
BodyChunkRequest::Chunk => body_handler.transmit_source(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
- BodyChunkRequest::Done => body_handler.stop_reading(),
+ BodyChunkRequest::Done => {
+ body_handler.stop_reading(StopReading::Done);
+ },
+ // Note: this is actually sent from this process
+ // by the TransmitBodyPromiseHandler when the stream errors.
+ BodyChunkRequest::Error => {
+ body_handler.stop_reading(StopReading::Error);
+ },
}
}),
);
@@ -138,7 +155,7 @@ impl TransmitBodyConnectHandler {
fn transmit_source(&mut self) {
if self.in_memory_done {
// Step 5.1.3
- self.stop_reading();
+ self.stop_reading(StopReading::Done);
return;
}
@@ -153,29 +170,58 @@ impl TransmitBodyConnectHandler {
.bytes_sender
.as_ref()
.expect("No bytes sender to transmit source.")
- .send(bytes.clone());
+ .send(BodyChunkResponse::Chunk(bytes.clone()));
return;
}
warn!("Re-directs for file-based Blobs not supported yet.");
}
/// Take the IPC sender sent by `net`, so we can send body chunks with it.
- fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
+ /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+ fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
self.bytes_sender = Some(sender);
+
+ // If we're using an actual ReadableStream, acquire a reader for it.
+ if self.source == BodySource::Null {
+ let stream = self.stream.clone();
+ let _ = self.task_source.queue_with_canceller(
+ task!(start_reading_request_body_stream: move || {
+ // Step 1, Let body be request’s body.
+ let rooted_stream = stream.root();
+
+ // TODO: Step 2, If body is null.
+
+ // Step 3, get a reader for stream.
+ rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream.");
+
+ // Note: this algorithm continues when the first chunk is requested by `net`.
+ }),
+ &self.canceller,
+ );
+ }
}
/// Drop the IPC sender sent by `net`
- fn stop_reading(&mut self) {
- // Note: this should close the corresponding receiver,
- // and terminate the request stream in `net`.
- self.bytes_sender = None;
+ fn stop_reading(&mut self, reason: StopReading) {
+ let bytes_sender = self
+ .bytes_sender
+ .take()
+ .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
+ match reason {
+ StopReading::Error => {
+ let _ = bytes_sender.send(BodyChunkResponse::Error);
+ },
+ StopReading::Done => {
+ let _ = bytes_sender.send(BodyChunkResponse::Done);
+ },
+ }
}
- /// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
+ /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn transmit_body_chunk(&mut self) {
if self.in_memory_done {
// Step 5.1.3
- self.stop_reading();
+ self.stop_reading(StopReading::Done);
return;
}
@@ -188,7 +234,7 @@ impl TransmitBodyConnectHandler {
// In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
if let Some(bytes) = self.in_memory.clone() {
- let _ = bytes_sender.send(bytes);
+ let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
// Mark this body as `done` so that we can stop reading in the next tick,
// matching the behavior of the promise-based flow
self.in_memory_done = true;
@@ -197,27 +243,9 @@ impl TransmitBodyConnectHandler {
let _ = self.task_source.queue_with_canceller(
task!(setup_native_body_promise_handler: move || {
- // Step 1, Let body be request’s body.
- //
- // TODO: We need the handle the body null case,
- // here assuming body is something and we have the corresponding stream.
let rooted_stream = stream.root();
let global = rooted_stream.global();
- // TODO: Step 2, If body is null,
- // then queue a fetch task on request to process request end-of-body
- // for request and abort these steps.
-
- // TODO: queuing those "process request ..." tasks means we also need a handle on Request here.
-
- // Step 3, get a reader for stream.
- if rooted_stream.start_reading().is_err() {
- // Note: this can happen if script starts consuming request body
- // before fetch starts transmitting it.
- // Not in the spec.
- return;
- }
-
// Step 4, the result of reading a chunk from body’s stream with reader.
let promise = rooted_stream.read_a_chunk();
@@ -225,14 +253,19 @@ impl TransmitBodyConnectHandler {
// are a combination of the promise native handler here,
// and the corresponding IPC route in `component::net::http_loader`.
let promise_handler = Box::new(TransmitBodyPromiseHandler {
- bytes_sender,
+ bytes_sender: bytes_sender.clone(),
stream: rooted_stream.clone(),
- control_sender,
+ control_sender: control_sender.clone(),
});
- let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream});
+ let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {
+ bytes_sender,
+ stream: rooted_stream,
+ control_sender,
+ });
- let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
+ let handler =
+ PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
@@ -248,7 +281,7 @@ impl TransmitBodyConnectHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseHandler {
#[ignore_malloc_size_of = "Channels are hard"]
- bytes_sender: IpcSender<Vec<u8>>,
+ bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>,
#[ignore_malloc_size_of = "Channels are hard"]
control_sender: IpcSender<BodyChunkRequest>,
@@ -278,8 +311,7 @@ impl Callback for TransmitBodyPromiseHandler {
Ok(chunk) => chunk,
Err(_) => {
// Step 5.5, the "otherwise" steps.
- // TODO: terminate fetch.
- let _ = self.control_sender.send(BodyChunkRequest::Done);
+ let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading();
},
};
@@ -287,7 +319,7 @@ impl Callback for TransmitBodyPromiseHandler {
// Step 5.1 and 5.2, transmit chunk.
// Send the chunk to the body transmitter in net::http_loader::obtain_response.
// TODO: queue a fetch task on request to process request body for request.
- let _ = self.bytes_sender.send(chunk);
+ let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk));
}
}
@@ -295,14 +327,18 @@ impl Callback for TransmitBodyPromiseHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseRejectionHandler {
+ #[ignore_malloc_size_of = "Channels are hard"]
+ bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>,
+ #[ignore_malloc_size_of = "Channels are hard"]
+ control_sender: IpcSender<BodyChunkRequest>,
}
impl Callback for TransmitBodyPromiseRejectionHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
// Step 5.4, the "rejection" steps.
- // TODO: terminate fetch.
+ let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading();
}
}
@@ -376,7 +412,14 @@ impl ExtractedBody {
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
- BodyChunkRequest::Done => body_handler.stop_reading(),
+ BodyChunkRequest::Done => {
+ body_handler.stop_reading(StopReading::Done);
+ },
+ // Note: this is actually sent from this process
+ // by the TransmitBodyPromiseHandler when the stream errors.
+ BodyChunkRequest::Error => {
+ body_handler.stop_reading(StopReading::Error);
+ },
}
}),
);
diff --git a/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini b/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini
index 40a2537c13c..209ec09de2b 100644
--- a/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini
+++ b/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini
@@ -1,41 +1,9 @@
[request-upload.any.html]
- type: testharness
[Fetch with POST with ReadableStream]
expected: FAIL
- [Fetch with POST with ReadableStream containing String]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing null]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing number]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing ArrayBuffer]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing Blob]
- expected: FAIL
-
[request-upload.any.worker.html]
- type: testharness
[Fetch with POST with ReadableStream]
expected: FAIL
- [Fetch with POST with ReadableStream containing String]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing null]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing number]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing ArrayBuffer]
- expected: FAIL
-
- [Fetch with POST with ReadableStream containing Blob]
- expected: FAIL
-