aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/dom/eventsource.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/script/dom/eventsource.rs')
-rw-r--r--components/script/dom/eventsource.rs549
1 files changed, 336 insertions, 213 deletions
diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs
index 7945eec0b7d..49e612cd6eb 100644
--- a/components/script/dom/eventsource.rs
+++ b/components/script/dom/eventsource.rs
@@ -1,81 +1,85 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-use dom::bindings::cell::DOMRefCell;
-use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull;
-use dom::bindings::codegen::Bindings::EventSourceBinding::{EventSourceInit, EventSourceMethods, Wrap};
-use dom::bindings::error::{Error, Fallible};
-use dom::bindings::inheritance::Castable;
-use dom::bindings::js::Root;
-use dom::bindings::refcounted::Trusted;
-use dom::bindings::reflector::{DomObject, reflect_dom_object};
-use dom::bindings::str::DOMString;
-use dom::event::Event;
-use dom::eventtarget::EventTarget;
-use dom::globalscope::GlobalScope;
-use dom::messageevent::MessageEvent;
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use crate::dom::bindings::cell::DomRefCell;
+use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
+ EventSourceInit, EventSourceMethods,
+};
+use crate::dom::bindings::error::{Error, Fallible};
+use crate::dom::bindings::inheritance::Castable;
+use crate::dom::bindings::refcounted::Trusted;
+use crate::dom::bindings::reflector::{reflect_dom_object, DomObject};
+use crate::dom::bindings::root::DomRoot;
+use crate::dom::bindings::str::DOMString;
+use crate::dom::event::Event;
+use crate::dom::eventtarget::EventTarget;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::messageevent::MessageEvent;
+use crate::dom::performanceresourcetiming::InitiatorType;
+use crate::fetch::{create_a_potential_cors_request, FetchCanceller};
+use crate::network_listener::{self, NetworkListener, PreInvoke, ResourceTimingListener};
+use crate::realms::enter_realm;
+use crate::task_source::{TaskSource, TaskSourceName};
+use crate::timers::OneshotTimerCallback;
use dom_struct::dom_struct;
-use encoding::Encoding;
-use encoding::all::UTF_8;
-use euclid::length::Length;
-use hyper::header::{Accept, qitem};
+use euclid::Length;
+use headers::ContentType;
+use http::header::{self, HeaderName, HeaderValue};
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use js::conversions::ToJSValConvertible;
-use js::jsapi::JSAutoCompartment;
use js::jsval::UndefinedValue;
-use mime::{Mime, TopLevel, SubLevel};
-use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
-use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
-use net_traits::request::{RequestInit, RequestMode};
-use network_listener::{NetworkListener, PreInvoke};
-use script_thread::Runnable;
+use mime::{self, Mime};
+use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder};
+use net_traits::{CoreResourceMsg, FetchChannels, FetchMetadata, FilteredMetadata};
+use net_traits::{FetchResponseListener, FetchResponseMsg, NetworkError};
+use net_traits::{ResourceFetchTiming, ResourceTimingType};
use servo_atoms::Atom;
use servo_url::ServoUrl;
use std::cell::Cell;
use std::mem;
use std::str::{Chars, FromStr};
use std::sync::{Arc, Mutex};
-use task_source::TaskSource;
-use timers::OneshotTimerCallback;
-
-header! { (LastEventId, "Last-Event-ID") => [String] }
+use utf8;
const DEFAULT_RECONNECTION_TIME: u64 = 5000;
-#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
+#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
struct GenerationId(u32);
-#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
-/// https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate
+#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
+/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
enum ReadyState {
Connecting = 0,
Open = 1,
- Closed = 2
+ Closed = 2,
}
#[dom_struct]
pub struct EventSource {
eventtarget: EventTarget,
url: ServoUrl,
- request: DOMRefCell<Option<RequestInit>>,
- last_event_id: DOMRefCell<DOMString>,
+ request: DomRefCell<Option<RequestBuilder>>,
+ last_event_id: DomRefCell<DOMString>,
reconnection_time: Cell<u64>,
generation_id: Cell<GenerationId>,
ready_state: Cell<ReadyState>,
with_credentials: bool,
+ canceller: DomRefCell<FetchCanceller>,
}
enum ParserState {
Field,
Comment,
Value,
- Eol
+ Eol,
}
struct EventSourceContext {
+ incomplete_utf8: Option<utf8::Incomplete>,
+
event_source: Trusted<EventSource>,
gen_id: GenerationId,
action_sender: ipc::IpcSender<FetchResponseMsg>,
@@ -88,29 +92,39 @@ struct EventSourceContext {
event_type: String,
data: String,
last_event_id: String,
+
+ resource_timing: ResourceFetchTiming,
}
impl EventSourceContext {
+ /// <https://html.spec.whatwg.org/multipage/#announce-the-connection>
fn announce_the_connection(&self) {
let event_source = self.event_source.root();
if self.gen_id != event_source.generation_id.get() {
return;
}
- let runnable = box AnnounceConnectionRunnable {
- event_source: self.event_source.clone()
- };
- let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ let global = event_source.global();
+ let event_source = self.event_source.clone();
+ // FIXME(nox): Why are errors silenced here?
+ let _ = global.remote_event_task_source().queue(
+ task!(announce_the_event_source_connection: move || {
+ let event_source = event_source.root();
+ if event_source.ready_state.get() != ReadyState::Closed {
+ event_source.ready_state.set(ReadyState::Open);
+ event_source.upcast::<EventTarget>().fire_event(atom!("open"));
+ }
+ }),
+ &global,
+ );
}
+ /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
fn fail_the_connection(&self) {
let event_source = self.event_source.root();
if self.gen_id != event_source.generation_id.get() {
return;
}
- let runnable = box FailConnectionRunnable {
- event_source: self.event_source.clone()
- };
- let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ event_source.fail_the_connection();
}
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
@@ -121,12 +135,43 @@ impl EventSourceContext {
return;
}
- // Step 1
- let runnable = box ReestablishConnectionRunnable {
- event_source: self.event_source.clone(),
- action_sender: self.action_sender.clone()
- };
- let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+ let trusted_event_source = self.event_source.clone();
+ let action_sender = self.action_sender.clone();
+ let global = event_source.global();
+ // FIXME(nox): Why are errors silenced here?
+ let _ = global.remote_event_task_source().queue(
+ task!(reestablish_the_event_source_onnection: move || {
+ let event_source = trusted_event_source.root();
+
+ // Step 1.1.
+ if event_source.ready_state.get() == ReadyState::Closed {
+ return;
+ }
+
+ // Step 1.2.
+ event_source.ready_state.set(ReadyState::Connecting);
+
+ // Step 1.3.
+ event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+
+ // Step 2.
+ let duration = Length::new(event_source.reconnection_time.get());
+
+ // Step 3.
+ // TODO: Optionally wait some more.
+
+ // Steps 4-5.
+ let callback = OneshotTimerCallback::EventSourceTimeout(
+ EventSourceTimeoutCallback {
+ event_source: trusted_event_source,
+ action_sender,
+ }
+ );
+ // FIXME(nox): Why are errors silenced here?
+ let _ = event_source.global().schedule_callback(callback, duration);
+ }),
+ &global,
+ );
}
// https://html.spec.whatwg.org/multipage/#processField
@@ -136,12 +181,14 @@ impl EventSourceContext {
"data" => {
self.data.push_str(&self.value);
self.data.push('\n');
- }
+ },
"id" => mem::swap(&mut self.last_event_id, &mut self.value),
- "retry" => if let Ok(time) = u64::from_str(&self.value) {
- self.event_source.root().reconnection_time.set(time);
+ "retry" => {
+ if let Ok(time) = u64::from_str(&self.value) {
+ self.event_source.root().reconnection_time.set(time);
+ }
},
- _ => ()
+ _ => (),
}
self.field.clear();
@@ -174,23 +221,42 @@ impl EventSourceContext {
};
// Steps 4-5
let event = {
- let _ac = JSAutoCompartment::new(event_source.global().get_cx(),
- event_source.reflector().get_jsobject().get());
- rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue());
- unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) };
- MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(),
- DOMString::from(self.origin.clone()),
- event_source.last_event_id.borrow().clone())
+ let _ac = enter_realm(&*event_source);
+ rooted!(in(*event_source.global().get_cx()) let mut data = UndefinedValue());
+ unsafe {
+ self.data
+ .to_jsval(*event_source.global().get_cx(), data.handle_mut())
+ };
+ MessageEvent::new(
+ &*event_source.global(),
+ type_,
+ false,
+ false,
+ data.handle(),
+ DOMString::from(self.origin.clone()),
+ None,
+ event_source.last_event_id.borrow().clone(),
+ Vec::with_capacity(0),
+ )
};
// Step 7
self.event_type.clear();
self.data.clear();
- // Step 8
- let runnable = box DispatchEventRunnable {
- event_source: self.event_source.clone(),
- event: Trusted::new(&event)
- };
- let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+
+ // Step 8.
+ let global = event_source.global();
+ let event_source = self.event_source.clone();
+ let event = Trusted::new(&*event);
+ // FIXME(nox): Why are errors silenced here?
+ let _ = global.remote_event_task_source().queue(
+ task!(dispatch_the_event_source_event: move || {
+ let event_source = event_source.root();
+ if event_source.ready_state.get() != ReadyState::Closed {
+ event.root().upcast::<Event>().fire(&event_source.upcast());
+ }
+ }),
+ &global,
+ );
}
// https://html.spec.whatwg.org/multipage/#event-stream-interpretation
@@ -205,31 +271,31 @@ impl EventSourceContext {
if let Some(&' ') = stream.peek() {
stream.next();
}
- }
+ },
('\n', &ParserState::Value) => {
self.parser_state = ParserState::Eol;
self.process_field();
- }
+ },
('\r', &ParserState::Value) => {
if let Some(&'\n') = stream.peek() {
continue;
}
self.parser_state = ParserState::Eol;
self.process_field();
- }
+ },
('\n', &ParserState::Field) => {
self.parser_state = ParserState::Eol;
self.process_field();
- }
+ },
('\r', &ParserState::Field) => {
if let Some(&'\n') = stream.peek() {
continue;
}
self.parser_state = ParserState::Eol;
self.process_field();
- }
+ },
('\n', &ParserState::Eol) => self.dispatch_event(),
('\r', &ParserState::Eol) => {
@@ -237,7 +303,7 @@ impl EventSourceContext {
continue;
}
self.dispatch_event();
- }
+ },
('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
('\r', &ParserState::Comment) => {
@@ -245,14 +311,14 @@ impl EventSourceContext {
continue;
}
self.parser_state = ParserState::Eol;
- }
+ },
(_, &ParserState::Field) => self.field.push(ch),
(_, &ParserState::Value) => self.value.push(ch),
(_, &ParserState::Eol) => {
self.parser_state = ParserState::Field;
self.field.push(ch);
- }
+ },
(_, &ParserState::Comment) => (),
}
}
@@ -273,34 +339,102 @@ impl FetchResponseListener for EventSourceContext {
Ok(fm) => {
let meta = match fm {
FetchMetadata::Unfiltered(m) => m,
- FetchMetadata::Filtered { unsafe_, .. } => unsafe_
+ FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
+ FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
+ return self.fail_the_connection()
+ },
+ _ => unsafe_,
+ },
};
- match meta.content_type {
- None => self.fail_the_connection(),
- Some(ct) => match ct.into_inner().0 {
- Mime(TopLevel::Text, SubLevel::EventStream, _) => {
- self.origin = meta.final_url.origin().unicode_serialization();
- self.announce_the_connection();
- }
- _ => self.fail_the_connection()
- }
+ let mime = match meta.content_type {
+ None => return self.fail_the_connection(),
+ Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
+ };
+ if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
+ return self.fail_the_connection();
}
- }
+ self.origin = meta.final_url.origin().ascii_serialization();
+ self.announce_the_connection();
+ },
Err(_) => {
- self.reestablish_the_connection();
- }
+ // The spec advises failing here if reconnecting would be
+ // "futile", with no more specific advice; WPT tests
+ // consider a non-http(s) scheme to be futile.
+ match self.event_source.root().url.scheme() {
+ "http" | "https" => self.reestablish_the_connection(),
+ _ => self.fail_the_connection(),
+ }
+ },
}
}
fn process_response_chunk(&mut self, chunk: Vec<u8>) {
- let mut stream = String::new();
- UTF_8.raw_decoder().raw_feed(&chunk, &mut stream);
- self.parse(stream.chars())
+ let mut input = &*chunk;
+ if let Some(mut incomplete) = self.incomplete_utf8.take() {
+ match incomplete.try_complete(input) {
+ None => return,
+ Some((result, remaining_input)) => {
+ self.parse(result.unwrap_or("\u{FFFD}").chars());
+ input = remaining_input;
+ },
+ }
+ }
+
+ while !input.is_empty() {
+ match utf8::decode(&input) {
+ Ok(s) => {
+ self.parse(s.chars());
+ return;
+ },
+ Err(utf8::DecodeError::Invalid {
+ valid_prefix,
+ remaining_input,
+ ..
+ }) => {
+ self.parse(valid_prefix.chars());
+ self.parse("\u{FFFD}".chars());
+ input = remaining_input;
+ },
+ Err(utf8::DecodeError::Incomplete {
+ valid_prefix,
+ incomplete_suffix,
+ }) => {
+ self.parse(valid_prefix.chars());
+ self.incomplete_utf8 = Some(incomplete_suffix);
+ return;
+ },
+ }
+ }
}
- fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
+ fn process_response_eof(&mut self, _response: Result<ResourceFetchTiming, NetworkError>) {
+ if let Some(_) = self.incomplete_utf8.take() {
+ self.parse("\u{FFFD}".chars());
+ }
self.reestablish_the_connection();
}
+
+ fn resource_timing_mut(&mut self) -> &mut ResourceFetchTiming {
+ &mut self.resource_timing
+ }
+
+ fn resource_timing(&self) -> &ResourceFetchTiming {
+ &self.resource_timing
+ }
+
+ fn submit_resource_timing(&mut self) {
+ network_listener::submit_timing(self)
+ }
+}
+
+impl ResourceTimingListener for EventSourceContext {
+ fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
+ (InitiatorType::Other, self.event_source.root().url().clone())
+ }
+
+ fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
+ self.event_source.root().global()
+ }
}
impl PreInvoke for EventSourceContext {
@@ -314,39 +448,77 @@ impl EventSource {
EventSource {
eventtarget: EventTarget::new_inherited(),
url: url,
- request: DOMRefCell::new(None),
- last_event_id: DOMRefCell::new(DOMString::from("")),
+ request: DomRefCell::new(None),
+ last_event_id: DomRefCell::new(DOMString::from("")),
reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
generation_id: Cell::new(GenerationId(0)),
ready_state: Cell::new(ReadyState::Connecting),
with_credentials: with_credentials,
+ canceller: DomRefCell::new(Default::default()),
}
}
- fn new(global: &GlobalScope, url: ServoUrl, with_credentials: bool) -> Root<EventSource> {
- reflect_dom_object(box EventSource::new_inherited(url, with_credentials),
- global,
- Wrap)
+ fn new(global: &GlobalScope, url: ServoUrl, with_credentials: bool) -> DomRoot<EventSource> {
+ reflect_dom_object(
+ Box::new(EventSource::new_inherited(url, with_credentials)),
+ global,
+ )
}
- pub fn request(&self) -> RequestInit {
+ // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3
+ pub fn cancel(&self) {
+ self.canceller.borrow_mut().cancel();
+ self.fail_the_connection();
+ }
+
+ /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
+ pub fn fail_the_connection(&self) {
+ let global = self.global();
+ let event_source = Trusted::new(self);
+ // FIXME(nox): Why are errors silenced here?
+ let _ = global.remote_event_task_source().queue(
+ task!(fail_the_event_source_connection: move || {
+ let event_source = event_source.root();
+ if event_source.ready_state.get() != ReadyState::Closed {
+ event_source.ready_state.set(ReadyState::Closed);
+ event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+ }
+ }),
+ &global,
+ );
+ }
+
+ pub fn request(&self) -> RequestBuilder {
self.request.borrow().clone().unwrap()
}
- pub fn Constructor(global: &GlobalScope,
- url: DOMString,
- event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
+ pub fn url(&self) -> &ServoUrl {
+ &self.url
+ }
+
+ // https://html.spec.whatwg.org/multipage/#dom-eventsource
+ #[allow(non_snake_case)]
+ pub fn Constructor(
+ global: &GlobalScope,
+ url: DOMString,
+ event_source_init: &EventSourceInit,
+ ) -> Fallible<DomRoot<EventSource>> {
// TODO: Step 2 relevant settings object
// Step 3
let base_url = global.api_base_url();
let url_record = match base_url.join(&*url) {
Ok(u) => u,
// Step 4
- Err(_) => return Err(Error::Syntax)
+ Err(_) => return Err(Error::Syntax),
};
// Step 1, 5
- let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials);
+ let ev = EventSource::new(
+ global,
+ url_record.clone(),
+ event_source_init.withCredentials,
+ );
+ global.track_event_source(&ev);
// Steps 6-7
let cors_attribute_state = if event_source_init.withCredentials {
CorsSettings::UseCredentials
@@ -355,22 +527,22 @@ impl EventSource {
};
// Step 8
// TODO: Step 9 set request's client settings
- let mut request = RequestInit {
- url: url_record,
- origin: global.get_url(),
- pipeline_id: Some(global.pipeline_id()),
- // https://html.spec.whatwg.org/multipage/#create-a-potential-cors-request
- use_url_credentials: true,
- mode: RequestMode::CorsMode,
- credentials_mode: if cors_attribute_state == CorsSettings::Anonymous {
- CredentialsMode::CredentialsSameOrigin
- } else {
- CredentialsMode::Include
- },
- ..RequestInit::default()
- };
+ let mut request = create_a_potential_cors_request(
+ url_record,
+ Destination::None,
+ Some(cors_attribute_state),
+ Some(true),
+ global.get_referrer(),
+ )
+ .origin(global.origin().immutable().clone())
+ .pipeline_id(Some(global.pipeline_id()));
+
// Step 10
- request.headers.set(Accept(vec![qitem(mime!(Text / EventStream))]));
+ // TODO(eijebong): Replace once typed headers allow it
+ request.headers.insert(
+ header::ACCEPT,
+ HeaderValue::from_static("text/event-stream"),
+ );
// Step 11
request.cache_mode = CacheMode::NoStore;
// Step 12
@@ -378,6 +550,8 @@ impl EventSource {
// Step 14
let (action_sender, action_receiver) = ipc::channel().unwrap();
let context = EventSourceContext {
+ incomplete_utf8: None,
+
event_source: Trusted::new(&ev),
gen_id: ev.generation_id.get(),
action_sender: action_sender.clone(),
@@ -390,21 +564,41 @@ impl EventSource {
event_type: String::new(),
data: String::new(),
last_event_id: String::new(),
+ resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
};
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
task_source: global.networking_task_source(),
- wrapper: Some(global.get_runnable_wrapper())
+ canceller: Some(global.task_canceller(TaskSourceName::Networking)),
};
- ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
- listener.notify_fetch(message.to().unwrap());
- });
- global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
+ ROUTER.add_route(
+ action_receiver.to_opaque(),
+ Box::new(move |message| {
+ listener.notify_fetch(message.to().unwrap());
+ }),
+ );
+ let cancel_receiver = ev.canceller.borrow_mut().initialize();
+ global
+ .core_resource_thread()
+ .send(CoreResourceMsg::Fetch(
+ request,
+ FetchChannels::ResponseMsg(action_sender, Some(cancel_receiver)),
+ ))
+ .unwrap();
// Step 13
Ok(ev)
}
}
+// https://html.spec.whatwg.org/multipage/#garbage-collection-2
+impl Drop for EventSource {
+ fn drop(&mut self) {
+ // If an EventSource object is garbage collected while its connection is still open,
+ // the user agent must abort any instance of the fetch algorithm opened by this EventSource.
+ self.canceller.borrow_mut().cancel();
+ }
+}
+
impl EventSourceMethods for EventSource {
// https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
event_handler!(open, GetOnopen, SetOnopen);
@@ -434,80 +628,16 @@ impl EventSourceMethods for EventSource {
fn Close(&self) {
let GenerationId(prev_id) = self.generation_id.get();
self.generation_id.set(GenerationId(prev_id + 1));
+ self.canceller.borrow_mut().cancel();
self.ready_state.set(ReadyState::Closed);
}
}
-pub struct AnnounceConnectionRunnable {
- event_source: Trusted<EventSource>,
-}
-
-impl Runnable for AnnounceConnectionRunnable {
- fn name(&self) -> &'static str { "EventSource AnnounceConnectionRunnable" }
-
- // https://html.spec.whatwg.org/multipage/#announce-the-connection
- fn handler(self: Box<AnnounceConnectionRunnable>) {
- let event_source = self.event_source.root();
- if event_source.ready_state.get() != ReadyState::Closed {
- event_source.ready_state.set(ReadyState::Open);
- event_source.upcast::<EventTarget>().fire_event(atom!("open"));
- }
- }
-}
-
-pub struct FailConnectionRunnable {
- event_source: Trusted<EventSource>,
-}
-
-impl Runnable for FailConnectionRunnable {
- fn name(&self) -> &'static str { "EventSource FailConnectionRunnable" }
-
- // https://html.spec.whatwg.org/multipage/#fail-the-connection
- fn handler(self: Box<FailConnectionRunnable>) {
- let event_source = self.event_source.root();
- if event_source.ready_state.get() != ReadyState::Closed {
- event_source.ready_state.set(ReadyState::Closed);
- event_source.upcast::<EventTarget>().fire_event(atom!("error"));
- }
- }
-}
-
-pub struct ReestablishConnectionRunnable {
- event_source: Trusted<EventSource>,
- action_sender: ipc::IpcSender<FetchResponseMsg>,
-}
-
-impl Runnable for ReestablishConnectionRunnable {
- fn name(&self) -> &'static str { "EventSource ReestablishConnectionRunnable" }
-
- // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
- fn handler(self: Box<ReestablishConnectionRunnable>) {
- let event_source = self.event_source.root();
- // Step 1.1
- if event_source.ready_state.get() == ReadyState::Closed {
- return;
- }
- // Step 1.2
- event_source.ready_state.set(ReadyState::Connecting);
- // Step 1.3
- event_source.upcast::<EventTarget>().fire_event(atom!("error"));
- // Step 2
- let duration = Length::new(event_source.reconnection_time.get());
- // TODO Step 3: Optionally wait some more
- // Steps 4-5
- let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback {
- event_source: self.event_source.clone(),
- action_sender: self.action_sender.clone()
- });
- let _ = event_source.global().schedule_callback(callback, duration);
- }
-}
-
-#[derive(JSTraceable, HeapSizeOf)]
+#[derive(JSTraceable, MallocSizeOf)]
pub struct EventSourceTimeoutCallback {
- #[ignore_heap_size_of = "Because it is non-owning"]
+ #[ignore_malloc_size_of = "Because it is non-owning"]
event_source: Trusted<EventSource>,
- #[ignore_heap_size_of = "Because it is non-owning"]
+ #[ignore_malloc_size_of = "Because it is non-owning"]
action_sender: ipc::IpcSender<FetchResponseMsg>,
}
@@ -524,27 +654,20 @@ impl EventSourceTimeoutCallback {
let mut request = event_source.request();
// Step 5.3
if !event_source.last_event_id.borrow().is_empty() {
- request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
+ //TODO(eijebong): Change this once typed header support custom values
+ request.headers.insert(
+ HeaderName::from_static("last-event-id"),
+ HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
+ .unwrap(),
+ );
}
// Step 5.4
- global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
- }
-}
-
-pub struct DispatchEventRunnable {
- event_source: Trusted<EventSource>,
- event: Trusted<MessageEvent>,
-}
-
-impl Runnable for DispatchEventRunnable {
- fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" }
-
- // https://html.spec.whatwg.org/multipage/#dispatchMessage
- fn handler(self: Box<DispatchEventRunnable>) {
- let event_source = self.event_source.root();
- // Step 8
- if event_source.ready_state.get() != ReadyState::Closed {
- self.event.root().upcast::<Event>().fire(&event_source.upcast());
- }
+ global
+ .core_resource_thread()
+ .send(CoreResourceMsg::Fetch(
+ request,
+ FetchChannels::ResponseMsg(self.action_sender, None),
+ ))
+ .unwrap();
}
}