diff options
author | Manish Goregaokar <manishsmail@gmail.com> | 2016-06-03 18:37:26 +0530 |
---|---|---|
committer | Manish Goregaokar <manishsmail@gmail.com> | 2016-06-10 20:53:40 +0530 |
commit | bf99e73cb0dfb9a5977bfe2a1de779390d013e8b (patch) | |
tree | add59d54660d03738b376b8ca22df2290bad1fdc | |
parent | 6e29b872d7fb8556ea7dd610e82868b6f719f24e (diff) | |
download | servo-bf99e73cb0dfb9a5977bfe2a1de779390d013e8b.tar.gz servo-bf99e73cb0dfb9a5977bfe2a1de779390d013e8b.zip |
Re-add support for fetching chunks (and thus xhr download progress)
-rw-r--r-- | components/net/fetch/methods.rs | 82 | ||||
-rw-r--r-- | components/net/resource_thread.rs | 4 | ||||
-rw-r--r-- | components/net_traits/lib.rs | 69 | ||||
-rw-r--r-- | components/script/dom/xmlhttprequest.rs | 26 | ||||
-rw-r--r-- | tests/unit/net/fetch.rs | 2 |
5 files changed, 96 insertions, 87 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index d2bab7b16ad..49a4180e03f 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -28,6 +28,7 @@ use std::collections::HashSet; use std::fs::File; use std::io::Read; use std::iter::FromIterator; +use std::mem::swap; use std::rc::Rc; use std::sync::mpsc::{channel, Sender, Receiver}; use unicase::UniCase; @@ -36,7 +37,12 @@ use util::thread::spawn_named; pub type Target = Option<Box<FetchTaskTarget + Send>>; -type DoneChannel = Option<(Sender<()>, Receiver<()>)>; +enum Data { + Payload(Vec<u8>), + Done, +} + +type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>; /// [Fetch](https://fetch.spec.whatwg.org#concept-fetch) pub fn fetch(request: Rc<Request>, target: &mut Target, state: HttpState) -> Response { @@ -258,8 +264,38 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool, // Step 18 if request.synchronous { + if let Some(ref mut target) = *target { + // process_response is not supposed to be used + // by sync fetch, but we overload it here for simplicity + target.process_response(&response); + } + if let Some(ref ch) = *done_chan { - let _ = ch.1.recv(); + loop { + match ch.1.recv() + .expect("fetch worker should always send Done before terminating") { + Data::Payload(vec) => { + if let Some(ref mut target) = *target { + target.process_response_chunk(vec); + } + } + Data::Done => break, + } + } + } else { + if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() { + // in case there was no channel to wait for, the body was + // obtained synchronously via basic_fetch for data/file/about/etc + // We should still send the body across as a chunk + if let Some(ref mut target) = *target { + target.process_response_chunk(vec.clone()); + } + } + } + + // overloaded similarly to process_response + if let Some(ref mut target) = *target { + target.process_response_eof(&response); } return response; } @@ -283,7 +319,26 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool, // Step 21 if let Some(ref ch) = *done_chan { - let _ = ch.1.recv(); + loop { + match ch.1.recv() + .expect("fetch worker should always send Done before terminating") { + Data::Payload(vec) => { + if let Some(ref mut target) = *target { + target.process_response_chunk(vec); + } + } + Data::Done => break, + } + } + } else { + if let Some(ref mut target) = *target { + if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() { + // in case there was no channel to wait for, the body was + // obtained synchronously via basic_fetch for data/file/about/etc + // We should still send the body across as a chunk + target.process_response_chunk(vec.clone()); + } + } } // Step 22 @@ -876,19 +931,28 @@ fn http_network_fetch(request: Rc<Request>, loop { match read_block(&mut res.response) { - Ok(ReadResult::Payload(ref mut chunk)) => { + Ok(ReadResult::Payload(chunk)) => { if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { - body.append(chunk); + + body.extend_from_slice(&chunk); + if let Some(ref sender) = done_sender { + let _ = sender.send(Data::Payload(chunk)); + } } }, Ok(ReadResult::EOF) | Err(_) => { + let mut empty_vec = Vec::new(); let completed_body = match *res_body.lock().unwrap() { - ResponseBody::Receiving(ref body) => (*body).clone(), - _ => vec![] + ResponseBody::Receiving(ref mut body) => { + // avoid cloning the body + swap(body, &mut empty_vec); + empty_vec + }, + _ => empty_vec, }; *res_body.lock().unwrap() = ResponseBody::Done(completed_body); - if let Some(sender) = done_sender { - let _ = sender.send(()); + if let Some(ref sender) = done_sender { + let _ = sender.send(Data::Done); } break; } diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index 18857e5005c..c7c6000844a 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -494,15 +494,13 @@ impl CoreResourceManager { blocked_content: BLOCKED_CONTENT_RULES.clone(), }; spawn_named(format!("fetch thread for {}", init.url), move || { - let sync = init.synchronous; let request = Request::from_init(init); // XXXManishearth: Check origin against pipeline id // todo load context / mimesniff in fetch // todo referrer policy? // todo service worker stuff let mut target = Some(Box::new(sender) as Box<FetchTaskTarget + Send + 'static>); - let response = fetch(Rc::new(request), &mut target, http_state); - target.unwrap().fetch_done(&response, sync); + fetch(Rc::new(request), &mut target, http_state); }) } diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 59656fbea9d..0f3cd6a34ac 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -37,7 +37,7 @@ use hyper::mime::{Attr, Mime}; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use msg::constellation_msg::{PipelineId, ReferrerPolicy}; use request::{Request, RequestInit}; -use response::{HttpsState, Response, ResponseBody}; +use response::{HttpsState, Response}; use std::io::Error as IOError; use std::sync::mpsc::Sender; use std::thread; @@ -164,8 +164,8 @@ pub enum FetchResponseMsg { ProcessRequestEOF, // todo: send more info about the response (or perhaps the entire Response) ProcessResponse(Result<Metadata, NetworkError>), - ProcessResponseEOF(Result<Option<Vec<u8>>, NetworkError>), - FetchDone(Result<(Metadata, Option<Vec<u8>>), NetworkError>), + ProcessResponseChunk(Vec<u8>), + ProcessResponseEOF(Result<(), NetworkError>), } pub trait FetchTaskTarget { @@ -184,22 +184,21 @@ pub trait FetchTaskTarget { /// Fired when headers are received fn process_response(&mut self, response: &Response); + /// Fired when a chunk of response content is received + fn process_response_chunk(&mut self, chunk: Vec<u8>); + /// https://fetch.spec.whatwg.org/#process-response-end-of-file /// /// Fired when the response is fully fetched fn process_response_eof(&mut self, response: &Response); - - /// Called when fetch terminates, useful for sync - fn fetch_done(&mut self, response: &Response, sync: bool); } pub trait FetchResponseListener { fn process_request_body(&mut self); fn process_request_eof(&mut self); fn process_response(&mut self, metadata: Result<Metadata, NetworkError>); - fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>); - - fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>); + fn process_response_chunk(&mut self, chunk: Vec<u8>); + fn process_response_eof(&mut self, response: Result<(), NetworkError>); } impl FetchTaskTarget for IpcSender<FetchResponseMsg> { @@ -214,57 +213,17 @@ impl FetchTaskTarget for IpcSender<FetchResponseMsg> { fn process_response(&mut self, response: &Response) { let _ = self.send(FetchResponseMsg::ProcessResponse(response.metadata())); } + fn process_response_chunk(&mut self, chunk: Vec<u8>) { + let _ = self.send(FetchResponseMsg::ProcessResponseChunk(chunk)); + } fn process_response_eof(&mut self, response: &Response) { if response.is_network_error() { // todo: finer grained errors let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Network error".into())))); + } else { + let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(()))); } - if let Ok(ref guard) = response.body.lock() { - match **guard { - ResponseBody::Done(ref vec) => { - let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(Some(vec.clone())))); - return; - } - ResponseBody::Empty => { - let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(None))); - return; - } - _ => () - } - } - - // If something goes wrong, log it instead of crashing the resource thread - let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Incomplete body".into())))); - } - - fn fetch_done(&mut self, response: &Response, sync: bool) { - if !sync { - // fetch_done is only used by sync XHR, avoid pointless data cloning - return; - } - if response.is_network_error() { - // todo: finer grained errors - let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Network error".into())))); - } - if let Ok(ref guard) = response.body.lock() { - match **guard { - ResponseBody::Done(ref vec) => { - let ret = response.metadata().map(|m| (m, Some(vec.clone()))); - let _ = self.send(FetchResponseMsg::FetchDone(ret)); - return; - } - ResponseBody::Empty => { - let ret = response.metadata().map(|m| (m, None)); - let _ = self.send(FetchResponseMsg::FetchDone(ret)); - return; - } - _ => () - } - } - - // If something goes wrong, log it instead of crashing the resource thread - let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Incomplete body".into())))); } } @@ -315,8 +274,8 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg { FetchResponseMsg::ProcessRequestBody => listener.process_request_body(), FetchResponseMsg::ProcessRequestEOF => listener.process_request_eof(), FetchResponseMsg::ProcessResponse(meta) => listener.process_response(meta), + FetchResponseMsg::ProcessResponseChunk(data) => listener.process_response_chunk(data), FetchResponseMsg::ProcessResponseEOF(data) => listener.process_response_eof(data), - FetchResponseMsg::FetchDone(response) => listener.fetch_done(response), } } } diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 7425a2feff3..5a0866cc1bf 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -47,7 +47,7 @@ use net_traits::CoreResourceMsg::Fetch; use net_traits::trim_http_whitespace; use net_traits::{FetchResponseListener, Metadata, NetworkError, RequestSource}; use net_traits::{CoreResourceThread, LoadOrigin}; -use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode, Origin}; +use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode}; use network_listener::{NetworkListener, PreInvoke}; use parse::html::{ParseContext, parse_html}; use parse::xml::{self, parse_xml}; @@ -230,14 +230,13 @@ impl XMLHttpRequest { *self.sync_status.borrow_mut() = Some(rv); } } - fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>) { + fn process_response_chunk(&mut self, mut chunk: Vec<u8>) { + self.buf.borrow_mut().append(&mut chunk); + self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone()); + } + fn process_response_eof(&mut self, response: Result<(), NetworkError>) { match response { - Ok(buf) => { - if let Some(buf) = buf { - *self.buf.borrow_mut() = buf; - // todo move to a process_chunk - self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone()); - } + Ok(()) => { let rv = self.xhr.root().process_response_complete(self.gen_id, Ok(())); *self.sync_status.borrow_mut() = Some(rv); } @@ -247,17 +246,6 @@ impl XMLHttpRequest { } } } - fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>) { - match response { - Ok(response) => { - self.process_response(Ok(response.0)); - self.process_response_eof(Ok(response.1)); - } - Err(err) => { - self.process_response_eof(Err(err)); - } - } - } } impl PreInvoke for XHRContext { diff --git a/tests/unit/net/fetch.rs b/tests/unit/net/fetch.rs index 3eb0ff78567..1439d6d42fe 100644 --- a/tests/unit/net/fetch.rs +++ b/tests/unit/net/fetch.rs @@ -41,11 +41,11 @@ impl FetchTaskTarget for FetchResponseCollector { fn process_request_body(&mut self, _: &Request) {} fn process_request_eof(&mut self, _: &Request) {} fn process_response(&mut self, _: &Response) {} + fn process_response_chunk(&mut self, _: Vec<u8>) {} /// Fired when the response is fully fetched fn process_response_eof(&mut self, response: &Response) { self.sender.send(response.clone()); } - fn fetch_done(&mut self, _: &Response, _: bool) {} } fn fetch_async(request: Request, target: Box<FetchTaskTarget + Send>) { |