diff options
Diffstat (limited to 'components/script/dom/eventsource.rs')
-rw-r--r-- | components/script/dom/eventsource.rs | 549 |
1 files changed, 336 insertions, 213 deletions
diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs index 7945eec0b7d..49e612cd6eb 100644 --- a/components/script/dom/eventsource.rs +++ b/components/script/dom/eventsource.rs @@ -1,81 +1,85 @@ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use dom::bindings::cell::DOMRefCell; -use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull; -use dom::bindings::codegen::Bindings::EventSourceBinding::{EventSourceInit, EventSourceMethods, Wrap}; -use dom::bindings::error::{Error, Fallible}; -use dom::bindings::inheritance::Castable; -use dom::bindings::js::Root; -use dom::bindings::refcounted::Trusted; -use dom::bindings::reflector::{DomObject, reflect_dom_object}; -use dom::bindings::str::DOMString; -use dom::event::Event; -use dom::eventtarget::EventTarget; -use dom::globalscope::GlobalScope; -use dom::messageevent::MessageEvent; + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use crate::dom::bindings::cell::DomRefCell; +use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{ + EventSourceInit, EventSourceMethods, +}; +use crate::dom::bindings::error::{Error, Fallible}; +use crate::dom::bindings::inheritance::Castable; +use crate::dom::bindings::refcounted::Trusted; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject}; +use crate::dom::bindings::root::DomRoot; +use crate::dom::bindings::str::DOMString; +use crate::dom::event::Event; +use crate::dom::eventtarget::EventTarget; +use crate::dom::globalscope::GlobalScope; +use crate::dom::messageevent::MessageEvent; +use crate::dom::performanceresourcetiming::InitiatorType; +use crate::fetch::{create_a_potential_cors_request, FetchCanceller}; +use crate::network_listener::{self, NetworkListener, PreInvoke, ResourceTimingListener}; +use crate::realms::enter_realm; +use crate::task_source::{TaskSource, TaskSourceName}; +use crate::timers::OneshotTimerCallback; use dom_struct::dom_struct; -use encoding::Encoding; -use encoding::all::UTF_8; -use euclid::length::Length; -use hyper::header::{Accept, qitem}; +use euclid::Length; +use headers::ContentType; +use http::header::{self, HeaderName, HeaderValue}; use ipc_channel::ipc; use ipc_channel::router::ROUTER; use js::conversions::ToJSValConvertible; -use js::jsapi::JSAutoCompartment; use js::jsval::UndefinedValue; -use mime::{Mime, TopLevel, SubLevel}; -use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError}; -use net_traits::request::{CacheMode, CorsSettings, CredentialsMode}; -use net_traits::request::{RequestInit, RequestMode}; -use network_listener::{NetworkListener, PreInvoke}; -use script_thread::Runnable; +use mime::{self, Mime}; +use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder}; +use net_traits::{CoreResourceMsg, FetchChannels, FetchMetadata, FilteredMetadata}; +use net_traits::{FetchResponseListener, FetchResponseMsg, NetworkError}; +use net_traits::{ResourceFetchTiming, ResourceTimingType}; use servo_atoms::Atom; use servo_url::ServoUrl; use std::cell::Cell; use std::mem; use std::str::{Chars, FromStr}; use std::sync::{Arc, Mutex}; -use task_source::TaskSource; -use timers::OneshotTimerCallback; - -header! { (LastEventId, "Last-Event-ID") => [String] } +use utf8; const DEFAULT_RECONNECTION_TIME: u64 = 5000; -#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] +#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)] struct GenerationId(u32); -#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] -/// https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate +#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)] +/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate> enum ReadyState { Connecting = 0, Open = 1, - Closed = 2 + Closed = 2, } #[dom_struct] pub struct EventSource { eventtarget: EventTarget, url: ServoUrl, - request: DOMRefCell<Option<RequestInit>>, - last_event_id: DOMRefCell<DOMString>, + request: DomRefCell<Option<RequestBuilder>>, + last_event_id: DomRefCell<DOMString>, reconnection_time: Cell<u64>, generation_id: Cell<GenerationId>, ready_state: Cell<ReadyState>, with_credentials: bool, + canceller: DomRefCell<FetchCanceller>, } enum ParserState { Field, Comment, Value, - Eol + Eol, } struct EventSourceContext { + incomplete_utf8: Option<utf8::Incomplete>, + event_source: Trusted<EventSource>, gen_id: GenerationId, action_sender: ipc::IpcSender<FetchResponseMsg>, @@ -88,29 +92,39 @@ struct EventSourceContext { event_type: String, data: String, last_event_id: String, + + resource_timing: ResourceFetchTiming, } impl EventSourceContext { + /// <https://html.spec.whatwg.org/multipage/#announce-the-connection> fn announce_the_connection(&self) { let event_source = self.event_source.root(); if self.gen_id != event_source.generation_id.get() { return; } - let runnable = box AnnounceConnectionRunnable { - event_source: self.event_source.clone() - }; - let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + let global = event_source.global(); + let event_source = self.event_source.clone(); + // FIXME(nox): Why are errors silenced here? + let _ = global.remote_event_task_source().queue( + task!(announce_the_event_source_connection: move || { + let event_source = event_source.root(); + if event_source.ready_state.get() != ReadyState::Closed { + event_source.ready_state.set(ReadyState::Open); + event_source.upcast::<EventTarget>().fire_event(atom!("open")); + } + }), + &global, + ); } + /// <https://html.spec.whatwg.org/multipage/#fail-the-connection> fn fail_the_connection(&self) { let event_source = self.event_source.root(); if self.gen_id != event_source.generation_id.get() { return; } - let runnable = box FailConnectionRunnable { - event_source: self.event_source.clone() - }; - let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + event_source.fail_the_connection(); } // https://html.spec.whatwg.org/multipage/#reestablish-the-connection @@ -121,12 +135,43 @@ impl EventSourceContext { return; } - // Step 1 - let runnable = box ReestablishConnectionRunnable { - event_source: self.event_source.clone(), - action_sender: self.action_sender.clone() - }; - let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + let trusted_event_source = self.event_source.clone(); + let action_sender = self.action_sender.clone(); + let global = event_source.global(); + // FIXME(nox): Why are errors silenced here? + let _ = global.remote_event_task_source().queue( + task!(reestablish_the_event_source_onnection: move || { + let event_source = trusted_event_source.root(); + + // Step 1.1. + if event_source.ready_state.get() == ReadyState::Closed { + return; + } + + // Step 1.2. + event_source.ready_state.set(ReadyState::Connecting); + + // Step 1.3. + event_source.upcast::<EventTarget>().fire_event(atom!("error")); + + // Step 2. + let duration = Length::new(event_source.reconnection_time.get()); + + // Step 3. + // TODO: Optionally wait some more. + + // Steps 4-5. + let callback = OneshotTimerCallback::EventSourceTimeout( + EventSourceTimeoutCallback { + event_source: trusted_event_source, + action_sender, + } + ); + // FIXME(nox): Why are errors silenced here? + let _ = event_source.global().schedule_callback(callback, duration); + }), + &global, + ); } // https://html.spec.whatwg.org/multipage/#processField @@ -136,12 +181,14 @@ impl EventSourceContext { "data" => { self.data.push_str(&self.value); self.data.push('\n'); - } + }, "id" => mem::swap(&mut self.last_event_id, &mut self.value), - "retry" => if let Ok(time) = u64::from_str(&self.value) { - self.event_source.root().reconnection_time.set(time); + "retry" => { + if let Ok(time) = u64::from_str(&self.value) { + self.event_source.root().reconnection_time.set(time); + } }, - _ => () + _ => (), } self.field.clear(); @@ -174,23 +221,42 @@ impl EventSourceContext { }; // Steps 4-5 let event = { - let _ac = JSAutoCompartment::new(event_source.global().get_cx(), - event_source.reflector().get_jsobject().get()); - rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue()); - unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) }; - MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(), - DOMString::from(self.origin.clone()), - event_source.last_event_id.borrow().clone()) + let _ac = enter_realm(&*event_source); + rooted!(in(*event_source.global().get_cx()) let mut data = UndefinedValue()); + unsafe { + self.data + .to_jsval(*event_source.global().get_cx(), data.handle_mut()) + }; + MessageEvent::new( + &*event_source.global(), + type_, + false, + false, + data.handle(), + DOMString::from(self.origin.clone()), + None, + event_source.last_event_id.borrow().clone(), + Vec::with_capacity(0), + ) }; // Step 7 self.event_type.clear(); self.data.clear(); - // Step 8 - let runnable = box DispatchEventRunnable { - event_source: self.event_source.clone(), - event: Trusted::new(&event) - }; - let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); + + // Step 8. + let global = event_source.global(); + let event_source = self.event_source.clone(); + let event = Trusted::new(&*event); + // FIXME(nox): Why are errors silenced here? + let _ = global.remote_event_task_source().queue( + task!(dispatch_the_event_source_event: move || { + let event_source = event_source.root(); + if event_source.ready_state.get() != ReadyState::Closed { + event.root().upcast::<Event>().fire(&event_source.upcast()); + } + }), + &global, + ); } // https://html.spec.whatwg.org/multipage/#event-stream-interpretation @@ -205,31 +271,31 @@ impl EventSourceContext { if let Some(&' ') = stream.peek() { stream.next(); } - } + }, ('\n', &ParserState::Value) => { self.parser_state = ParserState::Eol; self.process_field(); - } + }, ('\r', &ParserState::Value) => { if let Some(&'\n') = stream.peek() { continue; } self.parser_state = ParserState::Eol; self.process_field(); - } + }, ('\n', &ParserState::Field) => { self.parser_state = ParserState::Eol; self.process_field(); - } + }, ('\r', &ParserState::Field) => { if let Some(&'\n') = stream.peek() { continue; } self.parser_state = ParserState::Eol; self.process_field(); - } + }, ('\n', &ParserState::Eol) => self.dispatch_event(), ('\r', &ParserState::Eol) => { @@ -237,7 +303,7 @@ impl EventSourceContext { continue; } self.dispatch_event(); - } + }, ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol, ('\r', &ParserState::Comment) => { @@ -245,14 +311,14 @@ impl EventSourceContext { continue; } self.parser_state = ParserState::Eol; - } + }, (_, &ParserState::Field) => self.field.push(ch), (_, &ParserState::Value) => self.value.push(ch), (_, &ParserState::Eol) => { self.parser_state = ParserState::Field; self.field.push(ch); - } + }, (_, &ParserState::Comment) => (), } } @@ -273,34 +339,102 @@ impl FetchResponseListener for EventSourceContext { Ok(fm) => { let meta = match fm { FetchMetadata::Unfiltered(m) => m, - FetchMetadata::Filtered { unsafe_, .. } => unsafe_ + FetchMetadata::Filtered { unsafe_, filtered } => match filtered { + FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => { + return self.fail_the_connection() + }, + _ => unsafe_, + }, }; - match meta.content_type { - None => self.fail_the_connection(), - Some(ct) => match ct.into_inner().0 { - Mime(TopLevel::Text, SubLevel::EventStream, _) => { - self.origin = meta.final_url.origin().unicode_serialization(); - self.announce_the_connection(); - } - _ => self.fail_the_connection() - } + let mime = match meta.content_type { + None => return self.fail_the_connection(), + Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()), + }; + if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) { + return self.fail_the_connection(); } - } + self.origin = meta.final_url.origin().ascii_serialization(); + self.announce_the_connection(); + }, Err(_) => { - self.reestablish_the_connection(); - } + // The spec advises failing here if reconnecting would be + // "futile", with no more specific advice; WPT tests + // consider a non-http(s) scheme to be futile. + match self.event_source.root().url.scheme() { + "http" | "https" => self.reestablish_the_connection(), + _ => self.fail_the_connection(), + } + }, } } fn process_response_chunk(&mut self, chunk: Vec<u8>) { - let mut stream = String::new(); - UTF_8.raw_decoder().raw_feed(&chunk, &mut stream); - self.parse(stream.chars()) + let mut input = &*chunk; + if let Some(mut incomplete) = self.incomplete_utf8.take() { + match incomplete.try_complete(input) { + None => return, + Some((result, remaining_input)) => { + self.parse(result.unwrap_or("\u{FFFD}").chars()); + input = remaining_input; + }, + } + } + + while !input.is_empty() { + match utf8::decode(&input) { + Ok(s) => { + self.parse(s.chars()); + return; + }, + Err(utf8::DecodeError::Invalid { + valid_prefix, + remaining_input, + .. + }) => { + self.parse(valid_prefix.chars()); + self.parse("\u{FFFD}".chars()); + input = remaining_input; + }, + Err(utf8::DecodeError::Incomplete { + valid_prefix, + incomplete_suffix, + }) => { + self.parse(valid_prefix.chars()); + self.incomplete_utf8 = Some(incomplete_suffix); + return; + }, + } + } } - fn process_response_eof(&mut self, _response: Result<(), NetworkError>) { + fn process_response_eof(&mut self, _response: Result<ResourceFetchTiming, NetworkError>) { + if let Some(_) = self.incomplete_utf8.take() { + self.parse("\u{FFFD}".chars()); + } self.reestablish_the_connection(); } + + fn resource_timing_mut(&mut self) -> &mut ResourceFetchTiming { + &mut self.resource_timing + } + + fn resource_timing(&self) -> &ResourceFetchTiming { + &self.resource_timing + } + + fn submit_resource_timing(&mut self) { + network_listener::submit_timing(self) + } +} + +impl ResourceTimingListener for EventSourceContext { + fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) { + (InitiatorType::Other, self.event_source.root().url().clone()) + } + + fn resource_timing_global(&self) -> DomRoot<GlobalScope> { + self.event_source.root().global() + } } impl PreInvoke for EventSourceContext { @@ -314,39 +448,77 @@ impl EventSource { EventSource { eventtarget: EventTarget::new_inherited(), url: url, - request: DOMRefCell::new(None), - last_event_id: DOMRefCell::new(DOMString::from("")), + request: DomRefCell::new(None), + last_event_id: DomRefCell::new(DOMString::from("")), reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME), generation_id: Cell::new(GenerationId(0)), ready_state: Cell::new(ReadyState::Connecting), with_credentials: with_credentials, + canceller: DomRefCell::new(Default::default()), } } - fn new(global: &GlobalScope, url: ServoUrl, with_credentials: bool) -> Root<EventSource> { - reflect_dom_object(box EventSource::new_inherited(url, with_credentials), - global, - Wrap) + fn new(global: &GlobalScope, url: ServoUrl, with_credentials: bool) -> DomRoot<EventSource> { + reflect_dom_object( + Box::new(EventSource::new_inherited(url, with_credentials)), + global, + ) } - pub fn request(&self) -> RequestInit { + // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3 + pub fn cancel(&self) { + self.canceller.borrow_mut().cancel(); + self.fail_the_connection(); + } + + /// <https://html.spec.whatwg.org/multipage/#fail-the-connection> + pub fn fail_the_connection(&self) { + let global = self.global(); + let event_source = Trusted::new(self); + // FIXME(nox): Why are errors silenced here? + let _ = global.remote_event_task_source().queue( + task!(fail_the_event_source_connection: move || { + let event_source = event_source.root(); + if event_source.ready_state.get() != ReadyState::Closed { + event_source.ready_state.set(ReadyState::Closed); + event_source.upcast::<EventTarget>().fire_event(atom!("error")); + } + }), + &global, + ); + } + + pub fn request(&self) -> RequestBuilder { self.request.borrow().clone().unwrap() } - pub fn Constructor(global: &GlobalScope, - url: DOMString, - event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> { + pub fn url(&self) -> &ServoUrl { + &self.url + } + + // https://html.spec.whatwg.org/multipage/#dom-eventsource + #[allow(non_snake_case)] + pub fn Constructor( + global: &GlobalScope, + url: DOMString, + event_source_init: &EventSourceInit, + ) -> Fallible<DomRoot<EventSource>> { // TODO: Step 2 relevant settings object // Step 3 let base_url = global.api_base_url(); let url_record = match base_url.join(&*url) { Ok(u) => u, // Step 4 - Err(_) => return Err(Error::Syntax) + Err(_) => return Err(Error::Syntax), }; // Step 1, 5 - let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials); + let ev = EventSource::new( + global, + url_record.clone(), + event_source_init.withCredentials, + ); + global.track_event_source(&ev); // Steps 6-7 let cors_attribute_state = if event_source_init.withCredentials { CorsSettings::UseCredentials @@ -355,22 +527,22 @@ impl EventSource { }; // Step 8 // TODO: Step 9 set request's client settings - let mut request = RequestInit { - url: url_record, - origin: global.get_url(), - pipeline_id: Some(global.pipeline_id()), - // https://html.spec.whatwg.org/multipage/#create-a-potential-cors-request - use_url_credentials: true, - mode: RequestMode::CorsMode, - credentials_mode: if cors_attribute_state == CorsSettings::Anonymous { - CredentialsMode::CredentialsSameOrigin - } else { - CredentialsMode::Include - }, - ..RequestInit::default() - }; + let mut request = create_a_potential_cors_request( + url_record, + Destination::None, + Some(cors_attribute_state), + Some(true), + global.get_referrer(), + ) + .origin(global.origin().immutable().clone()) + .pipeline_id(Some(global.pipeline_id())); + // Step 10 - request.headers.set(Accept(vec![qitem(mime!(Text / EventStream))])); + // TODO(eijebong): Replace once typed headers allow it + request.headers.insert( + header::ACCEPT, + HeaderValue::from_static("text/event-stream"), + ); // Step 11 request.cache_mode = CacheMode::NoStore; // Step 12 @@ -378,6 +550,8 @@ impl EventSource { // Step 14 let (action_sender, action_receiver) = ipc::channel().unwrap(); let context = EventSourceContext { + incomplete_utf8: None, + event_source: Trusted::new(&ev), gen_id: ev.generation_id.get(), action_sender: action_sender.clone(), @@ -390,21 +564,41 @@ impl EventSource { event_type: String::new(), data: String::new(), last_event_id: String::new(), + resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource), }; let listener = NetworkListener { context: Arc::new(Mutex::new(context)), task_source: global.networking_task_source(), - wrapper: Some(global.get_runnable_wrapper()) + canceller: Some(global.task_canceller(TaskSourceName::Networking)), }; - ROUTER.add_route(action_receiver.to_opaque(), box move |message| { - listener.notify_fetch(message.to().unwrap()); - }); - global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap(); + ROUTER.add_route( + action_receiver.to_opaque(), + Box::new(move |message| { + listener.notify_fetch(message.to().unwrap()); + }), + ); + let cancel_receiver = ev.canceller.borrow_mut().initialize(); + global + .core_resource_thread() + .send(CoreResourceMsg::Fetch( + request, + FetchChannels::ResponseMsg(action_sender, Some(cancel_receiver)), + )) + .unwrap(); // Step 13 Ok(ev) } } +// https://html.spec.whatwg.org/multipage/#garbage-collection-2 +impl Drop for EventSource { + fn drop(&mut self) { + // If an EventSource object is garbage collected while its connection is still open, + // the user agent must abort any instance of the fetch algorithm opened by this EventSource. + self.canceller.borrow_mut().cancel(); + } +} + impl EventSourceMethods for EventSource { // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen event_handler!(open, GetOnopen, SetOnopen); @@ -434,80 +628,16 @@ impl EventSourceMethods for EventSource { fn Close(&self) { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); + self.canceller.borrow_mut().cancel(); self.ready_state.set(ReadyState::Closed); } } -pub struct AnnounceConnectionRunnable { - event_source: Trusted<EventSource>, -} - -impl Runnable for AnnounceConnectionRunnable { - fn name(&self) -> &'static str { "EventSource AnnounceConnectionRunnable" } - - // https://html.spec.whatwg.org/multipage/#announce-the-connection - fn handler(self: Box<AnnounceConnectionRunnable>) { - let event_source = self.event_source.root(); - if event_source.ready_state.get() != ReadyState::Closed { - event_source.ready_state.set(ReadyState::Open); - event_source.upcast::<EventTarget>().fire_event(atom!("open")); - } - } -} - -pub struct FailConnectionRunnable { - event_source: Trusted<EventSource>, -} - -impl Runnable for FailConnectionRunnable { - fn name(&self) -> &'static str { "EventSource FailConnectionRunnable" } - - // https://html.spec.whatwg.org/multipage/#fail-the-connection - fn handler(self: Box<FailConnectionRunnable>) { - let event_source = self.event_source.root(); - if event_source.ready_state.get() != ReadyState::Closed { - event_source.ready_state.set(ReadyState::Closed); - event_source.upcast::<EventTarget>().fire_event(atom!("error")); - } - } -} - -pub struct ReestablishConnectionRunnable { - event_source: Trusted<EventSource>, - action_sender: ipc::IpcSender<FetchResponseMsg>, -} - -impl Runnable for ReestablishConnectionRunnable { - fn name(&self) -> &'static str { "EventSource ReestablishConnectionRunnable" } - - // https://html.spec.whatwg.org/multipage/#reestablish-the-connection - fn handler(self: Box<ReestablishConnectionRunnable>) { - let event_source = self.event_source.root(); - // Step 1.1 - if event_source.ready_state.get() == ReadyState::Closed { - return; - } - // Step 1.2 - event_source.ready_state.set(ReadyState::Connecting); - // Step 1.3 - event_source.upcast::<EventTarget>().fire_event(atom!("error")); - // Step 2 - let duration = Length::new(event_source.reconnection_time.get()); - // TODO Step 3: Optionally wait some more - // Steps 4-5 - let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback { - event_source: self.event_source.clone(), - action_sender: self.action_sender.clone() - }); - let _ = event_source.global().schedule_callback(callback, duration); - } -} - -#[derive(JSTraceable, HeapSizeOf)] +#[derive(JSTraceable, MallocSizeOf)] pub struct EventSourceTimeoutCallback { - #[ignore_heap_size_of = "Because it is non-owning"] + #[ignore_malloc_size_of = "Because it is non-owning"] event_source: Trusted<EventSource>, - #[ignore_heap_size_of = "Because it is non-owning"] + #[ignore_malloc_size_of = "Because it is non-owning"] action_sender: ipc::IpcSender<FetchResponseMsg>, } @@ -524,27 +654,20 @@ impl EventSourceTimeoutCallback { let mut request = event_source.request(); // Step 5.3 if !event_source.last_event_id.borrow().is_empty() { - request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone()))); + //TODO(eijebong): Change this once typed header support custom values + request.headers.insert( + HeaderName::from_static("last-event-id"), + HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone())) + .unwrap(), + ); } // Step 5.4 - global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap(); - } -} - -pub struct DispatchEventRunnable { - event_source: Trusted<EventSource>, - event: Trusted<MessageEvent>, -} - -impl Runnable for DispatchEventRunnable { - fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" } - - // https://html.spec.whatwg.org/multipage/#dispatchMessage - fn handler(self: Box<DispatchEventRunnable>) { - let event_source = self.event_source.root(); - // Step 8 - if event_source.ready_state.get() != ReadyState::Closed { - self.event.root().upcast::<Event>().fire(&event_source.upcast()); - } + global + .core_resource_thread() + .send(CoreResourceMsg::Fetch( + request, + FetchChannels::ResponseMsg(self.action_sender, None), + )) + .unwrap(); } } |