aboutsummaryrefslogtreecommitdiffstats
path: root/components/net
diff options
context:
space:
mode:
Diffstat (limited to 'components/net')
-rw-r--r--components/net/fetch/methods.rs8
-rw-r--r--components/net/http_loader.rs194
-rw-r--r--components/net/tests/http_loader.rs7
3 files changed, 177 insertions, 32 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs
index c25ab10cbe6..1aabb545354 100644
--- a/components/net/fetch/methods.rs
+++ b/components/net/fetch/methods.rs
@@ -23,7 +23,8 @@ use net_traits::request::{
is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window,
};
use net_traits::request::{
- BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode,
+ BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request,
+ RequestMode,
};
use net_traits::response::{Response, ResponseBody, ResponseType};
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming};
@@ -641,7 +642,10 @@ fn scheme_fetch(
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = stream.send(BodyChunkRequest::Connect(body_chan));
let _ = stream.send(BodyChunkRequest::Chunk);
- body_port.recv().ok()
+ match body_port.recv().ok() {
+ Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes),
+ _ => panic!("cert should be sent in a single chunk."),
+ }
});
let data = data.as_ref().and_then(|b| {
let idx = b.iter().position(|b| *b == b'&')?;
diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs
index a8dfaecc422..33a6e4415e3 100644
--- a/components/net/http_loader.rs
+++ b/components/net/http_loader.rs
@@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
-use crossbeam_channel::{unbounded, Sender};
+use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
@@ -30,6 +30,7 @@ use http::header::{
CONTENT_TYPE,
};
use http::{HeaderMap, Request as HyperRequest};
+use hyper::header::TRANSFER_ENCODING;
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
@@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem};
use net_traits::request::Origin::Origin as SpecificOrigin;
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
use net_traits::request::{
- BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode,
+ BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder,
+ RequestMode,
};
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
use net_traits::request::{ResponseTainting, ServiceWorkersMode};
@@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
-use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
@@ -405,16 +407,89 @@ fn auth_from_cache(
}
}
+/// Messages from the IPC route to the fetch worker,
+/// used to fill the body with bytes coming-in over IPC.
+enum BodyChunk {
+ /// A chunk of bytes.
+ Chunk(Vec<u8>),
+ /// Body is done.
+ Done,
+}
+
+/// The stream side of the body passed to hyper.
+enum BodyStream {
+ /// A receiver that can be used in Body::wrap_stream,
+ /// for streaming the request over the network.
+ Chunked(TokioReceiver<Vec<u8>>),
+ /// A body whose bytes are buffered
+ /// and sent in one chunk over the network.
+ Buffered(Receiver<BodyChunk>),
+}
+
+/// The sink side of the body passed to hyper,
+/// used to enqueue chunks.
+enum BodySink {
+ /// A Tokio sender used to feed chunks to the network stream.
+ Chunked(TokioSender<Vec<u8>>),
+ /// A Crossbeam sender used to send chunks to the fetch worker,
+ /// where they will be buffered
+ /// in order to ensure they are not streamed them over the network.
+ Buffered(Sender<BodyChunk>),
+}
+
+impl BodySink {
+ pub fn transmit_bytes(&self, bytes: Vec<u8>) {
+ match self {
+ BodySink::Chunked(ref sender) => {
+ let sender = sender.clone();
+ HANDLE
+ .lock()
+ .unwrap()
+ .as_mut()
+ .unwrap()
+ .spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
+ },
+ BodySink::Buffered(ref sender) => {
+ let _ = sender.send(BodyChunk::Chunk(bytes));
+ },
+ }
+ }
+
+ pub fn close(&self) {
+ match self {
+ BodySink::Chunked(ref sender) => {
+ let mut sender = sender.clone();
+ HANDLE
+ .lock()
+ .unwrap()
+ .as_mut()
+ .unwrap()
+ .spawn(future::lazy(move || {
+ if sender.close().is_err() {
+ warn!("Failed to close network request sink.");
+ }
+ Ok(())
+ }));
+ },
+ BodySink::Buffered(ref sender) => {
+ let _ = sender.send(BodyChunk::Done);
+ },
+ }
+ }
+}
+
fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
- request_headers: &HeaderMap,
+ request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
+ source_is_null: bool,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
+ fetch_terminated: Sender<bool>,
) -> Box<
dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
@@ -435,12 +510,25 @@ fn obtain_response(
.replace("}", "%7D");
let request = if let Some(chunk_requester) = body {
- // TODO: If body is a stream, append `Transfer-Encoding`/`chunked`,
- // see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
+ let (sink, stream) = if source_is_null {
+ // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
+ // TODO: this should not be set for HTTP/2(currently not supported?).
+ request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
- let (body_chan, body_port) = ipc::channel().unwrap();
+ let (sender, receiver) = channel(1);
+ (BodySink::Chunked(sender), BodyStream::Chunked(receiver))
+ } else {
+ // Note: Hyper seems to already buffer bytes when the request appears not stream-able,
+ // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
+ //
+ // However since this doesn't appear documented, and we're using an ancient version,
+ // for now we buffer manually to ensure we don't stream requests
+ // to servers that might not know how to handle them.
+ let (sender, receiver) = unbounded();
+ (BodySink::Buffered(sender), BodyStream::Buffered(receiver))
+ };
- let (sender, receiver) = channel(1);
+ let (body_chan, body_port) = ipc::channel().unwrap();
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
@@ -453,32 +541,58 @@ fn obtain_response(
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
- let bytes: Vec<u8> = message.to().unwrap();
- let chunk_requester = chunk_requester.clone();
- let sender = sender.clone();
+ let bytes: Vec<u8> = match message.to().unwrap() {
+ BodyChunkResponse::Chunk(bytes) => bytes,
+ BodyChunkResponse::Done => {
+ // Step 3, abort these parallel steps.
+ let _ = fetch_terminated.send(false);
+ sink.close();
+ return;
+ },
+ BodyChunkResponse::Error => {
+ // Step 4 and/or 5.
+ // TODO: differentiate between the two steps,
+ // where step 5 requires setting an `aborted` flag on the fetch.
+ let _ = fetch_terminated.send(true);
+ sink.close();
+ return;
+ },
+ };
devtools_bytes.lock().unwrap().append(&mut bytes.clone());
- HANDLE.lock().unwrap().as_mut().unwrap().spawn(
- // Step 5.1.2.2
- // Transmit a chunk over the network(and blocking until this is done).
- sender
- .send(bytes)
- .map(move |_| {
- // Step 5.1.2.3
- // Request the next chunk.
- let _ = chunk_requester.send(BodyChunkRequest::Chunk);
- ()
- })
- .map_err(|_| ()),
- );
+ // Step 5.1.2.2, transmit chunk over the network,
+ // currently implemented by sending the bytes to the fetch worker.
+ sink.transmit_bytes(bytes);
+
+ // Step 5.1.2.3
+ // Request the next chunk.
+ let _ = chunk_requester.send(BodyChunkRequest::Chunk);
}),
);
+ let body = match stream {
+ BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
+ BodyStream::Buffered(receiver) => {
+ // Accumulate bytes received over IPC into a vector.
+ let mut body = vec![];
+ loop {
+ match receiver.recv() {
+ Ok(BodyChunk::Chunk(mut bytes)) => {
+ body.append(&mut bytes);
+ },
+ Ok(BodyChunk::Done) => break,
+ Err(_) => warn!("Failed to read all chunks from request body."),
+ }
+ }
+ body.into()
+ },
+ };
+
HyperRequest::builder()
.method(method)
.uri(encoded_url)
- .body(Body::wrap_stream(receiver))
+ .body(body)
} else {
HyperRequest::builder()
.method(method)
@@ -1566,16 +1680,26 @@ fn http_network_fetch(
// do not. Once we support other kinds of fetches we'll need to be more fine grained here
// since things like image fetches are classified differently by devtools
let is_xhr = request.destination == Destination::None;
+
+ // The receiver will receive true if there has been an error streaming the request body.
+ let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
+
let response_future = obtain_response(
&context.state.client,
&url,
&request.method,
- &request.headers,
- request.body.as_mut().map(|body| body.take_stream()),
+ &mut request.headers,
+ request.body.as_ref().map(|body| body.take_stream()),
+ request
+ .body
+ .as_ref()
+ .map(|body| body.source_is_null())
+ .unwrap_or(false),
&request.pipeline_id,
request_id.as_ref().map(Deref::deref),
is_xhr,
context,
+ fetch_terminated_sender,
);
let pipeline_id = request.pipeline_id;
@@ -1585,6 +1709,22 @@ fn http_network_fetch(
Err(error) => return Response::network_error(error),
};
+ // Check if there was an error while streaming the request body.
+ //
+ // It's ok to block on the receiver,
+ // since we're already blocking on the response future above,
+ // so we can be sure that the request has already been processed,
+ // and a message is in the channel(or soon will be).
+ match fetch_terminated_receiver.recv() {
+ Ok(true) => {
+ return Response::network_error(NetworkError::Internal(
+ "Request body streaming failed.".into(),
+ ));
+ },
+ Ok(false) => {},
+ Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
+ }
+
if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs
index 0e4c401d3dc..48f18560d54 100644
--- a/components/net/tests/http_loader.rs
+++ b/components/net/tests/http_loader.rs
@@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer;
use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table;
use net_traits::request::{
- BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder,
- RequestMode,
+ BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody,
+ RequestBuilder, RequestMode,
};
use net_traits::response::{HttpsState, ResponseBody};
use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
@@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
Box::new(move |message| {
let request = message.to().unwrap();
if let BodyChunkRequest::Connect(sender) = request {
- let _ = sender.send(content.clone());
+ let _ = sender.send(BodyChunkResponse::Chunk(content.clone()));
+ let _ = sender.send(BodyChunkResponse::Done);
}
}),
);