diff options
author | Simon Sapin <simon.sapin@exyr.org> | 2017-12-17 23:53:32 +0100 |
---|---|---|
committer | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-09-12 13:33:32 +0800 |
commit | 2a996fbc8fef722b264389680cc55c25c46807d1 (patch) | |
tree | 22c19321899cf4b35384c8c8e186f1a0bf7913c8 /components/script | |
parent | b977b4994c678ce1d9bca69be72d095522c25f71 (diff) | |
download | servo-2a996fbc8fef722b264389680cc55c25c46807d1.tar.gz servo-2a996fbc8fef722b264389680cc55c25c46807d1.zip |
Replace mpsc with crossbeam/servo channel, update ipc-channel
Co-authored-by: Gregory Terzian <gterzian@users.noreply.github.com>
Diffstat (limited to 'components/script')
21 files changed, 112 insertions, 152 deletions
diff --git a/components/script/Cargo.toml b/components/script/Cargo.toml index 1a475189a31..ad69761ec51 100644 --- a/components/script/Cargo.toml +++ b/components/script/Cargo.toml @@ -53,7 +53,7 @@ html5ever = "0.22" hyper = "0.10" hyper_serde = "0.8" image = "0.19" -ipc-channel = "0.10" +ipc-channel = "0.11" itertools = "0.7.6" jstraceable_derive = {path = "../jstraceable_derive"} lazy_static = "1" @@ -85,6 +85,7 @@ serde_bytes = "0.10" servo_allocator = {path = "../allocator"} servo_arc = {path = "../servo_arc"} servo_atoms = {path = "../atoms"} +servo_channel = {path = "../channel"} servo_config = {path = "../config"} servo_geometry = {path = "../geometry" } servo-media = {git = "https://github.com/servo/media"} diff --git a/components/script/dom/abstractworkerglobalscope.rs b/components/script/dom/abstractworkerglobalscope.rs index 45c594dbaee..e3e6fbcf813 100644 --- a/components/script/dom/abstractworkerglobalscope.rs +++ b/components/script/dom/abstractworkerglobalscope.rs @@ -11,7 +11,7 @@ use dom::globalscope::GlobalScope; use dom::worker::TrustedWorkerAddress; use dom::workerglobalscope::WorkerGlobalScope; use script_runtime::{ScriptChan, CommonScriptMsg, ScriptPort}; -use std::sync::mpsc::{Receiver, Select, Sender}; +use servo_channel::{Receiver, Sender}; use task_queue::{QueuedTaskConversion, TaskQueue}; /// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with @@ -65,9 +65,9 @@ impl ScriptChan for WorkerThreadWorkerChan { impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> { fn recv(&self) -> Result<CommonScriptMsg, ()> { let common_msg = match self.recv() { - Ok(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg, - Err(_) => return Err(()), - Ok(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!") + Some(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg, + None => return Err(()), + Some(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!") }; match common_msg { WorkerScriptMsg::Common(script_msg) => Ok(script_msg), @@ -89,7 +89,6 @@ pub trait WorkerEventLoopMethods { fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> Self::Event; } -#[allow(unsafe_code)] // https://html.spec.whatwg.org/multipage/#worker-event-loop pub fn run_worker_event_loop<T, TimerMsg, WorkerMsg, Event>(worker_scope: &T, worker: Option<&TrustedWorkerAddress>) @@ -101,31 +100,18 @@ where + DomObject { let scope = worker_scope.upcast::<WorkerGlobalScope>(); let timer_event_port = worker_scope.timer_event_port(); - let devtools_port = scope.from_devtools_receiver(); + let devtools_port = match scope.from_devtools_sender() { + Some(_) => Some(scope.from_devtools_receiver().select()), + None => None, + }; let task_queue = worker_scope.task_queue(); - let sel = Select::new(); - let mut worker_handle = sel.handle(task_queue.select()); - let mut timer_event_handle = sel.handle(timer_event_port); - let mut devtools_handle = sel.handle(devtools_port); - unsafe { - worker_handle.add(); - timer_event_handle.add(); - if scope.from_devtools_sender().is_some() { - devtools_handle.add(); - } - } - let ret = sel.wait(); - let event = { - if ret == worker_handle.id() { - task_queue.take_tasks(); + let event = select! { + recv(task_queue.select(), msg) => { + task_queue.take_tasks(msg.unwrap()); worker_scope.from_worker_msg(task_queue.recv().unwrap()) - } else if ret == timer_event_handle.id() { - worker_scope.from_timer_msg(timer_event_port.recv().unwrap()) - } else if ret == devtools_handle.id() { - worker_scope.from_devtools_msg(devtools_port.recv().unwrap()) - } else { - panic!("unexpected select result!") - } + }, + recv(timer_event_port.select(), msg) => worker_scope.from_timer_msg(msg.unwrap()), + recv(devtools_port, msg) => worker_scope.from_devtools_msg(msg.unwrap()), }; let mut sequential = vec![]; sequential.push(event); @@ -138,14 +124,14 @@ where // Batch all events that are ready. // The task queue will throttle non-priority tasks if necessary. match task_queue.try_recv() { - Err(_) => match timer_event_port.try_recv() { - Err(_) => match devtools_port.try_recv() { - Err(_) => break, - Ok(ev) => sequential.push(worker_scope.from_devtools_msg(ev)), + None => match timer_event_port.try_recv() { + None => match devtools_port.and_then(|port| port.try_recv()) { + None => break, + Some(ev) => sequential.push(worker_scope.from_devtools_msg(ev)), }, - Ok(ev) => sequential.push(worker_scope.from_timer_msg(ev)), + Some(ev) => sequential.push(worker_scope.from_timer_msg(ev)), }, - Ok(ev) => sequential.push(worker_scope.from_worker_msg(ev)), + Some(ev) => sequential.push(worker_scope.from_worker_msg(ev)), } } // Step 3 diff --git a/components/script/dom/bindings/trace.rs b/components/script/dom/bindings/trace.rs index 41d1231a580..33cd3e02b55 100644 --- a/components/script/dom/bindings/trace.rs +++ b/components/script/dom/bindings/trace.rs @@ -88,6 +88,7 @@ use selectors::matching::ElementSelectorFlags; use serde::{Deserialize, Serialize}; use servo_arc::Arc as ServoArc; use servo_atoms::Atom; +use servo_channel::{Receiver, Sender}; use servo_media::Backend; use servo_media::audio::buffer_source_node::AudioBuffer; use servo_media::audio::context::AudioContext; @@ -104,7 +105,6 @@ use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::sync::mpsc::{Receiver, Sender}; use std::time::{SystemTime, Instant}; use style::attr::{AttrIdentifier, AttrValue, LengthOrPercentageOrAuto}; use style::context::QuirksMode; diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index c582177f42e..791190e0fd2 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -36,12 +36,12 @@ use net_traits::request::{CredentialsMode, Destination, RequestInit}; use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, new_rt_and_cx, Runtime}; use script_runtime::ScriptThreadEventCategory::WorkerEvent; use script_traits::{TimerEvent, TimerSource, WorkerGlobalScopeInit, WorkerScriptLoadOrigin}; +use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Sender, Receiver}; use servo_rand::random; use servo_url::ServoUrl; use std::mem::replace; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use style::thread_state::{self, ThreadState}; use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; @@ -313,7 +313,7 @@ impl DedicatedWorkerGlobalScope { let runtime = unsafe { new_rt_and_cx() }; let (devtools_mpsc_chan, devtools_mpsc_port) = channel(); - ROUTER.route_ipc_receiver_to_mpsc_sender(from_devtools_receiver, devtools_mpsc_chan); + route_ipc_receiver_to_new_servo_sender(from_devtools_receiver, devtools_mpsc_chan); let (timer_tx, timer_rx) = channel(); let (timer_ipc_chan, timer_ipc_port) = ipc::channel().unwrap(); diff --git a/components/script/dom/paintworkletglobalscope.rs b/components/script/dom/paintworkletglobalscope.rs index bf07610719f..352eae0ed0e 100644 --- a/components/script/dom/paintworkletglobalscope.rs +++ b/components/script/dom/paintworkletglobalscope.rs @@ -60,8 +60,6 @@ use std::ptr::null_mut; use std::rc::Rc; use std::sync::Arc; use std::sync::Mutex; -use std::sync::mpsc; -use std::sync::mpsc::Sender; use std::thread; use std::time::Duration; use style_traits::CSSPixel; @@ -352,7 +350,7 @@ impl PaintWorkletGlobalScope { arguments: Vec<String>) -> Result<DrawAPaintImageResult, PaintWorkletError> { let name = self.name.clone(); - let (sender, receiver) = mpsc::channel(); + let (sender, receiver) = channel(); let task = PaintWorkletTask::DrawAPaintImage(name, size, device_pixel_ratio, diff --git a/components/script/dom/serviceworkerglobalscope.rs b/components/script/dom/serviceworkerglobalscope.rs index 50080f9925b..3781ac3be69 100644 --- a/components/script/dom/serviceworkerglobalscope.rs +++ b/components/script/dom/serviceworkerglobalscope.rs @@ -22,17 +22,16 @@ use dom::worker::TrustedWorkerAddress; use dom::workerglobalscope::WorkerGlobalScope; use dom_struct::dom_struct; use ipc_channel::ipc::{self, IpcSender, IpcReceiver}; -use ipc_channel::router::ROUTER; use js::jsapi::{JSAutoCompartment, JSContext, JS_AddInterruptCallback}; use js::jsval::UndefinedValue; use net_traits::{load_whole_resource, IpcSend, CustomResponseMediator}; use net_traits::request::{CredentialsMode, Destination, RequestInit}; use script_runtime::{CommonScriptMsg, ScriptChan, new_rt_and_cx, Runtime}; use script_traits::{TimerEvent, WorkerGlobalScopeInit, ScopeThings, ServiceWorkerMsg, WorkerScriptLoadOrigin}; +use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Receiver, Sender}; use servo_config::prefs::PREFS; use servo_rand::random; use servo_url::ServoUrl; -use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use std::time::Duration; use style::thread_state::{self, ThreadState}; @@ -272,7 +271,7 @@ impl ServiceWorkerGlobalScope { let runtime = unsafe { new_rt_and_cx() }; let (devtools_mpsc_chan, devtools_mpsc_port) = channel(); - ROUTER.route_ipc_receiver_to_mpsc_sender(devtools_receiver, devtools_mpsc_chan); + route_ipc_receiver_to_new_servo_sender(devtools_receiver, devtools_mpsc_chan); // TODO XXXcreativcoder use this timer_ipc_port, when we have a service worker instance here let (timer_ipc_chan, _timer_ipc_port) = ipc::channel().unwrap(); let (timer_chan, timer_port) = channel(); diff --git a/components/script/dom/servoparser/async_html.rs b/components/script/dom/servoparser/async_html.rs index 7a1319dc31d..5fe911deb0b 100644 --- a/components/script/dom/servoparser/async_html.rs +++ b/components/script/dom/servoparser/async_html.rs @@ -27,12 +27,12 @@ use html5ever::tendril::fmt::UTF8; use html5ever::tokenizer::{Tokenizer as HtmlTokenizer, TokenizerOpts, TokenizerResult}; use html5ever::tree_builder::{ElementFlags, NodeOrText as HtmlNodeOrText, NextParserState, QuirksMode, TreeSink}; use html5ever::tree_builder::{TreeBuilder, TreeBuilderOpts}; +use servo_channel::{channel, Receiver, Sender}; use servo_url::ServoUrl; use std::borrow::Cow; use std::cell::Cell; use std::collections::HashMap; use std::collections::vec_deque::VecDeque; -use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; use style::context::QuirksMode as ServoQuirksMode; diff --git a/components/script/dom/testworkletglobalscope.rs b/components/script/dom/testworkletglobalscope.rs index 22be6a4765f..a31159de215 100644 --- a/components/script/dom/testworkletglobalscope.rs +++ b/components/script/dom/testworkletglobalscope.rs @@ -13,9 +13,9 @@ use dom::workletglobalscope::WorkletGlobalScopeInit; use dom_struct::dom_struct; use js::rust::Runtime; use msg::constellation_msg::PipelineId; +use servo_channel::Sender; use servo_url::ServoUrl; use std::collections::HashMap; -use std::sync::mpsc::Sender; // check-tidy: no specs after this line diff --git a/components/script/dom/vrdisplay.rs b/components/script/dom/vrdisplay.rs index 397ec725a4d..d44d9d0fa22 100644 --- a/components/script/dom/vrdisplay.rs +++ b/components/script/dom/vrdisplay.rs @@ -36,11 +36,11 @@ use profile_traits::ipc; use script_runtime::CommonScriptMsg; use script_runtime::ScriptThreadEventCategory::WebVREvent; use serde_bytes::ByteBuf; +use servo_channel::{channel, Sender}; use std::cell::Cell; use std::mem; use std::ops::Deref; use std::rc::Rc; -use std::sync::mpsc; use std::thread; use task_source::TaskSourceName; use webvr_traits::{WebVRDisplayData, WebVRDisplayEvent, WebVRFrameData, WebVRLayer, WebVRMsg}; @@ -505,7 +505,7 @@ impl VRDisplay { // while the render thread is syncing the VRFrameData to be used for the current frame. // This thread runs until the user calls ExitPresent, the tab is closed or some unexpected error happened. thread::Builder::new().name("WebVR_RAF".into()).spawn(move || { - let (raf_sender, raf_receiver) = mpsc::channel(); + let (raf_sender, raf_receiver) = channel(); let mut near = near_init; let mut far = far_init; @@ -581,7 +581,7 @@ impl VRDisplay { self.frame_data_status.set(status); } - fn handle_raf(&self, end_sender: &mpsc::Sender<Result<(f64, f64), ()>>) { + fn handle_raf(&self, end_sender: &Sender<Result<(f64, f64), ()>>) { self.frame_data_status.set(VRFrameDataStatus::Waiting); self.running_display_raf.set(true); diff --git a/components/script/dom/window.rs b/components/script/dom/window.rs index 052f121fb2a..65630e126a5 100644 --- a/components/script/dom/window.rs +++ b/components/script/dom/window.rs @@ -94,6 +94,7 @@ use script_traits::{TimerSchedulerMsg, UntrustedNodeAddress, WindowSizeData, Win use script_traits::webdriver_msg::{WebDriverJSError, WebDriverJSResult}; use selectors::attr::CaseSensitivity; use servo_arc; +use servo_channel::{channel, Sender}; use servo_config::opts; use servo_geometry::{f32_rect_to_au_rect, MaxRect}; use servo_url::{Host, MutableOrigin, ImmutableOrigin, ServoUrl}; @@ -109,8 +110,6 @@ use std::mem; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Sender, channel}; -use std::sync::mpsc::TryRecvError::{Disconnected, Empty}; use style::error_reporting::ParseErrorReporter; use style::media_queries; use style::parser::ParserContext as CssParserContext; @@ -1376,15 +1375,16 @@ impl Window { debug!("script: layout forked"); - let complete = match join_port.try_recv() { - Err(Empty) => { + let complete = select! { + recv(join_port.select(), msg) => if let Some(reflow_complete) = msg { + reflow_complete + } else { + panic!("Layout thread failed while script was waiting for a result."); + }, + default => { info!("script: waiting on layout"); join_port.recv().unwrap() } - Ok(reflow_complete) => reflow_complete, - Err(Disconnected) => { - panic!("Layout thread failed while script was waiting for a result."); - } }; debug!("script: layout joined"); diff --git a/components/script/dom/worker.rs b/components/script/dom/worker.rs index 653a2fa141c..e5a1bfed223 100644 --- a/components/script/dom/worker.rs +++ b/components/script/dom/worker.rs @@ -25,10 +25,10 @@ use js::jsapi::{JSAutoCompartment, JSContext, JS_RequestInterruptCallback}; use js::jsval::UndefinedValue; use js::rust::HandleValue; use script_traits::WorkerScriptLoadOrigin; +use servo_channel::{channel, Sender}; use std::cell::Cell; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Sender, channel}; use task::TaskOnce; pub type TrustedWorkerAddress = Trusted<Worker>; diff --git a/components/script/dom/workerglobalscope.rs b/components/script/dom/workerglobalscope.rs index ac372c5fe98..da1f0ba7d36 100644 --- a/components/script/dom/workerglobalscope.rs +++ b/components/script/dom/workerglobalscope.rs @@ -36,12 +36,12 @@ use net_traits::request::{CredentialsMode, Destination, RequestInit as NetReques use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, get_reports, Runtime}; use script_traits::{TimerEvent, TimerEventId}; use script_traits::WorkerGlobalScopeInit; +use servo_channel::Receiver; use servo_url::{MutableOrigin, ServoUrl}; use std::default::Default; use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::Receiver; use task::TaskCanceller; use task_source::file_reading::FileReadingTaskSource; use task_source::networking::NetworkingTaskSource; diff --git a/components/script/dom/worklet.rs b/components/script/dom/worklet.rs index d1b7ba9c44a..498cce10680 100644 --- a/components/script/dom/worklet.rs +++ b/components/script/dom/worklet.rs @@ -48,6 +48,7 @@ use script_runtime::Runtime; use script_runtime::ScriptThreadEventCategory; use script_runtime::new_rt_and_cx; use script_thread::{MainThreadScriptMsg, ScriptThread}; +use servo_channel::{channel, Sender, Receiver}; use servo_rand; use servo_url::ImmutableOrigin; use servo_url::ServoUrl; @@ -58,9 +59,6 @@ use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering; -use std::sync::mpsc; -use std::sync::mpsc::Receiver; -use std::sync::mpsc::Sender; use std::thread; use style::thread_state::{self, ThreadState}; use swapper::Swapper; @@ -309,7 +307,7 @@ impl WorkletThreadPool { /// For testing. pub fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> { - let (sender, receiver) = mpsc::channel(); + let (sender, receiver) = channel(); let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender))); let _ = self.primary_sender.send(msg); receiver.recv().expect("Test worklet has died?") @@ -355,7 +353,7 @@ struct WorkletThreadRole { impl WorkletThreadRole { fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole { - let (sender, receiver) = mpsc::channel(); + let (sender, receiver) = channel(); WorkletThreadRole { sender: sender, receiver: receiver, @@ -419,7 +417,7 @@ impl WorkletThread { #[allow(unsafe_code)] #[allow(unrooted_must_root)] fn spawn(role: WorkletThreadRole, init: WorkletThreadInit) -> Sender<WorkletControl> { - let (control_sender, control_receiver) = mpsc::channel(); + let (control_sender, control_receiver) = channel(); // TODO: name this thread thread::spawn(move || { // TODO: add a new IN_WORKLET thread state? @@ -488,12 +486,12 @@ impl WorkletThread { if let Some(control) = self.control_buffer.take() { self.process_control(control); } - while let Ok(control) = self.control_receiver.try_recv() { + while let Some(control) = self.control_receiver.try_recv() { self.process_control(control); } self.gc(); } else if self.control_buffer.is_none() { - if let Ok(control) = self.control_receiver.try_recv() { + if let Some(control) = self.control_receiver.try_recv() { self.control_buffer = Some(control); let msg = WorkletData::StartSwapRoles(self.role.sender.clone()); let _ = self.cold_backup_sender.send(msg); diff --git a/components/script/dom/workletglobalscope.rs b/components/script/dom/workletglobalscope.rs index 653c8e6468e..6a3dc033aad 100644 --- a/components/script/dom/workletglobalscope.rs +++ b/components/script/dom/workletglobalscope.rs @@ -26,11 +26,11 @@ use script_thread::MainThreadScriptMsg; use script_traits::{Painter, ScriptMsg}; use script_traits::{ScriptToConstellationChan, TimerSchedulerMsg}; use servo_atoms::Atom; +use servo_channel::Sender; use servo_url::ImmutableOrigin; use servo_url::MutableOrigin; use servo_url::ServoUrl; use std::sync::Arc; -use std::sync::mpsc::Sender; #[dom_struct] /// <https://drafts.css-houdini.org/worklets/#workletglobalscope> diff --git a/components/script/lib.rs b/components/script/lib.rs index 6f4ec73272a..fe0f18f7baf 100644 --- a/components/script/lib.rs +++ b/components/script/lib.rs @@ -6,7 +6,6 @@ #![cfg_attr(feature = "unstable", feature(on_unimplemented))] #![feature(const_fn)] #![feature(drain_filter)] -#![feature(mpsc_select)] #![feature(plugin)] #![feature(try_from)] diff --git a/components/script/script_thread.rs b/components/script/script_thread.rs index 8f9d98a2e93..4f55d49fc17 100644 --- a/components/script/script_thread.rs +++ b/components/script/script_thread.rs @@ -70,7 +70,6 @@ use hyper::header::ReferrerPolicy as ReferrerPolicyHeader; use hyper::mime::{Mime, SubLevel, TopLevel}; use hyper_serde::Serde; use ipc_channel::ipc::{self, IpcSender}; -use ipc_channel::router::ROUTER; use js::glue::GetWindowProxyClass; use js::jsapi::{JSAutoCompartment, JSContext, JS_SetWrapObjectCallbacks}; use js::jsapi::{JSTracer, SetWindowProxyClass}; @@ -101,6 +100,8 @@ use script_traits::CompositorEvent::{KeyEvent, MouseButtonEvent, MouseMoveEvent, use script_traits::webdriver_msg::WebDriverScriptCommand; use serviceworkerjob::{Job, JobQueue}; use servo_atoms::Atom; +use servo_channel::{channel, Receiver, Sender}; +use servo_channel::{route_ipc_receiver_to_new_servo_receiver, route_ipc_receiver_to_new_servo_sender}; use servo_config::opts; use servo_url::{ImmutableOrigin, MutableOrigin, ServoUrl}; use std::cell::Cell; @@ -113,7 +114,6 @@ use std::ptr; use std::rc::Rc; use std::result::Result; use std::sync::Arc; -use std::sync::mpsc::{Receiver, Select, Sender, channel}; use std::thread; use style::thread_state::{self, ThreadState}; use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; @@ -300,39 +300,39 @@ impl OpaqueSender<CommonScriptMsg> for Box<ScriptChan + Send> { impl ScriptPort for Receiver<CommonScriptMsg> { fn recv(&self) -> Result<CommonScriptMsg, ()> { - self.recv().map_err(|_| ()) + self.recv().ok_or(()) } } impl ScriptPort for Receiver<MainThreadScriptMsg> { fn recv(&self) -> Result<CommonScriptMsg, ()> { match self.recv() { - Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg), - Ok(_) => panic!("unexpected main thread event message!"), - _ => Err(()), + Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg), + Some(_) => panic!("unexpected main thread event message!"), + None => Err(()), } } } impl ScriptPort for Receiver<(TrustedWorkerAddress, CommonScriptMsg)> { fn recv(&self) -> Result<CommonScriptMsg, ()> { - self.recv().map(|(_, msg)| msg).map_err(|_| ()) + self.recv().map(|(_, msg)| msg).ok_or(()) } } impl ScriptPort for Receiver<(TrustedWorkerAddress, MainThreadScriptMsg)> { fn recv(&self) -> Result<CommonScriptMsg, ()> { match self.recv().map(|(_, msg)| msg) { - Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg), - Ok(_) => panic!("unexpected main thread event message!"), - _ => Err(()), + Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg), + Some(_) => panic!("unexpected main thread event message!"), + None => Err(()), } } } impl ScriptPort for Receiver<(TrustedServiceWorkerAddress, CommonScriptMsg)> { fn recv(&self) -> Result<CommonScriptMsg, ()> { - self.recv().map(|(_, msg)| msg).map_err(|_| ()) + self.recv().map(|(_, msg)| msg).ok_or(()) } } @@ -896,12 +896,12 @@ impl ScriptThread { // Ask the router to proxy IPC messages from the devtools to us. let (ipc_devtools_sender, ipc_devtools_receiver) = ipc::channel().unwrap(); - let devtools_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_devtools_receiver); + let devtools_port = route_ipc_receiver_to_new_servo_receiver(ipc_devtools_receiver); let (timer_event_chan, timer_event_port) = channel(); // Ask the router to proxy IPC messages from the control port to us. - let control_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(state.control_port); + let control_port = route_ipc_receiver_to_new_servo_receiver(state.control_port); let boxed_script_sender = Box::new(MainThreadScriptChan(chan.clone())); @@ -1019,37 +1019,15 @@ impl ScriptThread { // Receive at least one message so we don't spinloop. debug!("Waiting for event."); - let mut event = { - let sel = Select::new(); - let mut script_port = sel.handle(self.task_queue.select()); - let mut control_port = sel.handle(&self.control_port); - let mut timer_event_port = sel.handle(&self.timer_event_port); - let mut devtools_port = sel.handle(&self.devtools_port); - let mut image_cache_port = sel.handle(&self.image_cache_port); - unsafe { - script_port.add(); - control_port.add(); - timer_event_port.add(); - if self.devtools_chan.is_some() { - devtools_port.add(); - } - image_cache_port.add(); - } - let ret = sel.wait(); - if ret == script_port.id() { - self.task_queue.take_tasks(); + let mut event = select! { + recv(self.task_queue.select(), msg) => { + self.task_queue.take_tasks(msg.unwrap()); FromScript(self.task_queue.recv().unwrap()) - } else if ret == control_port.id() { - FromConstellation(self.control_port.recv().unwrap()) - } else if ret == timer_event_port.id() { - FromScheduler(self.timer_event_port.recv().unwrap()) - } else if ret == devtools_port.id() { - FromDevtools(self.devtools_port.recv().unwrap()) - } else if ret == image_cache_port.id() { - FromImageCache(self.image_cache_port.recv().unwrap()) - } else { - panic!("unexpected select result") - } + }, + recv(self.control_port.select(), msg) => FromConstellation(msg.unwrap()), + recv(self.timer_event_port.select(), msg) => FromScheduler(msg.unwrap()), + recv(self.devtools_chan.as_ref().map(|_| self.devtools_port.select()), msg) => FromDevtools(msg.unwrap()), + recv(self.image_cache_port.select(), msg) => FromImageCache(msg.unwrap()), }; debug!("Got event."); @@ -1131,20 +1109,20 @@ impl ScriptThread { // and check for more resize events. If there are no events pending, we'll move // on and execute the sequential non-resize events we've seen. match self.control_port.try_recv() { - Err(_) => match self.task_queue.try_recv() { - Err(_) => match self.timer_event_port.try_recv() { - Err(_) => match self.devtools_port.try_recv() { - Err(_) => match self.image_cache_port.try_recv() { - Err(_) => break, - Ok(ev) => event = FromImageCache(ev), + None => match self.task_queue.try_recv() { + None => match self.timer_event_port.try_recv() { + None => match self.devtools_port.try_recv() { + None => match self.image_cache_port.try_recv() { + None => break, + Some(ev) => event = FromImageCache(ev), }, - Ok(ev) => event = FromDevtools(ev), + Some(ev) => event = FromDevtools(ev), }, - Ok(ev) => event = FromScheduler(ev), + Some(ev) => event = FromScheduler(ev), }, - Ok(ev) => event = FromScript(ev), + Some(ev) => event = FromScript(ev), }, - Ok(ev) => event = FromConstellation(ev), + Some(ev) => event = FromConstellation(ev), } } @@ -2200,7 +2178,7 @@ impl ScriptThread { let HistoryTraversalTaskSource(ref history_sender) = self.history_traversal_task_source; let (ipc_timer_event_chan, ipc_timer_event_port) = ipc::channel().unwrap(); - ROUTER.route_ipc_receiver_to_mpsc_sender(ipc_timer_event_port, + route_ipc_receiver_to_new_servo_sender(ipc_timer_event_port, self.timer_event_chan.clone()); let origin = if final_url.as_str() == "about:blank" { diff --git a/components/script/serviceworker_manager.rs b/components/script/serviceworker_manager.rs index 2bf19aa25a5..4573a5666c3 100644 --- a/components/script/serviceworker_manager.rs +++ b/components/script/serviceworker_manager.rs @@ -13,13 +13,12 @@ use dom::bindings::structuredclone::StructuredCloneData; use dom::serviceworkerglobalscope::{ServiceWorkerGlobalScope, ServiceWorkerScriptMsg}; use dom::serviceworkerregistration::longest_prefix_match; use ipc_channel::ipc::{self, IpcSender}; -use ipc_channel::router::ROUTER; use net_traits::{CustomResponseMediator, CoreResourceMsg}; use script_traits::{ServiceWorkerMsg, ScopeThings, SWManagerMsg, SWManagerSenders, DOMMessage}; +use servo_channel::{channel, route_ipc_receiver_to_new_servo_receiver, Sender, Receiver}; use servo_config::prefs::PREFS; use servo_url::ServoUrl; use std::collections::HashMap; -use std::sync::mpsc::{channel, Sender, Receiver, RecvError}; use std::thread; enum Message { @@ -56,8 +55,8 @@ impl ServiceWorkerManager { pub fn spawn_manager(sw_senders: SWManagerSenders) { let (own_sender, from_constellation_receiver) = ipc::channel().unwrap(); let (resource_chan, resource_port) = ipc::channel().unwrap(); - let from_constellation = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(from_constellation_receiver); - let resource_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(resource_port); + let from_constellation = route_ipc_receiver_to_new_servo_receiver(from_constellation_receiver); + let resource_port = route_ipc_receiver_to_new_servo_receiver(resource_port); let _ = sw_senders.resource_sender.send(CoreResourceMsg::NetworkMediator(resource_chan)); let _ = sw_senders.swmanager_sender.send(SWManagerMsg::OwnSender(own_sender.clone())); thread::Builder::new().name("ServiceWorkerManager".to_owned()).spawn(move || { @@ -108,7 +107,7 @@ impl ServiceWorkerManager { } fn handle_message(&mut self) { - while let Ok(message) = self.receive_message() { + while let Some(message) = self.receive_message() { let should_continue = match message { Message::FromConstellation(msg) => { self.handle_message_from_constellation(msg) @@ -184,13 +183,10 @@ impl ServiceWorkerManager { true } - #[allow(unsafe_code)] - fn receive_message(&mut self) -> Result<Message, RecvError> { - let msg_from_constellation = &self.own_port; - let msg_from_resource = &self.resource_receiver; + fn receive_message(&mut self) -> Option<Message> { select! { - msg = msg_from_constellation.recv() => msg.map(Message::FromConstellation), - msg = msg_from_resource.recv() => msg.map(Message::FromResource) + recv(self.own_port.select(), msg) => msg.map(Message::FromConstellation), + recv(self.resource_receiver.select(), msg) => msg.map(Message::FromResource), } } } diff --git a/components/script/task_queue.rs b/components/script/task_queue.rs index 7093474e1ed..e7e2117b2f5 100644 --- a/components/script/task_queue.rs +++ b/components/script/task_queue.rs @@ -8,10 +8,10 @@ use dom::bindings::cell::DomRefCell; use dom::worker::TrustedWorkerAddress; use msg::constellation_msg::PipelineId; use script_runtime::ScriptThreadEventCategory; +use servo_channel::{Receiver, Sender, base_channel}; use std::cell::Cell; use std::collections::{HashMap, VecDeque}; use std::default::Default; -use std::sync::mpsc::{Receiver, Sender}; use task::TaskBox; use task_source::TaskSourceName; @@ -59,13 +59,18 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { /// Process incoming tasks, immediately sending priority ones downstream, /// and categorizing potential throttles. - fn process_incoming_tasks(&self) { - let mut non_throttled: Vec<T> = self.port - .try_iter() - .filter(|msg| !msg.is_wake_up()) - .collect(); + fn process_incoming_tasks(&self, first_msg: T) { + let mut incoming = Vec::with_capacity(self.port.len() + 1); + if !first_msg.is_wake_up() { + incoming.push(first_msg); + } + while let Some(msg) = self.port.try_recv() { + if !msg.is_wake_up() { + incoming.push(msg); + } + } - let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{ + let to_be_throttled: Vec<T> = incoming.drain_filter(|msg|{ let task_source = match msg.task_source_name() { Some(task_source) => task_source, None => return false, @@ -80,7 +85,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { } }).collect(); - for msg in non_throttled { + for msg in incoming { // Immediately send non-throttled tasks for processing. let _ = self.msg_queue.borrow_mut().push_back(msg); } @@ -101,31 +106,31 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { /// Reset the queue for a new iteration of the event-loop, /// returning the port about whose readiness we want to be notified. - pub fn select(&self) -> &Receiver<T> { + pub fn select(&self) -> &base_channel::Receiver<T> { // This is a new iteration of the event-loop, so we reset the "business" counter. self.taken_task_counter.set(0); // We want to be notified when the script-port is ready to receive. // Hence that's the one we need to include in the select. - &self.port + self.port.select() } /// Take a message from the front of the queue, without waiting if empty. - pub fn recv(&self) -> Result<T, ()> { - self.msg_queue.borrow_mut().pop_front().ok_or(()) + pub fn recv(&self) -> Option<T> { + self.msg_queue.borrow_mut().pop_front() } /// Same as recv. - pub fn try_recv(&self) -> Result<T, ()> { + pub fn try_recv(&self) -> Option<T> { self.recv() } /// Drain the queue for the current iteration of the event-loop. /// Holding-back throttles above a given high-water mark. - pub fn take_tasks(&self) { + pub fn take_tasks(&self, first_msg: T) { // High-watermark: once reached, throttled tasks will be held-back. const PER_ITERATION_MAX: u64 = 5; // Always first check for new tasks, but don't reset 'taken_task_counter'. - self.process_incoming_tasks(); + self.process_incoming_tasks(first_msg); let mut throttled = self.throttled.borrow_mut(); let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum(); let task_source_names = TaskSourceName::all(); diff --git a/components/script/task_source/dom_manipulation.rs b/components/script/task_source/dom_manipulation.rs index f184aa3d828..e72110479de 100644 --- a/components/script/task_source/dom_manipulation.rs +++ b/components/script/task_source/dom_manipulation.rs @@ -11,9 +11,9 @@ use msg::constellation_msg::PipelineId; use script_runtime::{CommonScriptMsg, ScriptThreadEventCategory}; use script_thread::MainThreadScriptMsg; use servo_atoms::Atom; +use servo_channel::Sender; use std::fmt; use std::result::Result; -use std::sync::mpsc::Sender; use task::{TaskCanceller, TaskOnce}; use task_source::{TaskSource, TaskSourceName}; diff --git a/components/script/task_source/history_traversal.rs b/components/script/task_source/history_traversal.rs index ffd657adada..7f44ba368b5 100644 --- a/components/script/task_source/history_traversal.rs +++ b/components/script/task_source/history_traversal.rs @@ -4,7 +4,7 @@ use script_runtime::{ScriptChan, CommonScriptMsg}; use script_thread::MainThreadScriptMsg; -use std::sync::mpsc::Sender; +use servo_channel::Sender; #[derive(JSTraceable)] pub struct HistoryTraversalTaskSource(pub Sender<MainThreadScriptMsg>); diff --git a/components/script/task_source/user_interaction.rs b/components/script/task_source/user_interaction.rs index 2beaee95731..d30d54829b5 100644 --- a/components/script/task_source/user_interaction.rs +++ b/components/script/task_source/user_interaction.rs @@ -11,9 +11,9 @@ use msg::constellation_msg::PipelineId; use script_runtime::{CommonScriptMsg, ScriptThreadEventCategory}; use script_thread::MainThreadScriptMsg; use servo_atoms::Atom; +use servo_channel::Sender; use std::fmt; use std::result::Result; -use std::sync::mpsc::Sender; use task::{TaskCanceller, TaskOnce}; use task_source::{TaskSource, TaskSourceName}; |