diff options
author | Mukilan Thiyagarajan <mukilanthiagarajan@gmail.com> | 2014-10-11 23:50:48 +0530 |
---|---|---|
committer | Mukilan Thiyagarajan <mukilanthiagarajan@gmail.com> | 2014-11-03 22:06:17 +0530 |
commit | 7435db26ac93de57c24a5c29693282cd7bbf9f6c (patch) | |
tree | 173506f994fc722c7d1ce2b766c1f25f5952fd47 | |
parent | 1a3ff8739c2a17d61f295f213f31ddee25e0b3ae (diff) | |
download | servo-7435db26ac93de57c24a5c29693282cd7bbf9f6c.tar.gz servo-7435db26ac93de57c24a5c29693282cd7bbf9f6c.zip |
Fix race condition in XHR and handle other abort/open scenarios
This fixes issue #3630
A short summary of the changes:
* Use generation id to cancel inflight requests
* Handles nested calls to abort, open, send inside handlers
* Adds XHRReleaseMsg to delay freeing XHR object till all
inflight events are processed
* Change the ErroredMsg enum to be more symmetric with the returned
Error enum
-rw-r--r-- | components/script/dom/dedicatedworkerglobalscope.rs | 7 | ||||
-rw-r--r-- | components/script/dom/xmlhttprequest.rs | 361 | ||||
-rw-r--r-- | components/script/script_task.rs | 5 |
3 files changed, 240 insertions, 133 deletions
diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index 6dafca0f885..9bf06024dec 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -19,7 +19,7 @@ use dom::workerglobalscope::DedicatedGlobalScope; use dom::workerglobalscope::{WorkerGlobalScope, WorkerGlobalScopeHelpers}; use dom::xmlhttprequest::XMLHttpRequest; use script_task::{ScriptTask, ScriptChan}; -use script_task::{ScriptMsg, FromWorker, DOMMessage, FireTimerMsg, XHRProgressMsg, WorkerRelease}; +use script_task::{ScriptMsg, FromWorker, DOMMessage, FireTimerMsg, XHRProgressMsg, XHRReleaseMsg, WorkerRelease}; use script_task::WorkerPostMessage; use script_task::StackRootTLS; @@ -134,7 +134,10 @@ impl DedicatedWorkerGlobalScope { global.delayed_release_worker(); }, Ok(XHRProgressMsg(addr, progress)) => { - XMLHttpRequest::handle_xhr_progress(addr, progress) + XMLHttpRequest::handle_progress(addr, progress) + }, + Ok(XHRReleaseMsg(addr)) => { + XMLHttpRequest::handle_release(addr) }, Ok(WorkerPostMessage(addr, data, nbytes)) => { Worker::handle_message(addr, data, nbytes); diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 3245ac210e5..3ad488253b0 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -46,7 +46,7 @@ use libc::c_void; use net::resource_task::{ResourceTask, ResourceCORSData, Load, LoadData, Payload, Done}; use cors::{allow_cross_origin_request, CORSRequest, CORSMode, ForcedPreflightMode}; -use script_task::{ScriptChan, XHRProgressMsg}; +use script_task::{ScriptChan, XHRProgressMsg, XHRReleaseMsg}; use servo_util::str::DOMString; use servo_util::task::spawn_named; @@ -83,24 +83,41 @@ enum XMLHttpRequestState { XHRDone = 4, // So as not to conflict with the ProgressMsg `Done` } +#[deriving(PartialEq)] +#[jstraceable] +pub struct GenerationId(uint); + pub enum XHRProgress { /// Notify that headers have been received - HeadersReceivedMsg(Option<ResponseHeaderCollection>, Status), + HeadersReceivedMsg(GenerationId, Option<ResponseHeaderCollection>, Status), /// Partial progress (after receiving headers), containing portion of the response - LoadingMsg(ByteString), + LoadingMsg(GenerationId, ByteString), /// Loading is done - DoneMsg, - /// There was an error (Abort or Timeout). For a network or other error, just pass None - ErroredMsg(Option<Error>), - /// Timeout was reached - TimeoutMsg + DoneMsg(GenerationId), + /// There was an error (only Abort, Timeout or Network is used) + ErroredMsg(GenerationId, Error), +} + +impl XHRProgress { + fn generation_id(&self) -> GenerationId { + match *self { + HeadersReceivedMsg(id, _, _) | + LoadingMsg(id, _) | + DoneMsg(id) | + ErroredMsg(id, _) => id + } + } } enum SyncOrAsync<'a> { Sync(JSRef<'a, XMLHttpRequest>), - Async(TrustedXHRAddress, ScriptChan) + Async(TrustedXHRAddress, &'a ScriptChan) } +enum TerminateReason { + AbortedOrReopened, + TimedOut, +} #[dom_struct] pub struct XMLHttpRequest { @@ -131,8 +148,8 @@ pub struct XMLHttpRequest { pinned_count: Cell<uint>, timer: DOMRefCell<Timer>, fetch_time: Cell<i64>, - timeout_pinned: Cell<bool>, - terminate_sender: DOMRefCell<Option<Sender<Error>>>, + terminate_sender: DOMRefCell<Option<Sender<TerminateReason>>>, + generation_id: Cell<GenerationId>, } impl XMLHttpRequest { @@ -165,8 +182,8 @@ impl XMLHttpRequest { pinned_count: Cell::new(0), timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), - timeout_pinned: Cell::new(false), terminate_sender: DOMRefCell::new(None), + generation_id: Cell::new(GenerationId(0)) } } pub fn new(global: &GlobalRef) -> Temporary<XMLHttpRequest> { @@ -178,86 +195,145 @@ impl XMLHttpRequest { Ok(XMLHttpRequest::new(global)) } - pub fn handle_xhr_progress(addr: TrustedXHRAddress, progress: XHRProgress) { + pub fn handle_progress(addr: TrustedXHRAddress, progress: XHRProgress) { unsafe { let xhr = JS::from_trusted_xhr_address(addr).root(); xhr.process_partial_response(progress); } } + pub fn handle_release(addr: TrustedXHRAddress) { + addr.release_once(); + } + fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, - mut load_data: LoadData, terminate_receiver: Receiver<Error>, - cors_request: Result<Option<CORSRequest>,()>) -> ErrorResult { + mut load_data: LoadData, terminate_receiver: Receiver<TerminateReason>, + cors_request: Result<Option<CORSRequest>,()>, gen_id: GenerationId) -> ErrorResult { + fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) { match *fetch_type { Sync(xhr) => { xhr.process_partial_response(msg); }, - Async(addr, ref script_chan) => { + Async(addr, script_chan) => { let ScriptChan(ref chan) = *script_chan; chan.send(XHRProgressMsg(addr, msg)); } } } + + macro_rules! notify_error_and_return( + ($err:expr) => ({ + notify_partial_progress(fetch_type, ErroredMsg(gen_id, $err)); + return Err($err) + }); + ) + + macro_rules! terminate( + ($reason:expr) => ( + match $reason { + AbortedOrReopened => { + return Err(Abort) + } + TimedOut => { + notify_error_and_return!(Timeout); + } + } + ); + ) + + match cors_request { - Err(_) => return Err(Network), // Happens in case of cross-origin non-http URIs + Err(_) => { + // Happens in case of cross-origin non-http URIs + notify_error_and_return!(Network); + } + Ok(Some(ref req)) => { - let response = req.http_fetch(); - if response.network_error { - return Err(Network) - } else { - load_data.cors = Some(ResourceCORSData { - preflight: req.preflight_flag, - origin: req.origin.clone() - }) - } - }, + let (chan, cors_port) = channel(); + let req2 = req.clone(); + // TODO: this exists only to make preflight check non-blocking + // perhaps shoud be handled by the resource_loader? + spawn_named("XHR:Cors", proc() { + let response = req2.http_fetch(); + chan.send(response); + }); + + select! ( + response = cors_port.recv() => { + if response.network_error { + notify_error_and_return!(Network); + } else { + load_data.cors = Some(ResourceCORSData { + preflight: req.preflight_flag, + origin: req.origin.clone() + }); + } + }, + reason = terminate_receiver.recv() => terminate!(reason) + ) + } _ => {} } // Step 10, 13 let (start_chan, start_port) = channel(); resource_task.send(Load(load_data, start_chan)); - let response = start_port.recv(); - match terminate_receiver.try_recv() { - Ok(e) => return Err(e), - _ => {} - } - match cors_request { - Ok(Some(ref req)) => { - match response.metadata.headers { - Some(ref h) if allow_cross_origin_request(req, h) => {}, - _ => return Err(Network) - } + + + let progress_port; + select! ( + response = start_port.recv() => { + match cors_request { + Ok(Some(ref req)) => { + match response.metadata.headers { + Some(ref h) if allow_cross_origin_request(req, h) => {}, + _ => notify_error_and_return!(Network) + } + }, + + _ => {} + }; + // XXXManishearth Clear cache entries in case of a network error + notify_partial_progress(fetch_type, HeadersReceivedMsg(gen_id, + response.metadata.headers.clone(), response.metadata.status.clone())); + + progress_port = response.progress_port; }, - _ => {} - } - // XXXManishearth Clear cache entries in case of a network error + reason = terminate_receiver.recv() => terminate!(reason) + ) - notify_partial_progress(fetch_type, HeadersReceivedMsg( - response.metadata.headers.clone(), response.metadata.status.clone())); let mut buf = vec!(); loop { - let progress = response.progress_port.recv(); + // Under most circumstances, progress_port will contain lots of Payload + // events. Since select! does not have any fairness or priority, it + // might always remove the progress_port event, even when there is + // a terminate event waiting in the terminate_receiver. If this happens, + // a timeout or abort will take too long to be processed. To avoid this, + // in each iteration, we check for a terminate event before we block. match terminate_receiver.try_recv() { - Ok(e) => return Err(e), - _ => {} - } - match progress { - Payload(data) => { - buf.push_all(data.as_slice()); - notify_partial_progress(fetch_type, LoadingMsg(ByteString::new(buf.clone()))); - }, - Done(Ok(())) => { - notify_partial_progress(fetch_type, DoneMsg); - return Ok(()); + Ok(reason) => terminate!(reason), + Err(_) => () + }; + + select! ( + progress = progress_port.recv() => match progress { + Payload(data) => { + buf.push_all(data.as_slice()); + notify_partial_progress(fetch_type, + LoadingMsg(gen_id, ByteString::new(buf.clone()))); + }, + Done(Ok(())) => { + notify_partial_progress(fetch_type, DoneMsg(gen_id)); + return Ok(()); + }, + Done(Err(_)) => { + notify_error_and_return!(Network); + } }, - Done(Err(_)) => { - notify_partial_progress(fetch_type, ErroredMsg(None)); - return Err(Network) - } - } + reason = terminate_receiver.recv() => terminate!(reason) + ) } } } @@ -270,8 +346,6 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } fn Open(self, method: ByteString, url: DOMString) -> ErrorResult { - // Clean up from previous requests, if any: - self.cancel_timeout(); let uppercase_method = method.as_str().map(|s| { let upper = s.to_ascii_upper(); match upper.as_slice() { @@ -311,7 +385,9 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { return Err(InvalidAccess) } } - // XXXManishearth abort existing requests + // abort existing requests + self.terminate_ongoing_fetch(); + // Step 12 *self.request_url.borrow_mut() = Some(parsed_url); *self.request_headers.borrow_mut() = RequestHeaderCollection::new(); @@ -453,15 +529,8 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { Some (ref v) if v.len() == 0 => true, _ => false }); - let mut addr = None; - if !self.sync.get() { - // If one of the event handlers below aborts the fetch, - // the assertion in release_once() will fail since we haven't pinned it yet. - // Pin early to avoid dealing with this - unsafe { - addr = Some(self.to_trusted()); - } + if !self.sync.get() { // Step 8 let upload_target = *self.upload.root(); let event_target: JSRef<EventTarget> = EventTargetCast::from_ref(upload_target); @@ -471,15 +540,20 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { // Step 9 self.send_flag.set(true); + // If one of the event handlers below aborts the fetch by calling + // abort or open we will need the current generation id to detect it. + let gen_id = self.generation_id.get(); self.dispatch_response_progress_event("loadstart".to_string()); + if self.generation_id.get() != gen_id { + return Ok(()); + } if !self.upload_complete.get() { self.dispatch_upload_progress_event("loadstart".to_string(), Some(0)); + if self.generation_id.get() != gen_id { + return Ok(()); + } } - } - if self.ready_state.get() == Unsent { - // The progress events above might have run abort(), in which case we terminate the fetch. - return Ok(()); } let global = self.global.root(); @@ -544,15 +618,30 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { _ => {} } + let gen_id = self.generation_id.get(); if self.sync.get() { return XMLHttpRequest::fetch(&mut Sync(self), resource_task, load_data, - terminate_receiver, cors_request); + terminate_receiver, cors_request, gen_id); } else { self.fetch_time.set(time::now().to_timespec().sec); let script_chan = global.root_ref().script_chan().clone(); + // Pin the object before launching the fetch task. + // The XHRReleaseMsg sent when the fetch task completes will + // unpin it. This is to ensure that the object will stay alive + // as long as there are (possibly cancelled) inflight events queued up + // in the script task's port + let addr = unsafe { + self.to_trusted() + }; spawn_named("XHRTask", proc() { - let _ = XMLHttpRequest::fetch(&mut Async(addr.unwrap(), script_chan), - resource_task, load_data, terminate_receiver, cors_request); + let _ = XMLHttpRequest::fetch(&mut Async(addr, &script_chan), + resource_task, + load_data, + terminate_receiver, + cors_request, + gen_id); + let ScriptChan(ref chan) = script_chan; + chan.send(XHRReleaseMsg(addr)); }); let timeout = self.timeout.get(); if timeout > 0 { @@ -562,12 +651,19 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { Ok(()) } fn Abort(self) { - self.terminate_sender.borrow().as_ref().map(|s| s.send_opt(Abort)); - match self.ready_state.get() { - Opened if self.send_flag.get() => self.process_partial_response(ErroredMsg(Some(Abort))), - HeadersReceived | Loading => self.process_partial_response(ErroredMsg(Some(Abort))), - _ => {} - }; + self.terminate_ongoing_fetch(); + let state = self.ready_state.get(); + if (state == Opened && self.send_flag.get()) || + state == HeadersReceived || + state == Loading { + let gen_id = self.generation_id.get(); + self.process_partial_response(ErroredMsg(gen_id, Abort)); + // If open was called in one of the handlers invoked by the + // above call then we should terminate the abort sequence + if self.generation_id.get() != gen_id { + return + } + } self.ready_state.set(Unsent); } fn ResponseURL(self) -> DOMString { @@ -692,6 +788,7 @@ trait PrivateXMLHttpRequestHelpers { fn release_once(self); fn change_ready_state(self, XMLHttpRequestState); fn process_partial_response(self, progress: XHRProgress); + fn terminate_ongoing_fetch(self); fn insert_trusted_header(self, name: String, value: String); fn dispatch_progress_event(self, upload: bool, type_: DOMString, loaded: u64, total: Option<u64>); fn dispatch_upload_progress_event(self, type_: DOMString, partial_load: Option<u64>); @@ -742,8 +839,24 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { } fn process_partial_response(self, progress: XHRProgress) { + let msg_id = progress.generation_id(); + + // Aborts processing if abort() or open() was called + // (including from one of the event handlers called below) + macro_rules! return_if_fetch_was_terminated( + () => ( + if msg_id != self.generation_id.get() { + return + } + ); + ) + + // Ignore message if it belongs to a terminated fetch + return_if_fetch_was_terminated!(); + match progress { - HeadersReceivedMsg(headers, status) => { + HeadersReceivedMsg(_, headers, status) => { + assert!(self.ready_state.get() == Opened); // For synchronous requests, this should not fire any events, and just store data // XXXManishearth Find a way to track partial progress of the send (onprogresss for XHRUpload) @@ -753,8 +866,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Substeps 2-4 if !self.sync.get() { self.dispatch_upload_progress_event("progress".to_string(), None); + return_if_fetch_was_terminated!(); self.dispatch_upload_progress_event("load".to_string(), None); + return_if_fetch_was_terminated!(); self.dispatch_upload_progress_event("loadend".to_string(), None); + return_if_fetch_was_terminated!(); } // Part of step 13, send() (processing response) // XXXManishearth handle errors, if any (substep 1) @@ -768,27 +884,25 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { None => {} }; // Substep 3 - if self.ready_state.get() == Opened && !self.sync.get() { + if !self.sync.get() { self.change_ready_state(HeadersReceived); } }, - LoadingMsg(partial_response) => { + LoadingMsg(_, partial_response) => { // For synchronous requests, this should not fire any events, and just store data // Part of step 13, send() (processing response body) // XXXManishearth handle errors, if any (substep 1) - // Substep 2 - if self.ready_state.get() == HeadersReceived && !self.sync.get() { - self.change_ready_state(Loading); - } - // Substep 3 *self.response.borrow_mut() = partial_response; - // Substep 4 if !self.sync.get() { + if self.ready_state.get() == HeadersReceived { + self.change_ready_state(Loading); + return_if_fetch_was_terminated!(); + } self.dispatch_response_progress_event("progress".to_string()); } }, - DoneMsg => { + DoneMsg(_) => { // Part of step 13, send() (processing response end of file) // XXXManishearth handle errors, if any (substep 1) @@ -797,50 +911,52 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Subsubsteps 2-4 self.send_flag.set(false); self.change_ready_state(XHRDone); - + return_if_fetch_was_terminated!(); // Subsubsteps 5-7 self.dispatch_response_progress_event("progress".to_string()); + return_if_fetch_was_terminated!(); self.dispatch_response_progress_event("load".to_string()); + return_if_fetch_was_terminated!(); self.dispatch_response_progress_event("loadend".to_string()); } - self.cancel_timeout(); - self.release_once(); }, - ErroredMsg(e) => { + ErroredMsg(_, e) => { self.send_flag.set(false); // XXXManishearth set response to NetworkError self.change_ready_state(XHRDone); + return_if_fetch_was_terminated!(); + let errormsg = match e { - Some(Abort) => "abort", - Some(Timeout) => "timeout", - None => "error", - _ => unreachable!() + Abort => "abort", + Timeout => "timeout", + _ => "error", }; let upload_complete: &Cell<bool> = &self.upload_complete; if !upload_complete.get() { upload_complete.set(true); self.dispatch_upload_progress_event("progress".to_string(), None); + return_if_fetch_was_terminated!(); self.dispatch_upload_progress_event(errormsg.to_string(), None); + return_if_fetch_was_terminated!(); self.dispatch_upload_progress_event("loadend".to_string(), None); + return_if_fetch_was_terminated!(); } self.dispatch_response_progress_event("progress".to_string()); + return_if_fetch_was_terminated!(); self.dispatch_response_progress_event(errormsg.to_string()); + return_if_fetch_was_terminated!(); self.dispatch_response_progress_event("loadend".to_string()); - - self.cancel_timeout(); - self.release_once(); - }, - TimeoutMsg => { - match self.ready_state.get() { - Opened if self.send_flag.get() => self.process_partial_response(ErroredMsg(Some(Timeout))), - Loading | HeadersReceived => self.process_partial_response(ErroredMsg(Some(Timeout))), - _ => self.release_once() - }; } } } + fn terminate_ongoing_fetch(self) { + let GenerationId(prev_id) = self.generation_id.get(); + self.generation_id.set(GenerationId(prev_id + 1)); + self.terminate_sender.borrow().as_ref().map(|s| s.send_opt(AbortedOrReopened)); + } + fn insert_trusted_header(self, name: String, value: String) { // Insert a header without checking spec-compliance // Use for hardcoded headers @@ -886,23 +1002,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // This will cancel all previous timeouts let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); - let addr = unsafe { - self.to_trusted() // This will increment the pin counter by one - }; - if self.timeout_pinned.get() { - // Already pinned due to a timeout, no need to pin it again since the old timeout was cancelled above - self.release_once(); - } - self.timeout_pinned.set(true); - let global = self.global.root(); - let script_chan = global.root_ref().script_chan().clone(); let terminate_sender = (*self.terminate_sender.borrow()).clone(); spawn_named("XHR:Timer", proc () { match oneshot.recv_opt() { Ok(_) => { - let ScriptChan(ref chan) = script_chan; - terminate_sender.map(|s| s.send_opt(Timeout)); - chan.send(XHRProgressMsg(addr, TimeoutMsg)); + terminate_sender.map(|s| s.send_opt(TimedOut)); }, Err(_) => { // This occurs if xhr.timeout (the sender) goes out of scope (i.e, xhr went out of scope) @@ -913,15 +1017,12 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { } ); } + fn cancel_timeout(self) { - // Cancels timeouts on the object, if any - if self.timeout_pinned.get() { - self.timeout_pinned.set(false); - self.release_once(); - } // oneshot() closes the previous channel, canceling the timeout self.timer.borrow_mut().oneshot(Zero::zero()); } + fn text_response(self) -> DOMString { let mut encoding = UTF_8 as EncodingRef; match self.response_headers.borrow().content_type { diff --git a/components/script/script_task.rs b/components/script/script_task.rs index deb6aaded80..5a6e6ca4173 100644 --- a/components/script/script_task.rs +++ b/components/script/script_task.rs @@ -100,6 +100,8 @@ pub enum ScriptMsg { ExitWindowMsg(PipelineId), /// Notifies the script of progress on a fetch (dispatched to all tasks). XHRProgressMsg(TrustedXHRAddress, XHRProgress), + /// Releases one reference to the XHR object (dispatched to all tasks). + XHRReleaseMsg(TrustedXHRAddress), /// Message sent through Worker.postMessage (only dispatched to /// DedicatedWorkerGlobalScope). DOMMessage(*mut u64, size_t), @@ -530,7 +532,8 @@ impl ScriptTask { FromConstellation(ExitPipelineMsg(id)) => if self.handle_exit_pipeline_msg(id) { return false }, FromScript(ExitWindowMsg(id)) => self.handle_exit_window_msg(id), FromConstellation(ResizeMsg(..)) => fail!("should have handled ResizeMsg already"), - FromScript(XHRProgressMsg(addr, progress)) => XMLHttpRequest::handle_xhr_progress(addr, progress), + FromScript(XHRProgressMsg(addr, progress)) => XMLHttpRequest::handle_progress(addr, progress), + FromScript(XHRReleaseMsg(addr)) => XMLHttpRequest::handle_release(addr), FromScript(DOMMessage(..)) => fail!("unexpected message"), FromScript(WorkerPostMessage(addr, data, nbytes)) => Worker::handle_message(addr, data, nbytes), FromScript(WorkerRelease(addr)) => Worker::handle_release(addr), |