diff options
author | Keith Yeung <kungfukeith11@gmail.com> | 2016-10-27 02:48:20 -0700 |
---|---|---|
committer | Keith Yeung <kungfukeith11@gmail.com> | 2016-11-11 14:50:49 -0800 |
commit | 0b32b624a7746a10ddfa7701fa316dfdbd59ee54 (patch) | |
tree | fcd7bed5c1239c4b2b7e8f6dfdce21cb798eb6bf /components/script/dom/eventsource.rs | |
parent | c198bfa388df0b5c6c80538f0b232d447326f084 (diff) | |
download | servo-0b32b624a7746a10ddfa7701fa316dfdbd59ee54.tar.gz servo-0b32b624a7746a10ddfa7701fa316dfdbd59ee54.zip |
Interpret event stream
Diffstat (limited to 'components/script/dom/eventsource.rs')
-rw-r--r-- | components/script/dom/eventsource.rs | 255 |
1 files changed, 229 insertions, 26 deletions
diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs index 780d4563469..2bc10f244ac 100644 --- a/components/script/dom/eventsource.rs +++ b/components/script/dom/eventsource.rs @@ -11,25 +11,39 @@ use dom::bindings::js::Root; use dom::bindings::refcounted::Trusted; use dom::bindings::reflector::{Reflectable, reflect_dom_object}; use dom::bindings::str::DOMString; +use dom::event::Event; use dom::eventtarget::EventTarget; use dom::globalscope::GlobalScope; +use dom::messageevent::MessageEvent; +use encoding::Encoding; +use encoding::all::UTF_8; use hyper::header::{Accept, qitem}; use ipc_channel::ipc; use ipc_channel::router::ROUTER; +use js::conversions::ToJSValConvertible; +use js::jsapi::JSAutoCompartment; +use js::jsval::UndefinedValue; use mime::{Mime, TopLevel, SubLevel}; use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError}; use net_traits::request::{CacheMode, CorsSettings, CredentialsMode}; use net_traits::request::{RequestInit, RequestMode}; use network_listener::{NetworkListener, PreInvoke}; use script_thread::Runnable; +use servo_atoms::Atom; use std::cell::Cell; +use std::mem; +use std::str::{Chars, FromStr}; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Sender, channel}; +use std::thread; +use std::time::Duration; use task_source::TaskSource; use url::Url; header! { (LastEventId, "Last-Event-ID") => [String] } +const DEFAULT_RECONNECTION_TIME: u64 = 5000; + #[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] struct GenerationId(u32); @@ -44,18 +58,34 @@ enum ReadyState { #[dom_struct] pub struct EventSource { eventtarget: EventTarget, - url: DOMRefCell<Option<Url>>, + url: Url, request: DOMRefCell<Option<RequestInit>>, last_event_id: DOMRefCell<DOMString>, + reconnection_time: Cell<u64>, generation_id: Cell<GenerationId>, ready_state: Cell<ReadyState>, with_credentials: bool, } +enum ParserState { + Field, + Comment, + Value, + Eol +} + struct EventSourceContext { event_source: Trusted<EventSource>, - gen_id: GenerationId + gen_id: GenerationId, + parser_state: ParserState, + field: String, + value: String, + origin: String, + + event_type: String, + data: String, + last_event_id: String, } impl EventSourceContext { @@ -94,6 +124,9 @@ impl EventSourceContext { return; } let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + // Step 2 + thread::sleep(Duration::from_millis(event_source.reconnection_time.get())); + // TODO Step 3: Optionally wait some more // Step 4 if self.gen_id != self.event_source.root().generation_id.get() { return; @@ -102,13 +135,146 @@ impl EventSourceContext { // Step 5 let runnable = box RefetchRequestRunnable { event_source: self.event_source.clone(), - gen_id: self.gen_id + gen_id: self.gen_id, + + event_type: self.event_type.clone(), + data: self.data.clone(), + last_event_id: self.last_event_id.clone(), }; if self.gen_id != self.event_source.root().generation_id.get() { return; } let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); } + + // https://html.spec.whatwg.org/multipage/#processField + fn process_field(&mut self) { + match &*self.field { + "event" => mem::swap(&mut self.event_type, &mut self.value), + "data" => { + self.data.push_str(&self.value); + self.data.push('\n'); + } + "id" => mem::swap(&mut self.last_event_id, &mut self.value), + "retry" => if let Ok(time) = u64::from_str(&self.value) { + self.event_source.root().reconnection_time.set(time); + }, + _ => () + } + + self.field.clear(); + self.value.clear(); + } + + // https://html.spec.whatwg.org/multipage/#dispatchMessage + #[allow(unsafe_code)] + fn dispatch_event(&mut self) { + let event_source = self.event_source.root(); + // Step 1 + *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone()); + // Step 2 + if self.data.is_empty() { + self.data.clear(); + self.event_type.clear(); + return; + } + // Step 3 + if let Some(last) = self.data.pop() { + if last != '\n' { + self.data.push(last); + } + } + // Step 6 + let type_ = if !self.event_type.is_empty() { + Atom::from(self.event_type.clone()) + } else { + atom!("message") + }; + // Steps 4-5 + let event = { + let _ac = JSAutoCompartment::new(event_source.global().get_cx(), + event_source.reflector().get_jsobject().get()); + rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue()); + unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) }; + MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(), + DOMString::from(self.origin.clone()), + event_source.last_event_id.borrow().clone()) + }; + // Step 7 + self.event_type.clear(); + self.data.clear(); + // Step 8 + let runnable = box DispatchEventRunnable { + event_source: self.event_source.clone(), + event: Trusted::new(&event) + }; + let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + } + + // https://html.spec.whatwg.org/multipage/#event-stream-interpretation + fn parse(&mut self, stream: Chars) { + let mut stream = stream.peekable(); + + while let Some(ch) = stream.next() { + match (ch, &self.parser_state) { + (':', &ParserState::Eol) => self.parser_state = ParserState::Comment, + (':', &ParserState::Field) => { + self.parser_state = ParserState::Value; + if let Some(&' ') = stream.peek() { + stream.next(); + } + } + + ('\n', &ParserState::Value) => { + self.parser_state = ParserState::Eol; + self.process_field(); + } + ('\r', &ParserState::Value) => { + if let Some(&'\n') = stream.peek() { + continue; + } + self.parser_state = ParserState::Eol; + self.process_field(); + } + + ('\n', &ParserState::Field) => { + self.parser_state = ParserState::Eol; + self.process_field(); + } + ('\r', &ParserState::Field) => { + if let Some(&'\n') = stream.peek() { + continue; + } + self.parser_state = ParserState::Eol; + self.process_field(); + } + + ('\n', &ParserState::Eol) => self.dispatch_event(), + ('\r', &ParserState::Eol) => { + if let Some(&'\n') = stream.peek() { + continue; + } + self.dispatch_event(); + } + + ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol, + ('\r', &ParserState::Comment) => { + if let Some(&'\n') = stream.peek() { + continue; + } + self.parser_state = ParserState::Eol; + } + + (_, &ParserState::Field) => self.field.push(ch), + (_, &ParserState::Value) => self.value.push(ch), + (_, &ParserState::Eol) => { + self.parser_state = ParserState::Field; + self.field.push(ch); + } + (_, &ParserState::Comment) => (), + } + } + } } impl FetchResponseListener for EventSourceContext { @@ -130,8 +296,10 @@ impl FetchResponseListener for EventSourceContext { match meta.content_type { None => self.fail_the_connection(), Some(ct) => match ct.into_inner().0 { - Mime(TopLevel::Text, SubLevel::EventStream, _) => - self.announce_the_connection(), + Mime(TopLevel::Text, SubLevel::EventStream, _) => { + self.origin = meta.final_url.origin().unicode_serialization(); + self.announce_the_connection(); + } _ => self.fail_the_connection() } } @@ -142,12 +310,14 @@ impl FetchResponseListener for EventSourceContext { } } - fn process_response_chunk(&mut self, mut _chunk: Vec<u8>) { - // TODO + fn process_response_chunk(&mut self, chunk: Vec<u8>) { + let mut stream = String::new(); + UTF_8.raw_decoder().raw_feed(&chunk, &mut stream); + self.parse(stream.chars()) } fn process_response_eof(&mut self, _response: Result<(), NetworkError>) { - // TODO + } } @@ -158,12 +328,13 @@ impl PreInvoke for EventSourceContext { } impl EventSource { - fn new_inherited(with_credentials: bool) -> EventSource { + fn new_inherited(url: Url, with_credentials: bool) -> EventSource { EventSource { eventtarget: EventTarget::new_inherited(), - url: DOMRefCell::new(None), + url: url, request: DOMRefCell::new(None), last_event_id: DOMRefCell::new(DOMString::from("")), + reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME), generation_id: Cell::new(GenerationId(0)), ready_state: Cell::new(ReadyState::Connecting), @@ -171,8 +342,8 @@ impl EventSource { } } - fn new(global: &GlobalScope, with_credentials: bool) -> Root<EventSource> { - reflect_dom_object(box EventSource::new_inherited(with_credentials), + fn new(global: &GlobalScope, url: Url, with_credentials: bool) -> Root<EventSource> { + reflect_dom_object(box EventSource::new_inherited(url, with_credentials), global, Wrap) } @@ -181,15 +352,9 @@ impl EventSource { self.request.borrow().clone().unwrap() } - pub fn last_event_id(&self) -> DOMString { - self.last_event_id.borrow().clone() - } - pub fn Constructor(global: &GlobalScope, url: DOMString, event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> { - // Step 1 - let ev = EventSource::new(global, event_source_init.withCredentials); // TODO: Step 2 relevant settings object // Step 3 let base_url = global.api_base_url(); @@ -198,8 +363,8 @@ impl EventSource { // Step 4 Err(_) => return Err(Error::Syntax) }; - // Step 5 - *ev.url.borrow_mut() = Some(url_record.clone()); + // Step 1, 5 + let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials); // Steps 6-7 let cors_attribute_state = if event_source_init.withCredentials { CorsSettings::UseCredentials @@ -231,7 +396,15 @@ impl EventSource { // Step 14 let context = EventSourceContext { event_source: Trusted::new(&ev), - gen_id: ev.generation_id.get() + gen_id: ev.generation_id.get(), + parser_state: ParserState::Eol, + field: String::new(), + value: String::new(), + origin: String::new(), + + event_type: String::new(), + data: String::new(), + last_event_id: String::new(), }; let listener = NetworkListener { context: Arc::new(Mutex::new(context)), @@ -260,7 +433,7 @@ impl EventSourceMethods for EventSource { // https://html.spec.whatwg.org/multipage/#dom-eventsource-url fn Url(&self) -> DOMString { - DOMString::from(self.url.borrow().clone().map_or("".to_owned(), Url::into_string)) + DOMString::from(self.url.as_str()) } // https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials @@ -341,7 +514,11 @@ impl Runnable for ReestablishConnectionRunnable { pub struct RefetchRequestRunnable { event_source: Trusted<EventSource>, - gen_id: GenerationId + gen_id: GenerationId, + + event_type: String, + data: String, + last_event_id: String, } impl Runnable for RefetchRequestRunnable { @@ -358,13 +535,21 @@ impl Runnable for RefetchRequestRunnable { // Step 5.2 let mut request = event_source.request(); // Step 5.3 - if !event_source.last_event_id().is_empty() { - request.headers.set(LastEventId(String::from(event_source.last_event_id()))); + if !event_source.last_event_id.borrow().is_empty() { + request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone()))); } // Step 5.4 let context = EventSourceContext { event_source: self.event_source.clone(), - gen_id: self.gen_id + gen_id: self.gen_id, + parser_state: ParserState::Eol, + field: String::new(), + value: String::new(), + origin: String::new(), + + event_type: self.event_type.clone(), + data: self.data.clone(), + last_event_id: self.last_event_id.clone() }; let listener = NetworkListener { context: Arc::new(Mutex::new(context)), @@ -378,3 +563,21 @@ impl Runnable for RefetchRequestRunnable { global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap(); } } + +pub struct DispatchEventRunnable { + event_source: Trusted<EventSource>, + event: Trusted<MessageEvent>, +} + +impl Runnable for DispatchEventRunnable { + fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" } + + // https://html.spec.whatwg.org/multipage/#dispatchMessage + fn handler(self: Box<DispatchEventRunnable>) { + let event_source = self.event_source.root(); + // Step 8 + if event_source.ready_state.get() != ReadyState::Closed { + self.event.root().upcast::<Event>().fire(&event_source.upcast()); + } + } +} |