aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/dom/eventsource.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/script/dom/eventsource.rs')
-rw-r--r--components/script/dom/eventsource.rs486
1 files changed, 465 insertions, 21 deletions
diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs
index f05e73beafa..2eabfcbc8b0 100644
--- a/components/script/dom/eventsource.rs
+++ b/components/script/dom/eventsource.rs
@@ -6,18 +6,50 @@ use dom::bindings::cell::DOMRefCell;
use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull;
use dom::bindings::codegen::Bindings::EventSourceBinding::{EventSourceInit, EventSourceMethods, Wrap};
use dom::bindings::error::{Error, Fallible};
+use dom::bindings::inheritance::Castable;
use dom::bindings::js::Root;
-use dom::bindings::reflector::reflect_dom_object;
+use dom::bindings::refcounted::Trusted;
+use dom::bindings::reflector::{Reflectable, reflect_dom_object};
use dom::bindings::str::DOMString;
+use dom::event::Event;
use dom::eventtarget::EventTarget;
use dom::globalscope::GlobalScope;
+use dom::messageevent::MessageEvent;
+use encoding::Encoding;
+use encoding::all::UTF_8;
+use euclid::length::Length;
+use hyper::header::{Accept, qitem};
+use ipc_channel::ipc;
+use ipc_channel::router::ROUTER;
+use js::conversions::ToJSValConvertible;
+use js::jsapi::JSAutoCompartment;
+use js::jsval::UndefinedValue;
+use mime::{Mime, TopLevel, SubLevel};
+use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
+use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
+use net_traits::request::{RequestInit, RequestMode};
+use network_listener::{NetworkListener, PreInvoke};
+use script_thread::Runnable;
+use servo_atoms::Atom;
use std::cell::Cell;
+use std::mem;
+use std::str::{Chars, FromStr};
+use std::sync::{Arc, Mutex};
+use task_source::TaskSource;
+use timers::OneshotTimerCallback;
use url::Url;
+header! { (LastEventId, "Last-Event-ID") => [String] }
+
+const DEFAULT_RECONNECTION_TIME: u64 = 5000;
+
+#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
+struct GenerationId(u32);
+
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
-enum EventSourceReadyState {
+/// https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate
+enum ReadyState {
Connecting = 0,
- #[allow(dead_code)]
Open = 1,
Closed = 2
}
@@ -26,9 +58,254 @@ enum EventSourceReadyState {
pub struct EventSource {
eventtarget: EventTarget,
url: Url,
- ready_state: Cell<EventSourceReadyState>,
+ request: DOMRefCell<Option<RequestInit>>,
+ last_event_id: DOMRefCell<DOMString>,
+ reconnection_time: Cell<u64>,
+ generation_id: Cell<GenerationId>,
+
+ ready_state: Cell<ReadyState>,
with_credentials: bool,
- last_event_id: DOMRefCell<DOMString>
+}
+
+enum ParserState {
+ Field,
+ Comment,
+ Value,
+ Eol
+}
+
+struct EventSourceContext {
+ event_source: Trusted<EventSource>,
+ gen_id: GenerationId,
+ action_sender: ipc::IpcSender<FetchResponseMsg>,
+
+ parser_state: ParserState,
+ field: String,
+ value: String,
+ origin: String,
+
+ event_type: String,
+ data: String,
+ last_event_id: String,
+}
+
+impl EventSourceContext {
+ fn announce_the_connection(&self) {
+ let event_source = self.event_source.root();
+ if self.gen_id != event_source.generation_id.get() {
+ return;
+ }
+ let runnable = box AnnounceConnectionRunnable {
+ event_source: self.event_source.clone()
+ };
+ let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ }
+
+ fn fail_the_connection(&self) {
+ let event_source = self.event_source.root();
+ if self.gen_id != event_source.generation_id.get() {
+ return;
+ }
+ let runnable = box FailConnectionRunnable {
+ event_source: self.event_source.clone()
+ };
+ let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ }
+
+ // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+ fn reestablish_the_connection(&self) {
+ let event_source = self.event_source.root();
+
+ if self.gen_id != event_source.generation_id.get() {
+ return;
+ }
+
+ // Step 1
+ let runnable = box ReestablishConnectionRunnable {
+ event_source: self.event_source.clone(),
+ action_sender: self.action_sender.clone()
+ };
+ let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ }
+
+ // https://html.spec.whatwg.org/multipage/#processField
+ fn process_field(&mut self) {
+ match &*self.field {
+ "event" => mem::swap(&mut self.event_type, &mut self.value),
+ "data" => {
+ self.data.push_str(&self.value);
+ self.data.push('\n');
+ }
+ "id" => mem::swap(&mut self.last_event_id, &mut self.value),
+ "retry" => if let Ok(time) = u64::from_str(&self.value) {
+ self.event_source.root().reconnection_time.set(time);
+ },
+ _ => ()
+ }
+
+ self.field.clear();
+ self.value.clear();
+ }
+
+ // https://html.spec.whatwg.org/multipage/#dispatchMessage
+ #[allow(unsafe_code)]
+ fn dispatch_event(&mut self) {
+ let event_source = self.event_source.root();
+ // Step 1
+ *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
+ // Step 2
+ if self.data.is_empty() {
+ self.data.clear();
+ self.event_type.clear();
+ return;
+ }
+ // Step 3
+ if let Some(last) = self.data.pop() {
+ if last != '\n' {
+ self.data.push(last);
+ }
+ }
+ // Step 6
+ let type_ = if !self.event_type.is_empty() {
+ Atom::from(self.event_type.clone())
+ } else {
+ atom!("message")
+ };
+ // Steps 4-5
+ let event = {
+ let _ac = JSAutoCompartment::new(event_source.global().get_cx(),
+ event_source.reflector().get_jsobject().get());
+ rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue());
+ unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) };
+ MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(),
+ DOMString::from(self.origin.clone()),
+ event_source.last_event_id.borrow().clone())
+ };
+ // Step 7
+ self.event_type.clear();
+ self.data.clear();
+ // Step 8
+ let runnable = box DispatchEventRunnable {
+ event_source: self.event_source.clone(),
+ event: Trusted::new(&event)
+ };
+ let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ }
+
+ // https://html.spec.whatwg.org/multipage/#event-stream-interpretation
+ fn parse(&mut self, stream: Chars) {
+ let mut stream = stream.peekable();
+
+ while let Some(ch) = stream.next() {
+ match (ch, &self.parser_state) {
+ (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
+ (':', &ParserState::Field) => {
+ self.parser_state = ParserState::Value;
+ if let Some(&' ') = stream.peek() {
+ stream.next();
+ }
+ }
+
+ ('\n', &ParserState::Value) => {
+ self.parser_state = ParserState::Eol;
+ self.process_field();
+ }
+ ('\r', &ParserState::Value) => {
+ if let Some(&'\n') = stream.peek() {
+ continue;
+ }
+ self.parser_state = ParserState::Eol;
+ self.process_field();
+ }
+
+ ('\n', &ParserState::Field) => {
+ self.parser_state = ParserState::Eol;
+ self.process_field();
+ }
+ ('\r', &ParserState::Field) => {
+ if let Some(&'\n') = stream.peek() {
+ continue;
+ }
+ self.parser_state = ParserState::Eol;
+ self.process_field();
+ }
+
+ ('\n', &ParserState::Eol) => self.dispatch_event(),
+ ('\r', &ParserState::Eol) => {
+ if let Some(&'\n') = stream.peek() {
+ continue;
+ }
+ self.dispatch_event();
+ }
+
+ ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
+ ('\r', &ParserState::Comment) => {
+ if let Some(&'\n') = stream.peek() {
+ continue;
+ }
+ self.parser_state = ParserState::Eol;
+ }
+
+ (_, &ParserState::Field) => self.field.push(ch),
+ (_, &ParserState::Value) => self.value.push(ch),
+ (_, &ParserState::Eol) => {
+ self.parser_state = ParserState::Field;
+ self.field.push(ch);
+ }
+ (_, &ParserState::Comment) => (),
+ }
+ }
+ }
+}
+
+impl FetchResponseListener for EventSourceContext {
+ fn process_request_body(&mut self) {
+ // TODO
+ }
+
+ fn process_request_eof(&mut self) {
+ // TODO
+ }
+
+ fn process_response(&mut self, metadata: Result<FetchMetadata, NetworkError>) {
+ match metadata {
+ Ok(fm) => {
+ let meta = match fm {
+ FetchMetadata::Unfiltered(m) => m,
+ FetchMetadata::Filtered { unsafe_, .. } => unsafe_
+ };
+ match meta.content_type {
+ None => self.fail_the_connection(),
+ Some(ct) => match ct.into_inner().0 {
+ Mime(TopLevel::Text, SubLevel::EventStream, _) => {
+ self.origin = meta.final_url.origin().unicode_serialization();
+ self.announce_the_connection();
+ }
+ _ => self.fail_the_connection()
+ }
+ }
+ }
+ Err(_) => {
+ self.reestablish_the_connection();
+ }
+ }
+ }
+
+ fn process_response_chunk(&mut self, chunk: Vec<u8>) {
+ let mut stream = String::new();
+ UTF_8.raw_decoder().raw_feed(&chunk, &mut stream);
+ self.parse(stream.chars())
+ }
+
+ fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
+ self.reestablish_the_connection();
+ }
+}
+
+impl PreInvoke for EventSourceContext {
+ fn should_invoke(&self) -> bool {
+ self.event_source.root().generation_id.get() == self.gen_id
+ }
}
impl EventSource {
@@ -36,9 +313,13 @@ impl EventSource {
EventSource {
eventtarget: EventTarget::new_inherited(),
url: url,
- ready_state: Cell::new(EventSourceReadyState::Connecting),
+ request: DOMRefCell::new(None),
+ last_event_id: DOMRefCell::new(DOMString::from("")),
+ reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
+ generation_id: Cell::new(GenerationId(0)),
+
+ ready_state: Cell::new(ReadyState::Connecting),
with_credentials: with_credentials,
- last_event_id: DOMRefCell::new(DOMString::from(""))
}
}
@@ -48,27 +329,78 @@ impl EventSource {
Wrap)
}
+ pub fn request(&self) -> RequestInit {
+ self.request.borrow().clone().unwrap()
+ }
+
pub fn Constructor(global: &GlobalScope,
- url_str: DOMString,
+ url: DOMString,
event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
- // Steps 1-2
- let base_url = global.get_url();
- let url = match base_url.join(&*url_str) {
+ // TODO: Step 2 relevant settings object
+ // Step 3
+ let base_url = global.api_base_url();
+ let url_record = match base_url.join(&*url) {
Ok(u) => u,
+ // Step 4
Err(_) => return Err(Error::Syntax)
};
- // Step 3
- let event_source = EventSource::new(global, url, event_source_init.withCredentials);
- // Step 4
- // Step 5
- // Step 6
- // Step 7
+ // Step 1, 5
+ let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials);
+ // Steps 6-7
+ let cors_attribute_state = if event_source_init.withCredentials {
+ CorsSettings::UseCredentials
+ } else {
+ CorsSettings::Anonymous
+ };
// Step 8
- // Step 9
+ // TODO: Step 9 set request's client settings
+ let mut request = RequestInit {
+ url: url_record,
+ origin: global.get_url(),
+ pipeline_id: Some(global.pipeline_id()),
+ // https://html.spec.whatwg.org/multipage/#create-a-potential-cors-request
+ use_url_credentials: true,
+ mode: RequestMode::CorsMode,
+ credentials_mode: if cors_attribute_state == CorsSettings::Anonymous {
+ CredentialsMode::CredentialsSameOrigin
+ } else {
+ CredentialsMode::Include
+ },
+ ..RequestInit::default()
+ };
// Step 10
+ request.headers.set(Accept(vec![qitem(mime!(Text / EventStream))]));
// Step 11
- Ok(event_source)
+ request.cache_mode = CacheMode::NoStore;
// Step 12
+ *ev.request.borrow_mut() = Some(request.clone());
+ // Step 14
+ let (action_sender, action_receiver) = ipc::channel().unwrap();
+ let context = EventSourceContext {
+ event_source: Trusted::new(&ev),
+ gen_id: ev.generation_id.get(),
+ action_sender: action_sender.clone(),
+
+ parser_state: ParserState::Eol,
+ field: String::new(),
+ value: String::new(),
+ origin: String::new(),
+
+ event_type: String::new(),
+ data: String::new(),
+ last_event_id: String::new(),
+ };
+ let listener = NetworkListener {
+ context: Arc::new(Mutex::new(context)),
+ task_source: global.networking_task_source(),
+ wrapper: Some(global.get_runnable_wrapper())
+ };
+ ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
+ listener.notify_fetch(message.to().unwrap());
+ });
+ global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
+ // Step 13
+ Ok(ev)
}
}
@@ -99,7 +431,119 @@ impl EventSourceMethods for EventSource {
// https://html.spec.whatwg.org/multipage/#dom-eventsource-close
fn Close(&self) {
- self.ready_state.set(EventSourceReadyState::Closed);
- // TODO: Terminate ongoing fetch
+ let GenerationId(prev_id) = self.generation_id.get();
+ self.generation_id.set(GenerationId(prev_id + 1));
+ self.ready_state.set(ReadyState::Closed);
+ }
+}
+
+pub struct AnnounceConnectionRunnable {
+ event_source: Trusted<EventSource>,
+}
+
+impl Runnable for AnnounceConnectionRunnable {
+ fn name(&self) -> &'static str { "EventSource AnnounceConnectionRunnable" }
+
+ // https://html.spec.whatwg.org/multipage/#announce-the-connection
+ fn handler(self: Box<AnnounceConnectionRunnable>) {
+ let event_source = self.event_source.root();
+ if event_source.ready_state.get() != ReadyState::Closed {
+ event_source.ready_state.set(ReadyState::Open);
+ event_source.upcast::<EventTarget>().fire_event(atom!("open"));
+ }
+ }
+}
+
+pub struct FailConnectionRunnable {
+ event_source: Trusted<EventSource>,
+}
+
+impl Runnable for FailConnectionRunnable {
+ fn name(&self) -> &'static str { "EventSource FailConnectionRunnable" }
+
+ // https://html.spec.whatwg.org/multipage/#fail-the-connection
+ fn handler(self: Box<FailConnectionRunnable>) {
+ let event_source = self.event_source.root();
+ if event_source.ready_state.get() != ReadyState::Closed {
+ event_source.ready_state.set(ReadyState::Closed);
+ event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+ }
+ }
+}
+
+pub struct ReestablishConnectionRunnable {
+ event_source: Trusted<EventSource>,
+ action_sender: ipc::IpcSender<FetchResponseMsg>,
+}
+
+impl Runnable for ReestablishConnectionRunnable {
+ fn name(&self) -> &'static str { "EventSource ReestablishConnectionRunnable" }
+
+ // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+ fn handler(self: Box<ReestablishConnectionRunnable>) {
+ let event_source = self.event_source.root();
+ // Step 1.1
+ if event_source.ready_state.get() == ReadyState::Closed {
+ return;
+ }
+ // Step 1.2
+ event_source.ready_state.set(ReadyState::Connecting);
+ // Step 1.3
+ event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+ // Step 2
+ let duration = Length::new(event_source.reconnection_time.get());
+ // TODO Step 3: Optionally wait some more
+ // Steps 4-5
+ let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback {
+ event_source: self.event_source.clone(),
+ action_sender: self.action_sender.clone()
+ });
+ let _ = event_source.global().schedule_callback(callback, duration);
+ }
+}
+
+#[derive(JSTraceable, HeapSizeOf)]
+pub struct EventSourceTimeoutCallback {
+ #[ignore_heap_size_of = "Because it is non-owning"]
+ event_source: Trusted<EventSource>,
+ #[ignore_heap_size_of = "Because it is non-owning"]
+ action_sender: ipc::IpcSender<FetchResponseMsg>,
+}
+
+impl EventSourceTimeoutCallback {
+ // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+ pub fn invoke(self) {
+ let event_source = self.event_source.root();
+ let global = event_source.global();
+ // Step 5.1
+ if event_source.ready_state.get() != ReadyState::Connecting {
+ return;
+ }
+ // Step 5.2
+ let mut request = event_source.request();
+ // Step 5.3
+ if !event_source.last_event_id.borrow().is_empty() {
+ request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
+ }
+ // Step 5.4
+ global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
+ }
+}
+
+pub struct DispatchEventRunnable {
+ event_source: Trusted<EventSource>,
+ event: Trusted<MessageEvent>,
+}
+
+impl Runnable for DispatchEventRunnable {
+ fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" }
+
+ // https://html.spec.whatwg.org/multipage/#dispatchMessage
+ fn handler(self: Box<DispatchEventRunnable>) {
+ let event_source = self.event_source.root();
+ // Step 8
+ if event_source.ready_state.get() != ReadyState::Closed {
+ self.event.root().upcast::<Event>().fire(&event_source.upcast());
+ }
}
}