aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorManish Goregaokar <manishsmail@gmail.com>2016-06-03 18:37:26 +0530
committerManish Goregaokar <manishsmail@gmail.com>2016-06-10 20:53:40 +0530
commitbf99e73cb0dfb9a5977bfe2a1de779390d013e8b (patch)
treeadd59d54660d03738b376b8ca22df2290bad1fdc
parent6e29b872d7fb8556ea7dd610e82868b6f719f24e (diff)
downloadservo-bf99e73cb0dfb9a5977bfe2a1de779390d013e8b.tar.gz
servo-bf99e73cb0dfb9a5977bfe2a1de779390d013e8b.zip
Re-add support for fetching chunks (and thus xhr download progress)
-rw-r--r--components/net/fetch/methods.rs82
-rw-r--r--components/net/resource_thread.rs4
-rw-r--r--components/net_traits/lib.rs69
-rw-r--r--components/script/dom/xmlhttprequest.rs26
-rw-r--r--tests/unit/net/fetch.rs2
5 files changed, 96 insertions, 87 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs
index d2bab7b16ad..49a4180e03f 100644
--- a/components/net/fetch/methods.rs
+++ b/components/net/fetch/methods.rs
@@ -28,6 +28,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::Read;
use std::iter::FromIterator;
+use std::mem::swap;
use std::rc::Rc;
use std::sync::mpsc::{channel, Sender, Receiver};
use unicase::UniCase;
@@ -36,7 +37,12 @@ use util::thread::spawn_named;
pub type Target = Option<Box<FetchTaskTarget + Send>>;
-type DoneChannel = Option<(Sender<()>, Receiver<()>)>;
+enum Data {
+ Payload(Vec<u8>),
+ Done,
+}
+
+type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;
/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
pub fn fetch(request: Rc<Request>, target: &mut Target, state: HttpState) -> Response {
@@ -258,8 +264,38 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool,
// Step 18
if request.synchronous {
+ if let Some(ref mut target) = *target {
+ // process_response is not supposed to be used
+ // by sync fetch, but we overload it here for simplicity
+ target.process_response(&response);
+ }
+
if let Some(ref ch) = *done_chan {
- let _ = ch.1.recv();
+ loop {
+ match ch.1.recv()
+ .expect("fetch worker should always send Done before terminating") {
+ Data::Payload(vec) => {
+ if let Some(ref mut target) = *target {
+ target.process_response_chunk(vec);
+ }
+ }
+ Data::Done => break,
+ }
+ }
+ } else {
+ if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() {
+ // in case there was no channel to wait for, the body was
+ // obtained synchronously via basic_fetch for data/file/about/etc
+ // We should still send the body across as a chunk
+ if let Some(ref mut target) = *target {
+ target.process_response_chunk(vec.clone());
+ }
+ }
+ }
+
+ // overloaded similarly to process_response
+ if let Some(ref mut target) = *target {
+ target.process_response_eof(&response);
}
return response;
}
@@ -283,7 +319,26 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool,
// Step 21
if let Some(ref ch) = *done_chan {
- let _ = ch.1.recv();
+ loop {
+ match ch.1.recv()
+ .expect("fetch worker should always send Done before terminating") {
+ Data::Payload(vec) => {
+ if let Some(ref mut target) = *target {
+ target.process_response_chunk(vec);
+ }
+ }
+ Data::Done => break,
+ }
+ }
+ } else {
+ if let Some(ref mut target) = *target {
+ if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() {
+ // in case there was no channel to wait for, the body was
+ // obtained synchronously via basic_fetch for data/file/about/etc
+ // We should still send the body across as a chunk
+ target.process_response_chunk(vec.clone());
+ }
+ }
}
// Step 22
@@ -876,19 +931,28 @@ fn http_network_fetch(request: Rc<Request>,
loop {
match read_block(&mut res.response) {
- Ok(ReadResult::Payload(ref mut chunk)) => {
+ Ok(ReadResult::Payload(chunk)) => {
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
- body.append(chunk);
+
+ body.extend_from_slice(&chunk);
+ if let Some(ref sender) = done_sender {
+ let _ = sender.send(Data::Payload(chunk));
+ }
}
},
Ok(ReadResult::EOF) | Err(_) => {
+ let mut empty_vec = Vec::new();
let completed_body = match *res_body.lock().unwrap() {
- ResponseBody::Receiving(ref body) => (*body).clone(),
- _ => vec![]
+ ResponseBody::Receiving(ref mut body) => {
+ // avoid cloning the body
+ swap(body, &mut empty_vec);
+ empty_vec
+ },
+ _ => empty_vec,
};
*res_body.lock().unwrap() = ResponseBody::Done(completed_body);
- if let Some(sender) = done_sender {
- let _ = sender.send(());
+ if let Some(ref sender) = done_sender {
+ let _ = sender.send(Data::Done);
}
break;
}
diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs
index 18857e5005c..c7c6000844a 100644
--- a/components/net/resource_thread.rs
+++ b/components/net/resource_thread.rs
@@ -494,15 +494,13 @@ impl CoreResourceManager {
blocked_content: BLOCKED_CONTENT_RULES.clone(),
};
spawn_named(format!("fetch thread for {}", init.url), move || {
- let sync = init.synchronous;
let request = Request::from_init(init);
// XXXManishearth: Check origin against pipeline id
// todo load context / mimesniff in fetch
// todo referrer policy?
// todo service worker stuff
let mut target = Some(Box::new(sender) as Box<FetchTaskTarget + Send + 'static>);
- let response = fetch(Rc::new(request), &mut target, http_state);
- target.unwrap().fetch_done(&response, sync);
+ fetch(Rc::new(request), &mut target, http_state);
})
}
diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs
index 59656fbea9d..0f3cd6a34ac 100644
--- a/components/net_traits/lib.rs
+++ b/components/net_traits/lib.rs
@@ -37,7 +37,7 @@ use hyper::mime::{Attr, Mime};
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use msg::constellation_msg::{PipelineId, ReferrerPolicy};
use request::{Request, RequestInit};
-use response::{HttpsState, Response, ResponseBody};
+use response::{HttpsState, Response};
use std::io::Error as IOError;
use std::sync::mpsc::Sender;
use std::thread;
@@ -164,8 +164,8 @@ pub enum FetchResponseMsg {
ProcessRequestEOF,
// todo: send more info about the response (or perhaps the entire Response)
ProcessResponse(Result<Metadata, NetworkError>),
- ProcessResponseEOF(Result<Option<Vec<u8>>, NetworkError>),
- FetchDone(Result<(Metadata, Option<Vec<u8>>), NetworkError>),
+ ProcessResponseChunk(Vec<u8>),
+ ProcessResponseEOF(Result<(), NetworkError>),
}
pub trait FetchTaskTarget {
@@ -184,22 +184,21 @@ pub trait FetchTaskTarget {
/// Fired when headers are received
fn process_response(&mut self, response: &Response);
+ /// Fired when a chunk of response content is received
+ fn process_response_chunk(&mut self, chunk: Vec<u8>);
+
/// https://fetch.spec.whatwg.org/#process-response-end-of-file
///
/// Fired when the response is fully fetched
fn process_response_eof(&mut self, response: &Response);
-
- /// Called when fetch terminates, useful for sync
- fn fetch_done(&mut self, response: &Response, sync: bool);
}
pub trait FetchResponseListener {
fn process_request_body(&mut self);
fn process_request_eof(&mut self);
fn process_response(&mut self, metadata: Result<Metadata, NetworkError>);
- fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>);
-
- fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>);
+ fn process_response_chunk(&mut self, chunk: Vec<u8>);
+ fn process_response_eof(&mut self, response: Result<(), NetworkError>);
}
impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
@@ -214,57 +213,17 @@ impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
fn process_response(&mut self, response: &Response) {
let _ = self.send(FetchResponseMsg::ProcessResponse(response.metadata()));
}
+ fn process_response_chunk(&mut self, chunk: Vec<u8>) {
+ let _ = self.send(FetchResponseMsg::ProcessResponseChunk(chunk));
+ }
fn process_response_eof(&mut self, response: &Response) {
if response.is_network_error() {
// todo: finer grained errors
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Network error".into()))));
+ } else {
+ let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(())));
}
- if let Ok(ref guard) = response.body.lock() {
- match **guard {
- ResponseBody::Done(ref vec) => {
- let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(Some(vec.clone()))));
- return;
- }
- ResponseBody::Empty => {
- let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(None)));
- return;
- }
- _ => ()
- }
- }
-
- // If something goes wrong, log it instead of crashing the resource thread
- let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Incomplete body".into()))));
- }
-
- fn fetch_done(&mut self, response: &Response, sync: bool) {
- if !sync {
- // fetch_done is only used by sync XHR, avoid pointless data cloning
- return;
- }
- if response.is_network_error() {
- // todo: finer grained errors
- let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Network error".into()))));
- }
- if let Ok(ref guard) = response.body.lock() {
- match **guard {
- ResponseBody::Done(ref vec) => {
- let ret = response.metadata().map(|m| (m, Some(vec.clone())));
- let _ = self.send(FetchResponseMsg::FetchDone(ret));
- return;
- }
- ResponseBody::Empty => {
- let ret = response.metadata().map(|m| (m, None));
- let _ = self.send(FetchResponseMsg::FetchDone(ret));
- return;
- }
- _ => ()
- }
- }
-
- // If something goes wrong, log it instead of crashing the resource thread
- let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Incomplete body".into()))));
}
}
@@ -315,8 +274,8 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg {
FetchResponseMsg::ProcessRequestBody => listener.process_request_body(),
FetchResponseMsg::ProcessRequestEOF => listener.process_request_eof(),
FetchResponseMsg::ProcessResponse(meta) => listener.process_response(meta),
+ FetchResponseMsg::ProcessResponseChunk(data) => listener.process_response_chunk(data),
FetchResponseMsg::ProcessResponseEOF(data) => listener.process_response_eof(data),
- FetchResponseMsg::FetchDone(response) => listener.fetch_done(response),
}
}
}
diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs
index 7425a2feff3..5a0866cc1bf 100644
--- a/components/script/dom/xmlhttprequest.rs
+++ b/components/script/dom/xmlhttprequest.rs
@@ -47,7 +47,7 @@ use net_traits::CoreResourceMsg::Fetch;
use net_traits::trim_http_whitespace;
use net_traits::{FetchResponseListener, Metadata, NetworkError, RequestSource};
use net_traits::{CoreResourceThread, LoadOrigin};
-use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode, Origin};
+use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode};
use network_listener::{NetworkListener, PreInvoke};
use parse::html::{ParseContext, parse_html};
use parse::xml::{self, parse_xml};
@@ -230,14 +230,13 @@ impl XMLHttpRequest {
*self.sync_status.borrow_mut() = Some(rv);
}
}
- fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>) {
+ fn process_response_chunk(&mut self, mut chunk: Vec<u8>) {
+ self.buf.borrow_mut().append(&mut chunk);
+ self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone());
+ }
+ fn process_response_eof(&mut self, response: Result<(), NetworkError>) {
match response {
- Ok(buf) => {
- if let Some(buf) = buf {
- *self.buf.borrow_mut() = buf;
- // todo move to a process_chunk
- self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone());
- }
+ Ok(()) => {
let rv = self.xhr.root().process_response_complete(self.gen_id, Ok(()));
*self.sync_status.borrow_mut() = Some(rv);
}
@@ -247,17 +246,6 @@ impl XMLHttpRequest {
}
}
}
- fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>) {
- match response {
- Ok(response) => {
- self.process_response(Ok(response.0));
- self.process_response_eof(Ok(response.1));
- }
- Err(err) => {
- self.process_response_eof(Err(err));
- }
- }
- }
}
impl PreInvoke for XHRContext {
diff --git a/tests/unit/net/fetch.rs b/tests/unit/net/fetch.rs
index 3eb0ff78567..1439d6d42fe 100644
--- a/tests/unit/net/fetch.rs
+++ b/tests/unit/net/fetch.rs
@@ -41,11 +41,11 @@ impl FetchTaskTarget for FetchResponseCollector {
fn process_request_body(&mut self, _: &Request) {}
fn process_request_eof(&mut self, _: &Request) {}
fn process_response(&mut self, _: &Response) {}
+ fn process_response_chunk(&mut self, _: Vec<u8>) {}
/// Fired when the response is fully fetched
fn process_response_eof(&mut self, response: &Response) {
self.sender.send(response.clone());
}
- fn fetch_done(&mut self, _: &Response, _: bool) {}
}
fn fetch_async(request: Request, target: Box<FetchTaskTarget + Send>) {