diff options
Diffstat (limited to 'components/script/dom/xmlhttprequest.rs')
-rw-r--r-- | components/script/dom/xmlhttprequest.rs | 432 |
1 files changed, 242 insertions, 190 deletions
diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 5d0f76e5463..a4e9e767dd5 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -26,7 +26,8 @@ use dom::urlsearchparams::URLSearchParamsHelpers; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTarget; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTargetTypeId; use dom::xmlhttprequestupload::XMLHttpRequestUpload; -use script_task::{ScriptChan, ScriptMsg, Runnable}; +use network_listener::{NetworkListener, PreInvoke}; +use script_task::{ScriptChan, ScriptMsg, Runnable, ScriptPort}; use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; @@ -43,19 +44,20 @@ use js::jsapi::JS_ClearPendingException; use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; -use net_traits::ProgressMsg::{Payload, Done}; -use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadResponse}; -use cors::{allow_cross_origin_request, CORSRequest, RequestMode}; +use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer}; +use net_traits::{AsyncResponseListener, Metadata}; +use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener}; +use cors::CORSResponse; use util::str::DOMString; use util::task::spawn_named; use std::ascii::AsciiExt; use std::borrow::ToOwned; -use std::cell::Cell; -use std::sync::mpsc::{Sender, Receiver, channel}; +use std::cell::{RefCell, Cell}; use std::default::Default; use std::old_io::Timer; use std::str::FromStr; +use std::sync::{Mutex, Arc}; use std::time::duration::Duration; use time; use url::{Url, UrlParser}; @@ -74,28 +76,20 @@ enum XMLHttpRequestState { Done = 4, } -struct XHRProgressHandler { - addr: TrustedXHRAddress, - progress: XHRProgress, -} - -impl XHRProgressHandler { - fn new(addr: TrustedXHRAddress, progress: XHRProgress) -> XHRProgressHandler { - XHRProgressHandler { addr: addr, progress: progress } - } -} - -impl Runnable for XHRProgressHandler { - fn handler(self: Box<XHRProgressHandler>) { - let this = *self; - XMLHttpRequest::handle_progress(this.addr, this.progress); - } -} - #[derive(PartialEq, Clone, Copy)] #[jstraceable] pub struct GenerationId(u32); +/// Closure of required data for each async network event that comprises the +/// XHR's response. +struct XHRContext { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + cors_request: Option<CORSRequest>, + buf: DOMRefCell<Vec<u8>>, + sync_status: DOMRefCell<Option<ErrorResult>>, +} + #[derive(Clone)] pub enum XHRProgress { /// Notify that headers have been received @@ -119,16 +113,6 @@ impl XHRProgress { } } -enum SyncOrAsync<'a> { - Sync(JSRef<'a, XMLHttpRequest>), - Async(TrustedXHRAddress, Box<ScriptChan+Send>) -} - -enum TerminateReason { - AbortedOrReopened, - TimedOut, -} - #[dom_struct] pub struct XMLHttpRequest { eventtarget: XMLHttpRequestEventTarget, @@ -157,8 +141,9 @@ pub struct XMLHttpRequest { global: GlobalField, timer: DOMRefCell<Timer>, fetch_time: Cell<i64>, - terminate_sender: DOMRefCell<Option<Sender<TerminateReason>>>, + timeout_target: DOMRefCell<Option<Box<ScriptChan+Send>>>, generation_id: Cell<GenerationId>, + response_status: Cell<Result<(), ()>>, } impl XMLHttpRequest { @@ -190,8 +175,9 @@ impl XMLHttpRequest { global: GlobalField::from_rooted(&global), timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), - terminate_sender: DOMRefCell::new(None), - generation_id: Cell::new(GenerationId(0)) + timeout_target: DOMRefCell::new(None), + generation_id: Cell::new(GenerationId(0)), + response_status: Cell::new(Ok(())), } } pub fn new(global: GlobalRef) -> Temporary<XMLHttpRequest> { @@ -205,141 +191,91 @@ impl XMLHttpRequest { Ok(XMLHttpRequest::new(global)) } - pub fn handle_progress(addr: TrustedXHRAddress, progress: XHRProgress) { - let xhr = addr.to_temporary().root(); - xhr.r().process_partial_response(progress); - } - - #[allow(unsafe_code)] - fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, - mut load_data: LoadData, terminate_receiver: Receiver<TerminateReason>, - cors_request: Result<Option<CORSRequest>,()>, gen_id: GenerationId, - start_port: Receiver<LoadResponse>) -> ErrorResult { - - fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) { - match *fetch_type { - SyncOrAsync::Sync(xhr) => { - xhr.process_partial_response(msg); - }, - SyncOrAsync::Async(ref addr, ref script_chan) => { - script_chan.send(ScriptMsg::RunnableMsg(box XHRProgressHandler::new(addr.clone(), msg))).unwrap(); - } - } + fn check_cors(context: Arc<Mutex<XHRContext>>, + load_data: LoadData, + req: CORSRequest, + script_chan: Box<ScriptChan+Send>, + resource_task: ResourceTask) { + struct CORSContext { + xhr: Arc<Mutex<XHRContext>>, + load_data: RefCell<Option<LoadData>>, + req: CORSRequest, + script_chan: Box<ScriptChan+Send>, + resource_task: ResourceTask, } - macro_rules! notify_error_and_return( - ($err:expr) => ({ - notify_partial_progress(fetch_type, XHRProgress::Errored(gen_id, $err)); - return Err($err) - }); - ); - - macro_rules! terminate( - ($reason:expr) => ( - match $reason { - TerminateReason::AbortedOrReopened => { - return Err(Abort) - } - TerminateReason::TimedOut => { - notify_error_and_return!(Timeout); - } + impl AsyncCORSResponseListener for CORSContext { + fn response_available(&self, response: CORSResponse) { + if response.network_error { + let mut context = self.xhr.lock().unwrap(); + let xhr = context.xhr.to_temporary().root(); + xhr.r().process_partial_response(XHRProgress::Errored(context.gen_id, Network)); + *context.sync_status.borrow_mut() = Some(Err(Network)); + return; } - ); - ); - - - match cors_request { - Err(_) => { - // Happens in case of cross-origin non-http URIs - notify_error_and_return!(Network); - } - Ok(Some(ref req)) => { - let (chan, cors_port) = channel(); - let req2 = req.clone(); - // TODO: this exists only to make preflight check non-blocking - // perhaps should be handled by the resource_loader? - spawn_named("XHR:Cors".to_owned(), move || { - let response = req2.http_fetch(); - chan.send(response).unwrap(); + let mut load_data = self.load_data.borrow_mut().take().unwrap(); + load_data.cors = Some(ResourceCORSData { + preflight: self.req.preflight_flag, + origin: self.req.origin.clone() }); - select! ( - response = cors_port.recv() => { - let response = response.unwrap(); - if response.network_error { - notify_error_and_return!(Network); - } else { - load_data.cors = Some(ResourceCORSData { - preflight: req.preflight_flag, - origin: req.origin.clone() - }); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); + XMLHttpRequest::initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(), + self.resource_task.clone(), load_data); } - _ => {} } - // Step 10, 13 - resource_task.send(Load(load_data)).unwrap(); + let cors_context = CORSContext { + xhr: context, + load_data: RefCell::new(Some(load_data)), + req: req.clone(), + script_chan: script_chan.clone(), + resource_task: resource_task, + }; + req.http_fetch_async(box cors_context, script_chan); + } - let progress_port; - select! ( - response = start_port.recv() => { - let response = response.unwrap(); - match cors_request { - Ok(Some(ref req)) => { - match response.metadata.headers { - Some(ref h) if allow_cross_origin_request(req, h) => {}, - _ => notify_error_and_return!(Network) - } - }, + fn initiate_async_xhr(context: Arc<Mutex<XHRContext>>, + script_chan: Box<ScriptChan+Send>, + resource_task: ResourceTask, + load_data: LoadData) { + impl AsyncResponseListener for XHRContext { + fn headers_available(&self, metadata: Metadata) { + let xhr = self.xhr.to_temporary().root(); + let rv = xhr.r().process_headers_available(self.cors_request.clone(), + self.gen_id, + metadata); + if rv.is_err() { + *self.sync_status.borrow_mut() = Some(rv); + } + } - _ => {} - }; - // XXXManishearth Clear cache entries in case of a network error - notify_partial_progress(fetch_type, XHRProgress::HeadersReceived(gen_id, - response.metadata.headers.clone(), response.metadata.status.clone())); + fn data_available(&self, payload: Vec<u8>) { + self.buf.borrow_mut().push_all(payload.as_slice()); + let xhr = self.xhr.to_temporary().root(); + xhr.r().process_data_available(self.gen_id, self.buf.borrow().clone()); + } - progress_port = response.progress_port; - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); + fn response_complete(&self, status: Result<(), String>) { + let xhr = self.xhr.to_temporary().root(); + let rv = xhr.r().process_response_complete(self.gen_id, status); + *self.sync_status.borrow_mut() = Some(rv); + } + } - let mut buf = vec!(); - loop { - // Under most circumstances, progress_port will contain lots of Payload - // events. Since select! does not have any fairness or priority, it - // might always remove the progress_port event, even when there is - // a terminate event waiting in the terminate_receiver. If this happens, - // a timeout or abort will take too long to be processed. To avoid this, - // in each iteration, we check for a terminate event before we block. - match terminate_receiver.try_recv() { - Ok(reason) => terminate!(reason), - Err(_) => () - }; - - select! ( - progress = progress_port.recv() => match progress.unwrap() { - Payload(data) => { - buf.push_all(data.as_slice()); - notify_partial_progress(fetch_type, - XHRProgress::Loading(gen_id, ByteString::new(buf.clone()))); - }, - Done(Ok(())) => { - notify_partial_progress(fetch_type, XHRProgress::Done(gen_id)); - return Ok(()); - }, - Done(Err(_)) => { - notify_error_and_return!(Network); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); + impl PreInvoke for XHRContext { + fn should_invoke(&self) -> bool { + let xhr = self.xhr.to_temporary().root(); + xhr.r().generation_id.get() == self.gen_id + } } + + let listener = box NetworkListener { + context: context, + script_chan: script_chan, + }; + resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); } } @@ -577,10 +513,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } - let global = self.global.root(); - let resource_task = global.r().resource_task(); - let (start_chan, start_port) = channel(); - let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap(), start_chan); + let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap()); load_data.data = extracted; #[inline] @@ -613,10 +546,9 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } load_data.method = (*self.request_method.borrow()).clone(); - let (terminate_sender, terminate_receiver) = channel(); - *self.terminate_sender.borrow_mut() = Some(terminate_sender); // CORS stuff + let global = self.global.root(); let referer_url = self.global.root().r().get_url(); let mode = if self.upload_events.get() { RequestMode::ForcedPreflight @@ -647,31 +579,15 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { debug!("request_headers = {:?}", *self.request_headers.borrow()); - let gen_id = self.generation_id.get(); + self.fetch_time.set(time::now().to_timespec().sec); + let rv = self.fetch(load_data, cors_request, global.r()); if self.sync.get() { - return XMLHttpRequest::fetch(&mut SyncOrAsync::Sync(self), resource_task, load_data, - terminate_receiver, cors_request, gen_id, start_port); - } else { - self.fetch_time.set(time::now().to_timespec().sec); - let script_chan = global.r().script_chan(); - // Pin the object before launching the fetch task. This is to ensure that - // the object will stay alive as long as there are (possibly cancelled) - // inflight events queued up in the script task's port. - let addr = Trusted::new(self.global.root().r().get_cx(), self, - script_chan.clone()); - spawn_named("XHRTask".to_owned(), move || { - let _ = XMLHttpRequest::fetch(&mut SyncOrAsync::Async(addr, script_chan), - resource_task, - load_data, - terminate_receiver, - cors_request, - gen_id, - start_port); - }); - let timeout = self.timeout.get(); - if timeout > 0 { - self.set_timeout(timeout); - } + return rv; + } + + let timeout = self.timeout.get(); + if timeout > 0 { + self.set_timeout(timeout); } Ok(()) } @@ -812,6 +728,10 @@ pub type TrustedXHRAddress = Trusted<XMLHttpRequest>; trait PrivateXMLHttpRequestHelpers { fn change_ready_state(self, XMLHttpRequestState); + fn process_headers_available(&self, cors_request: Option<CORSRequest>, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error>; + fn process_data_available(self, gen_id: GenerationId, payload: Vec<u8>); + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) -> ErrorResult; fn process_partial_response(self, progress: XHRProgress); fn terminate_ongoing_fetch(self); fn insert_trusted_header(self, name: String, value: String); @@ -822,6 +742,9 @@ trait PrivateXMLHttpRequestHelpers { fn set_timeout(self, timeout:u32); fn cancel_timeout(self); fn filter_response_headers(self) -> Headers; + fn discard_subsequent_responses(self); + fn fetch(self, load_data: LoadData, cors_request: Result<Option<CORSRequest>,()>, + global: GlobalRef) -> ErrorResult; } impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { @@ -837,6 +760,45 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { event.r().fire(target); } + fn process_headers_available(&self, cors_request: Option<CORSRequest>, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error> { + match cors_request { + Some(ref req) => { + match metadata.headers { + Some(ref h) if allow_cross_origin_request(req, h) => {}, + _ => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + return Err(Network); + } + } + }, + + _ => {} + }; + // XXXManishearth Clear cache entries in case of a network error + self.process_partial_response(XHRProgress::HeadersReceived(gen_id, + metadata.headers, metadata.status)); + Ok(()) + } + + fn process_data_available(self, gen_id: GenerationId, payload: Vec<u8>) { + self.process_partial_response(XHRProgress::Loading(gen_id, ByteString::new(payload))); + } + + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) + -> ErrorResult { + match status { + Ok(()) => { + self.process_partial_response(XHRProgress::Done(gen_id)); + Ok(()) + }, + Err(_) => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + Err(Network) + } + } + } + fn process_partial_response(self, progress: XHRProgress) { let msg_id = progress.generation_id(); @@ -853,6 +815,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Ignore message if it belongs to a terminated fetch return_if_fetch_was_terminated!(); + // Ignore messages coming from previously-errored responses or requests that have timed out + if self.response_status.get().is_err() { + return; + } + match progress { XHRProgress::HeadersReceived(_, headers, status) => { assert!(self.ready_state.get() == XMLHttpRequestState::Opened); @@ -904,6 +871,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.ready_state.get() == XMLHttpRequestState::Loading || self.sync.get()); + self.cancel_timeout(); + // Part of step 11, send() (processing response end of file) // XXXManishearth handle errors, if any (substep 2) @@ -919,6 +888,9 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_response_progress_event("loadend".to_owned()); }, XHRProgress::Errored(_, e) => { + self.cancel_timeout(); + + self.discard_subsequent_responses(); self.send_flag.set(false); // XXXManishearth set response to NetworkError self.change_ready_state(XMLHttpRequestState::Done); @@ -952,7 +924,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { fn terminate_ongoing_fetch(self) { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); - self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened)); + *self.timeout_target.borrow_mut() = None; + self.response_status.set(Ok(())); } fn insert_trusted_header(self, name: String, value: String) { @@ -990,15 +963,36 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_progress_event(false, type_, len, total); } fn set_timeout(self, timeout: u32) { + struct XHRTimeout { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + } + + impl Runnable for XHRTimeout { + fn handler(self: Box<XHRTimeout>) { + let this = *self; + let xhr = this.xhr.to_temporary().root(); + if xhr.r().ready_state.get() != XMLHttpRequestState::Done { + xhr.r().process_partial_response(XHRProgress::Errored(this.gen_id, Timeout)); + } + } + } + // Sets up the object to timeout in a given number of milliseconds // This will cancel all previous timeouts let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); - let terminate_sender = (*self.terminate_sender.borrow()).clone(); + let timeout_target = (*self.timeout_target.borrow().as_ref().unwrap()).clone(); + let global = self.global.root(); + let xhr = Trusted::new(global.r().get_cx(), self, global.r().script_chan()); + let gen_id = self.generation_id.get(); spawn_named("XHR:Timer".to_owned(), move || { match oneshot.recv() { Ok(_) => { - terminate_sender.map(|s| s.send(TerminateReason::TimedOut)); + timeout_target.send(ScriptMsg::RunnableMsg(box XHRTimeout { + xhr: xhr, + gen_id: gen_id, + })).unwrap(); }, Err(_) => { // This occurs if xhr.timeout (the sender) goes out of scope (i.e, xhr went out of scope) @@ -1065,6 +1059,64 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // XXXManishearth additional CORS filtering goes here headers } + + fn discard_subsequent_responses(self) { + self.response_status.set(Err(())); + } + + fn fetch(self, + load_data: LoadData, + cors_request: Result<Option<CORSRequest>,()>, + global: GlobalRef) -> ErrorResult { + let cors_request = match cors_request { + Err(_) => { + // Happens in case of cross-origin non-http URIs + self.process_partial_response(XHRProgress::Errored( + self.generation_id.get(), Network)); + return Err(Network); + } + Ok(req) => req, + }; + + let xhr = Trusted::new(global.get_cx(), self, global.script_chan()); + + let context = Arc::new(Mutex::new(XHRContext { + xhr: xhr, + cors_request: cors_request.clone(), + gen_id: self.generation_id.get(), + buf: DOMRefCell::new(vec!()), + sync_status: DOMRefCell::new(None), + })); + + let (script_chan, script_port) = if self.sync.get() { + let (tx, rx) = global.new_script_pair(); + (tx, Some(rx)) + } else { + (global.script_chan(), None) + }; + *self.timeout_target.borrow_mut() = Some(script_chan.clone()); + + let resource_task = global.resource_task(); + if let Some(req) = cors_request { + XMLHttpRequest::check_cors(context.clone(), load_data, req.clone(), + script_chan.clone(), resource_task); + } else { + XMLHttpRequest::initiate_async_xhr(context.clone(), script_chan, + resource_task, load_data); + } + + if let Some(script_port) = script_port { + loop { + global.process_event(script_port.recv()); + let context = context.lock().unwrap(); + let sync_status = context.sync_status.borrow(); + if let Some(ref status) = *sync_status { + return status.clone(); + } + } + } + Ok(()) + } } trait Extractable { |