diff options
Diffstat (limited to 'components/net')
-rw-r--r-- | components/net/fetch/methods.rs | 8 | ||||
-rw-r--r-- | components/net/http_loader.rs | 194 | ||||
-rw-r--r-- | components/net/tests/http_loader.rs | 7 |
3 files changed, 177 insertions, 32 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index c25ab10cbe6..1aabb545354 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -23,7 +23,8 @@ use net_traits::request::{ is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window, }; use net_traits::request::{ - BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode, + BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request, + RequestMode, }; use net_traits::response::{Response, ResponseBody, ResponseType}; use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming}; @@ -641,7 +642,10 @@ fn scheme_fetch( let (body_chan, body_port) = ipc::channel().unwrap(); let _ = stream.send(BodyChunkRequest::Connect(body_chan)); let _ = stream.send(BodyChunkRequest::Chunk); - body_port.recv().ok() + match body_port.recv().ok() { + Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes), + _ => panic!("cert should be sent in a single chunk."), + } }); let data = data.as_ref().and_then(|b| { let idx = b.iter().position(|b| *b == b'&')?; diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index a8dfaecc422..33a6e4415e3 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target} use crate::hsts::HstsList; use crate::http_cache::{CacheKey, HttpCache}; use crate::resource_thread::AuthCache; -use crossbeam_channel::{unbounded, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use devtools_traits::{ ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest, }; @@ -30,6 +30,7 @@ use http::header::{ CONTENT_TYPE, }; use http::{HeaderMap, Request as HyperRequest}; +use hyper::header::TRANSFER_ENCODING; use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode}; use hyper_serde::Serde; use ipc_channel::ipc::{self, IpcSender}; @@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem}; use net_traits::request::Origin::Origin as SpecificOrigin; use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header}; use net_traits::request::{ - BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode, + BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder, + RequestMode, }; use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin}; use net_traits::request::{ResponseTainting, ServiceWorkersMode}; @@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime}; use time::{self, Tm}; use tokio::prelude::{future, Future, Sink, Stream}; use tokio::runtime::Runtime; -use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender}; lazy_static! { pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap())); @@ -405,16 +407,89 @@ fn auth_from_cache( } } +/// Messages from the IPC route to the fetch worker, +/// used to fill the body with bytes coming-in over IPC. +enum BodyChunk { + /// A chunk of bytes. + Chunk(Vec<u8>), + /// Body is done. + Done, +} + +/// The stream side of the body passed to hyper. +enum BodyStream { + /// A receiver that can be used in Body::wrap_stream, + /// for streaming the request over the network. + Chunked(TokioReceiver<Vec<u8>>), + /// A body whose bytes are buffered + /// and sent in one chunk over the network. + Buffered(Receiver<BodyChunk>), +} + +/// The sink side of the body passed to hyper, +/// used to enqueue chunks. +enum BodySink { + /// A Tokio sender used to feed chunks to the network stream. + Chunked(TokioSender<Vec<u8>>), + /// A Crossbeam sender used to send chunks to the fetch worker, + /// where they will be buffered + /// in order to ensure they are not streamed them over the network. + Buffered(Sender<BodyChunk>), +} + +impl BodySink { + pub fn transmit_bytes(&self, bytes: Vec<u8>) { + match self { + BodySink::Chunked(ref sender) => { + let sender = sender.clone(); + HANDLE + .lock() + .unwrap() + .as_mut() + .unwrap() + .spawn(sender.send(bytes).map(|_| ()).map_err(|_| ())); + }, + BodySink::Buffered(ref sender) => { + let _ = sender.send(BodyChunk::Chunk(bytes)); + }, + } + } + + pub fn close(&self) { + match self { + BodySink::Chunked(ref sender) => { + let mut sender = sender.clone(); + HANDLE + .lock() + .unwrap() + .as_mut() + .unwrap() + .spawn(future::lazy(move || { + if sender.close().is_err() { + warn!("Failed to close network request sink."); + } + Ok(()) + })); + }, + BodySink::Buffered(ref sender) => { + let _ = sender.send(BodyChunk::Done); + }, + } + } +} + fn obtain_response( client: &Client<Connector, Body>, url: &ServoUrl, method: &Method, - request_headers: &HeaderMap, + request_headers: &mut HeaderMap, body: Option<IpcSender<BodyChunkRequest>>, + source_is_null: bool, pipeline_id: &Option<PipelineId>, request_id: Option<&str>, is_xhr: bool, context: &FetchContext, + fetch_terminated: Sender<bool>, ) -> Box< dyn Future< Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), @@ -435,12 +510,25 @@ fn obtain_response( .replace("}", "%7D"); let request = if let Some(chunk_requester) = body { - // TODO: If body is a stream, append `Transfer-Encoding`/`chunked`, - // see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch + let (sink, stream) = if source_is_null { + // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch + // TODO: this should not be set for HTTP/2(currently not supported?). + request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - let (body_chan, body_port) = ipc::channel().unwrap(); + let (sender, receiver) = channel(1); + (BodySink::Chunked(sender), BodyStream::Chunked(receiver)) + } else { + // Note: Hyper seems to already buffer bytes when the request appears not stream-able, + // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104 + // + // However since this doesn't appear documented, and we're using an ancient version, + // for now we buffer manually to ensure we don't stream requests + // to servers that might not know how to handle them. + let (sender, receiver) = unbounded(); + (BodySink::Buffered(sender), BodyStream::Buffered(receiver)) + }; - let (sender, receiver) = channel(1); + let (body_chan, body_port) = ipc::channel().unwrap(); let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan)); @@ -453,32 +541,58 @@ fn obtain_response( ROUTER.add_route( body_port.to_opaque(), Box::new(move |message| { - let bytes: Vec<u8> = message.to().unwrap(); - let chunk_requester = chunk_requester.clone(); - let sender = sender.clone(); + let bytes: Vec<u8> = match message.to().unwrap() { + BodyChunkResponse::Chunk(bytes) => bytes, + BodyChunkResponse::Done => { + // Step 3, abort these parallel steps. + let _ = fetch_terminated.send(false); + sink.close(); + return; + }, + BodyChunkResponse::Error => { + // Step 4 and/or 5. + // TODO: differentiate between the two steps, + // where step 5 requires setting an `aborted` flag on the fetch. + let _ = fetch_terminated.send(true); + sink.close(); + return; + }, + }; devtools_bytes.lock().unwrap().append(&mut bytes.clone()); - HANDLE.lock().unwrap().as_mut().unwrap().spawn( - // Step 5.1.2.2 - // Transmit a chunk over the network(and blocking until this is done). - sender - .send(bytes) - .map(move |_| { - // Step 5.1.2.3 - // Request the next chunk. - let _ = chunk_requester.send(BodyChunkRequest::Chunk); - () - }) - .map_err(|_| ()), - ); + // Step 5.1.2.2, transmit chunk over the network, + // currently implemented by sending the bytes to the fetch worker. + sink.transmit_bytes(bytes); + + // Step 5.1.2.3 + // Request the next chunk. + let _ = chunk_requester.send(BodyChunkRequest::Chunk); }), ); + let body = match stream { + BodyStream::Chunked(receiver) => Body::wrap_stream(receiver), + BodyStream::Buffered(receiver) => { + // Accumulate bytes received over IPC into a vector. + let mut body = vec![]; + loop { + match receiver.recv() { + Ok(BodyChunk::Chunk(mut bytes)) => { + body.append(&mut bytes); + }, + Ok(BodyChunk::Done) => break, + Err(_) => warn!("Failed to read all chunks from request body."), + } + } + body.into() + }, + }; + HyperRequest::builder() .method(method) .uri(encoded_url) - .body(Body::wrap_stream(receiver)) + .body(body) } else { HyperRequest::builder() .method(method) @@ -1566,16 +1680,26 @@ fn http_network_fetch( // do not. Once we support other kinds of fetches we'll need to be more fine grained here // since things like image fetches are classified differently by devtools let is_xhr = request.destination == Destination::None; + + // The receiver will receive true if there has been an error streaming the request body. + let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded(); + let response_future = obtain_response( &context.state.client, &url, &request.method, - &request.headers, - request.body.as_mut().map(|body| body.take_stream()), + &mut request.headers, + request.body.as_ref().map(|body| body.take_stream()), + request + .body + .as_ref() + .map(|body| body.source_is_null()) + .unwrap_or(false), &request.pipeline_id, request_id.as_ref().map(Deref::deref), is_xhr, context, + fetch_terminated_sender, ); let pipeline_id = request.pipeline_id; @@ -1585,6 +1709,22 @@ fn http_network_fetch( Err(error) => return Response::network_error(error), }; + // Check if there was an error while streaming the request body. + // + // It's ok to block on the receiver, + // since we're already blocking on the response future above, + // so we can be sure that the request has already been processed, + // and a message is in the channel(or soon will be). + match fetch_terminated_receiver.recv() { + Ok(true) => { + return Response::network_error(NetworkError::Internal( + "Request body streaming failed.".into(), + )); + }, + Ok(false) => {}, + Err(_) => warn!("Failed to receive confirmation request was streamed without error."), + } + if log_enabled!(log::Level::Info) { info!("{:?} response for {}", res.version(), url); for header in res.headers().iter() { diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs index 0e4c401d3dc..48f18560d54 100644 --- a/components/net/tests/http_loader.rs +++ b/components/net/tests/http_loader.rs @@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer; use net::resource_thread::AuthCacheEntry; use net::test::replace_host_table; use net_traits::request::{ - BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder, - RequestMode, + BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody, + RequestBuilder, RequestMode, }; use net_traits::response::{HttpsState, ResponseBody}; use net_traits::{CookieSource, NetworkError, ReferrerPolicy}; @@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody { Box::new(move |message| { let request = message.to().unwrap(); if let BodyChunkRequest::Connect(sender) = request { - let _ = sender.send(content.clone()); + let _ = sender.send(BodyChunkResponse::Chunk(content.clone())); + let _ = sender.send(BodyChunkResponse::Done); } }), ); |