diff options
author | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-08-11 00:08:00 +0200 |
---|---|---|
committer | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-08-31 02:10:34 +0800 |
commit | ca6306c4306b0d13a7e886f4308ee703fcb1b2b3 (patch) | |
tree | a27c1a4dcbcaebd9a182de5efb1c1f1b5fa5e862 /components/script | |
parent | da36740f0b1a48f5a4b4ed86318e110f1e63554b (diff) | |
download | servo-ca6306c4306b0d13a7e886f4308ee703fcb1b2b3.tar.gz servo-ca6306c4306b0d13a7e886f4308ee703fcb1b2b3.zip |
introduce task-queues, and throttling the performance-timeline task-source, in script and worker threads.
queue
Diffstat (limited to 'components/script')
-rw-r--r-- | components/script/dom/abstractworkerglobalscope.rs | 41 | ||||
-rw-r--r-- | components/script/dom/dedicatedworkerglobalscope.rs | 157 | ||||
-rw-r--r-- | components/script/dom/serviceworkerglobalscope.rs | 130 | ||||
-rw-r--r-- | components/script/dom/worker.rs | 10 | ||||
-rw-r--r-- | components/script/dom/workerglobalscope.rs | 2 | ||||
-rw-r--r-- | components/script/lib.rs | 2 | ||||
-rw-r--r-- | components/script/script_thread.rs | 67 | ||||
-rw-r--r-- | components/script/task_queue.rs | 165 |
8 files changed, 481 insertions, 93 deletions
diff --git a/components/script/dom/abstractworkerglobalscope.rs b/components/script/dom/abstractworkerglobalscope.rs index f6d868765ed..99c08dc2c38 100644 --- a/components/script/dom/abstractworkerglobalscope.rs +++ b/components/script/dom/abstractworkerglobalscope.rs @@ -3,9 +3,8 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use dom::abstractworker::WorkerScriptMsg; -use dom::bindings::refcounted::Trusted; -use dom::bindings::reflector::DomObject; -use dom::bindings::trace::JSTraceable; +use dom::dedicatedworkerglobalscope::DedicatedWorkerScriptMsg; +use dom::worker::TrustedWorkerAddress; use script_runtime::{ScriptChan, CommonScriptMsg, ScriptPort}; use std::sync::mpsc::{Receiver, Sender}; @@ -13,14 +12,15 @@ use std::sync::mpsc::{Receiver, Sender}; /// common event loop messages. While this SendableWorkerScriptChan is alive, the associated /// Worker object will remain alive. #[derive(Clone, JSTraceable)] -pub struct SendableWorkerScriptChan<T: DomObject> { - pub sender: Sender<(Trusted<T>, CommonScriptMsg)>, - pub worker: Trusted<T>, +pub struct SendableWorkerScriptChan { + pub sender: Sender<DedicatedWorkerScriptMsg>, + pub worker: TrustedWorkerAddress, } -impl<T: JSTraceable + DomObject + 'static> ScriptChan for SendableWorkerScriptChan<T> { +impl ScriptChan for SendableWorkerScriptChan { fn send(&self, msg: CommonScriptMsg) -> Result<(), ()> { - self.sender.send((self.worker.clone(), msg)).map_err(|_| ()) + let msg = DedicatedWorkerScriptMsg::CommonWorker(self.worker.clone(), WorkerScriptMsg::Common(msg)); + self.sender.send(msg).map_err(|_| ()) } fn clone(&self) -> Box<ScriptChan + Send> { @@ -35,15 +35,16 @@ impl<T: JSTraceable + DomObject + 'static> ScriptChan for SendableWorkerScriptCh /// worker event loop messages. While this SendableWorkerScriptChan is alive, the associated /// Worker object will remain alive. #[derive(Clone, JSTraceable)] -pub struct WorkerThreadWorkerChan<T: DomObject> { - pub sender: Sender<(Trusted<T>, WorkerScriptMsg)>, - pub worker: Trusted<T>, +pub struct WorkerThreadWorkerChan { + pub sender: Sender<DedicatedWorkerScriptMsg>, + pub worker: TrustedWorkerAddress, } -impl<T: JSTraceable + DomObject + 'static> ScriptChan for WorkerThreadWorkerChan<T> { +impl ScriptChan for WorkerThreadWorkerChan { fn send(&self, msg: CommonScriptMsg) -> Result<(), ()> { + let msg = DedicatedWorkerScriptMsg::CommonWorker(self.worker.clone(), WorkerScriptMsg::Common(msg)); self.sender - .send((self.worker.clone(), WorkerScriptMsg::Common(msg))) + .send(msg) .map_err(|_| ()) } @@ -55,12 +56,16 @@ impl<T: JSTraceable + DomObject + 'static> ScriptChan for WorkerThreadWorkerChan } } -impl<T: DomObject> ScriptPort for Receiver<(Trusted<T>, WorkerScriptMsg)> { +impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> { fn recv(&self) -> Result<CommonScriptMsg, ()> { - match self.recv().map(|(_, msg)| msg) { - Ok(WorkerScriptMsg::Common(script_msg)) => Ok(script_msg), - Ok(WorkerScriptMsg::DOMMessage(_)) => panic!("unexpected worker event message!"), - Err(_) => Err(()), + let common_msg = match self.recv() { + Ok(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg, + Err(_) => return Err(()), + Ok(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!") + }; + match common_msg { + WorkerScriptMsg::Common(script_msg) => Ok(script_msg), + WorkerScriptMsg::DOMMessage(_) => panic!("unexpected worker event message!"), } } } diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index 1c8c2316dde..b71f9cd69b7 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -32,7 +32,7 @@ use js::rust::HandleValue; use msg::constellation_msg::TopLevelBrowsingContextId; use net_traits::{IpcSend, load_whole_resource}; use net_traits::request::{CredentialsMode, Destination, RequestInit}; -use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, new_rt_and_cx, Runtime}; +use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, ScriptThreadEventCategory, new_rt_and_cx, Runtime}; use script_runtime::ScriptThreadEventCategory::WorkerEvent; use script_traits::{TimerEvent, TimerSource, WorkerGlobalScopeInit, WorkerScriptLoadOrigin}; use servo_rand::random; @@ -40,9 +40,10 @@ use servo_url::ServoUrl; use std::mem::replace; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{Receiver, RecvError, Select, Sender, channel}; +use std::sync::mpsc::{Receiver, Select, Sender, channel}; use std::thread; use style::thread_state::{self, ThreadState}; +use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; /// Set the `worker` field of a related DedicatedWorkerGlobalScope object to a particular /// value for the duration of this object's lifetime. This ensures that the related Worker @@ -70,20 +71,81 @@ impl<'a> Drop for AutoWorkerReset<'a> { } } +pub enum DedicatedWorkerScriptMsg { + /// Standard message from a worker. + CommonWorker(TrustedWorkerAddress, WorkerScriptMsg), + /// Wake-up call from the task queue. + WakeUp, +} + enum MixedMessage { - FromWorker((TrustedWorkerAddress, WorkerScriptMsg)), + FromWorker(DedicatedWorkerScriptMsg), FromScheduler((TrustedWorkerAddress, TimerEvent)), FromDevtools(DevtoolScriptControlMsg) } +impl QueuedTaskConversion for DedicatedWorkerScriptMsg { + fn task_category(&self) -> Option<&ScriptThreadEventCategory> { + let common_worker_msg = match self { + DedicatedWorkerScriptMsg::CommonWorker(_, common_worker_msg) => common_worker_msg, + _ => return None, + }; + let script_msg = match common_worker_msg { + WorkerScriptMsg::Common(ref script_msg) => script_msg, + _ => return None, + }; + let category = match script_msg { + CommonScriptMsg::Task(category, _boxed, _pipeline_id) => category, + _ => return None, + }; + Some(category) + } + + fn into_queued_task(self) -> Option<QueuedTask> { + let (worker, common_worker_msg) = match self { + DedicatedWorkerScriptMsg::CommonWorker(worker, common_worker_msg) => (worker, common_worker_msg), + _ => return None, + }; + let script_msg = match common_worker_msg { + WorkerScriptMsg::Common(script_msg) => script_msg, + _ => return None, + }; + let (category, boxed, pipeline_id) = match script_msg { + CommonScriptMsg::Task(category, boxed, pipeline_id) => + (category, boxed, pipeline_id), + _ => return None, + }; + Some((Some(worker), category, boxed, pipeline_id)) + } + + fn from_queued_task(queued_task: QueuedTask) -> Self { + let (worker, category, boxed, pipeline_id) = queued_task; + let script_msg = CommonScriptMsg::Task(category, boxed, pipeline_id); + DedicatedWorkerScriptMsg::CommonWorker(worker.unwrap(), WorkerScriptMsg::Common(script_msg)) + } + + fn wake_up_msg() -> Self { + DedicatedWorkerScriptMsg::WakeUp + } + + fn is_wake_up(&self) -> bool { + match self { + DedicatedWorkerScriptMsg::WakeUp => true, + _ => false, + } + } +} + +unsafe_no_jsmanaged_fields!(TaskQueue<DedicatedWorkerScriptMsg>); + // https://html.spec.whatwg.org/multipage/#dedicatedworkerglobalscope #[dom_struct] pub struct DedicatedWorkerGlobalScope { workerglobalscope: WorkerGlobalScope, #[ignore_malloc_size_of = "Defined in std"] - receiver: Receiver<(TrustedWorkerAddress, WorkerScriptMsg)>, + task_queue: TaskQueue<DedicatedWorkerScriptMsg>, #[ignore_malloc_size_of = "Defined in std"] - own_sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, + own_sender: Sender<DedicatedWorkerScriptMsg>, #[ignore_malloc_size_of = "Defined in std"] timer_event_port: Receiver<(TrustedWorkerAddress, TimerEvent)>, #[ignore_malloc_size_of = "Trusted<T> has unclear ownership like Dom<T>"] @@ -99,8 +161,8 @@ impl DedicatedWorkerGlobalScope { from_devtools_receiver: Receiver<DevtoolScriptControlMsg>, runtime: Runtime, parent_sender: Box<ScriptChan + Send>, - own_sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, - receiver: Receiver<(TrustedWorkerAddress, WorkerScriptMsg)>, + own_sender: Sender<DedicatedWorkerScriptMsg>, + receiver: Receiver<DedicatedWorkerScriptMsg>, timer_event_chan: IpcSender<TimerEvent>, timer_event_port: Receiver<(TrustedWorkerAddress, TimerEvent)>, closing: Arc<AtomicBool>) @@ -112,7 +174,7 @@ impl DedicatedWorkerGlobalScope { from_devtools_receiver, timer_event_chan, Some(closing)), - receiver: receiver, + task_queue: TaskQueue::new(receiver, own_sender.clone()), own_sender: own_sender, timer_event_port: timer_event_port, parent_sender: parent_sender, @@ -126,8 +188,8 @@ impl DedicatedWorkerGlobalScope { from_devtools_receiver: Receiver<DevtoolScriptControlMsg>, runtime: Runtime, parent_sender: Box<ScriptChan + Send>, - own_sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, - receiver: Receiver<(TrustedWorkerAddress, WorkerScriptMsg)>, + own_sender: Sender<DedicatedWorkerScriptMsg>, + receiver: Receiver<DedicatedWorkerScriptMsg>, timer_event_chan: IpcSender<TimerEvent>, timer_event_port: Receiver<(TrustedWorkerAddress, TimerEvent)>, closing: Arc<AtomicBool>) @@ -151,13 +213,14 @@ impl DedicatedWorkerGlobalScope { } #[allow(unsafe_code)] + // https://html.spec.whatwg.org/multipage/#run-a-worker pub fn run_worker_scope(init: WorkerGlobalScopeInit, worker_url: ServoUrl, from_devtools_receiver: IpcReceiver<DevtoolScriptControlMsg>, worker: TrustedWorkerAddress, parent_sender: Box<ScriptChan + Send>, - own_sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, - receiver: Receiver<(TrustedWorkerAddress, WorkerScriptMsg)>, + own_sender: Sender<DedicatedWorkerScriptMsg>, + receiver: Receiver<DedicatedWorkerScriptMsg>, worker_load_origin: WorkerScriptLoadOrigin, closing: Arc<AtomicBool>) { let serialized_worker_url = worker_url.to_string(); @@ -242,17 +305,11 @@ impl DedicatedWorkerGlobalScope { let reporter_name = format!("dedicated-worker-reporter-{}", random::<u64>()); scope.upcast::<GlobalScope>().mem_profiler_chan().run_with_memory_reporting(|| { - // https://html.spec.whatwg.org/multipage/#event-loop-processing-model - // Step 1 - while let Ok(event) = global.receive_event() { - if scope.is_closing() { - break; - } - // Step 3 - global.handle_event(event); - // Step 6 - let _ar = AutoWorkerReset::new(&global, worker.clone()); - global.upcast::<GlobalScope>().perform_a_microtask_checkpoint(); + // Step 29, Run the responsible event loop specified by inside settings until it is destroyed. + // The worker processing model remains on this step until the event loop is destroyed, + // which happens after the closing flag is set to true. + while !scope.is_closing() { + global.run_event_loop(worker.clone()); } }, reporter_name, parent_sender, CommonScriptMsg::CollectReports); }).expect("Thread spawning failed"); @@ -275,14 +332,13 @@ impl DedicatedWorkerGlobalScope { } #[allow(unsafe_code)] - fn receive_event(&self) -> Result<MixedMessage, RecvError> { + fn run_event_loop(&self, worker: TrustedWorkerAddress) { let scope = self.upcast::<WorkerGlobalScope>(); - let worker_port = &self.receiver; let timer_event_port = &self.timer_event_port; let devtools_port = scope.from_devtools_receiver(); let sel = Select::new(); - let mut worker_handle = sel.handle(worker_port); + let mut worker_handle = sel.handle(self.task_queue.select()); let mut timer_event_handle = sel.handle(timer_event_port); let mut devtools_handle = sel.handle(devtools_port); unsafe { @@ -293,14 +349,44 @@ impl DedicatedWorkerGlobalScope { } } let ret = sel.wait(); - if ret == worker_handle.id() { - Ok(MixedMessage::FromWorker(worker_port.recv()?)) - } else if ret == timer_event_handle.id() { - Ok(MixedMessage::FromScheduler(timer_event_port.recv()?)) - } else if ret == devtools_handle.id() { - Ok(MixedMessage::FromDevtools(devtools_port.recv()?)) - } else { - panic!("unexpected select result!") + let event = { + if ret == worker_handle.id() { + MixedMessage::FromWorker(self.task_queue.take_tasks().recv().unwrap()) + } else if ret == timer_event_handle.id() { + MixedMessage::FromScheduler(timer_event_port.recv().unwrap()) + } else if ret == devtools_handle.id() { + MixedMessage::FromDevtools(devtools_port.recv().unwrap()) + } else { + panic!("unexpected select result!") + } + }; + let mut sequential = vec![]; + sequential.push(event); + // https://html.spec.whatwg.org/multipage/#worker-event-loop + // Once the WorkerGlobalScope's closing flag is set to true, + // the event loop's task queues must discard any further tasks + // that would be added to them + // (tasks already on the queue are unaffected except where otherwise specified). + while !scope.is_closing() { + // Batch all events that are ready. + // The task queue will throttle non-priority tasks if necessary. + match self.task_queue.take_tasks().try_recv() { + Err(_) => match timer_event_port.try_recv() { + Err(_) => match devtools_port.try_recv() { + Err(_) => break, + Ok(ev) => sequential.push(MixedMessage::FromDevtools(ev)), + }, + Ok(ev) => sequential.push(MixedMessage::FromScheduler(ev)), + }, + Ok(ev) => sequential.push(MixedMessage::FromWorker(ev)), + } + } + // Step 3 + for event in sequential { + self.handle_event(event); + // Step 6 + let _ar = AutoWorkerReset::new(&self, worker.clone()); + self.upcast::<GlobalScope>().perform_a_microtask_checkpoint(); } } @@ -346,10 +432,11 @@ impl DedicatedWorkerGlobalScope { } } } - MixedMessage::FromWorker((linked_worker, msg)) => { + MixedMessage::FromWorker(DedicatedWorkerScriptMsg::CommonWorker(linked_worker, msg)) => { let _ar = AutoWorkerReset::new(self, linked_worker); self.handle_script_event(msg); } + MixedMessage::FromWorker(DedicatedWorkerScriptMsg::WakeUp) => {}, } } diff --git a/components/script/dom/serviceworkerglobalscope.rs b/components/script/dom/serviceworkerglobalscope.rs index 7e737b8d3d0..1a95d3adba5 100644 --- a/components/script/dom/serviceworkerglobalscope.rs +++ b/components/script/dom/serviceworkerglobalscope.rs @@ -24,22 +24,69 @@ use js::jsapi::{JSAutoCompartment, JSContext, JS_AddInterruptCallback}; use js::jsval::UndefinedValue; use net_traits::{load_whole_resource, IpcSend, CustomResponseMediator}; use net_traits::request::{CredentialsMode, Destination, RequestInit}; -use script_runtime::{CommonScriptMsg, ScriptChan, new_rt_and_cx, Runtime}; +use script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory, new_rt_and_cx, Runtime}; use script_traits::{TimerEvent, WorkerGlobalScopeInit, ScopeThings, ServiceWorkerMsg, WorkerScriptLoadOrigin}; use servo_config::prefs::PREFS; use servo_rand::random; use servo_url::ServoUrl; -use std::sync::mpsc::{Receiver, RecvError, Select, Sender, channel}; +use std::sync::mpsc::{Receiver, Select, Sender, channel}; use std::thread; use std::time::Duration; use style::thread_state::{self, ThreadState}; +use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; /// Messages used to control service worker event loop pub enum ServiceWorkerScriptMsg { /// Message common to all workers CommonWorker(WorkerScriptMsg), - // Message to request a custom response by the service worker - Response(CustomResponseMediator) + /// Message to request a custom response by the service worker + Response(CustomResponseMediator), + /// Wake-up call from the task queue. + WakeUp, +} + +impl QueuedTaskConversion for ServiceWorkerScriptMsg { + fn task_category(&self) -> Option<&ScriptThreadEventCategory> { + let script_msg = match self { + ServiceWorkerScriptMsg::CommonWorker(WorkerScriptMsg::Common(script_msg)) => script_msg, + _ => return None, + }; + let category = match script_msg { + CommonScriptMsg::Task(category, _boxed, _pipeline_id) => category, + _ => return None, + }; + Some(&category) + } + + fn into_queued_task(self) -> Option<QueuedTask> { + let script_msg = match self { + ServiceWorkerScriptMsg::CommonWorker(WorkerScriptMsg::Common(script_msg)) => script_msg, + _ => return None, + }; + let (category, boxed, pipeline_id) = match script_msg { + CommonScriptMsg::Task(category, boxed, pipeline_id) => + (category, boxed, pipeline_id), + _ => return None, + }; + Some((None, category, boxed, pipeline_id)) + } + + fn from_queued_task(queued_task: QueuedTask) -> Self { + let (_worker, category, boxed, pipeline_id) = queued_task; + let script_msg = CommonScriptMsg::Task(category, boxed, pipeline_id); + ServiceWorkerScriptMsg::CommonWorker(WorkerScriptMsg::Common(script_msg)) + } + + fn wake_up_msg() -> Self { + ServiceWorkerScriptMsg::WakeUp + } + + fn is_wake_up(&self) -> bool { + match self { + ServiceWorkerScriptMsg::WakeUp => true, + _ => false, + } + } } pub enum MixedMessage { @@ -67,11 +114,13 @@ impl ScriptChan for ServiceWorkerChan { } } +unsafe_no_jsmanaged_fields!(TaskQueue<ServiceWorkerScriptMsg>); + #[dom_struct] pub struct ServiceWorkerGlobalScope { workerglobalscope: WorkerGlobalScope, #[ignore_malloc_size_of = "Defined in std"] - receiver: Receiver<ServiceWorkerScriptMsg>, + task_queue: TaskQueue<ServiceWorkerScriptMsg>, #[ignore_malloc_size_of = "Defined in std"] own_sender: Sender<ServiceWorkerScriptMsg>, #[ignore_malloc_size_of = "Defined in std"] @@ -100,7 +149,7 @@ impl ServiceWorkerGlobalScope { from_devtools_receiver, timer_event_chan, None), - receiver: receiver, + task_queue: TaskQueue::new(receiver, own_sender.clone()), timer_event_port: timer_event_port, own_sender: own_sender, swmanager_sender: swmanager_sender, @@ -139,6 +188,7 @@ impl ServiceWorkerGlobalScope { } #[allow(unsafe_code)] + // https://html.spec.whatwg.org/multipage/#run-a-worker pub fn run_serviceworker_scope(scope_things: ScopeThings, own_sender: Sender<ServiceWorkerScriptMsg>, receiver: Receiver<ServiceWorkerScriptMsg>, @@ -211,15 +261,11 @@ impl ServiceWorkerGlobalScope { global.dispatch_activate(); let reporter_name = format!("service-worker-reporter-{}", random::<u64>()); scope.upcast::<GlobalScope>().mem_profiler_chan().run_with_memory_reporting(|| { - // https://html.spec.whatwg.org/multipage/#event-loop-processing-model - // Step 1 - while let Ok(event) = global.receive_event() { - // Step 3 - if !global.handle_event(event) { - break; - } - // Step 6 - global.upcast::<GlobalScope>().perform_a_microtask_checkpoint(); + // Step 29, Run the responsible event loop specified by inside settings until it is destroyed. + // The worker processing model remains on this step until the event loop is destroyed, + // which happens after the closing flag is set to true. + while !scope.is_closing() { + global.run_event_loop(); } }, reporter_name, scope.script_chan(), CommonScriptMsg::CollectReports); }).expect("Thread spawning failed"); @@ -271,19 +317,19 @@ impl ServiceWorkerGlobalScope { // https://slightlyoff.github.io/ServiceWorker/spec/service_worker_1/index.html#fetch-event-section self.upcast::<EventTarget>().fire_event(atom!("fetch")); let _ = mediator.response_chan.send(None); - } + }, + WakeUp => {}, } } #[allow(unsafe_code)] - fn receive_event(&self) -> Result<MixedMessage, RecvError> { + fn run_event_loop(&self) { let scope = self.upcast::<WorkerGlobalScope>(); - let worker_port = &self.receiver; let devtools_port = scope.from_devtools_receiver(); let timer_event_port = &self.timer_event_port; let sel = Select::new(); - let mut worker_handle = sel.handle(worker_port); + let mut worker_handle = sel.handle(self.task_queue.select()); let mut devtools_handle = sel.handle(devtools_port); let mut timer_port_handle = sel.handle(timer_event_port); unsafe { @@ -295,14 +341,44 @@ impl ServiceWorkerGlobalScope { } let ret = sel.wait(); - if ret == worker_handle.id() { - Ok(MixedMessage::FromServiceWorker(worker_port.recv()?)) - }else if ret == devtools_handle.id() { - Ok(MixedMessage::FromDevtools(devtools_port.recv()?)) - } else if ret == timer_port_handle.id() { - Ok(MixedMessage::FromTimeoutThread(timer_event_port.recv()?)) - } else { - panic!("unexpected select result!") + let event = { + if ret == worker_handle.id() { + MixedMessage::FromServiceWorker(self.task_queue.take_tasks().recv().unwrap()) + } else if ret == devtools_handle.id() { + MixedMessage::FromDevtools(devtools_port.recv().unwrap()) + } else if ret == timer_port_handle.id() { + MixedMessage::FromTimeoutThread(timer_event_port.recv().unwrap()) + } else { + panic!("unexpected select result!") + } + }; + + let mut sequential = vec![]; + sequential.push(event); + // https://html.spec.whatwg.org/multipage/#worker-event-loop + // Once the WorkerGlobalScope's closing flag is set to true, + // the event loop's task queues must discard any further tasks + // that would be added to them + // (tasks already on the queue are unaffected except where otherwise specified). + while !scope.is_closing() { + // Batch all events that are ready. + // The task queue will throttle non-priority tasks if necessary. + match self.task_queue.take_tasks().try_recv() { + Err(_) => match timer_event_port.try_recv() { + Err(_) => match devtools_port.try_recv() { + Err(_) => break, + Ok(ev) => sequential.push(MixedMessage::FromDevtools(ev)), + }, + Ok(ev) => sequential.push(MixedMessage::FromTimeoutThread(ev)), + }, + Ok(ev) => sequential.push(MixedMessage::FromServiceWorker(ev)), + } + } + // Step 3 + for event in sequential { + self.handle_event(event); + // Step 6 + self.upcast::<GlobalScope>().perform_a_microtask_checkpoint(); } } diff --git a/components/script/dom/worker.rs b/components/script/dom/worker.rs index b3ba42afb2b..653a2fa141c 100644 --- a/components/script/dom/worker.rs +++ b/components/script/dom/worker.rs @@ -14,7 +14,7 @@ use dom::bindings::reflector::{DomObject, reflect_dom_object}; use dom::bindings::root::DomRoot; use dom::bindings::str::DOMString; use dom::bindings::structuredclone::StructuredCloneData; -use dom::dedicatedworkerglobalscope::DedicatedWorkerGlobalScope; +use dom::dedicatedworkerglobalscope::{DedicatedWorkerGlobalScope, DedicatedWorkerScriptMsg}; use dom::eventtarget::EventTarget; use dom::globalscope::GlobalScope; use dom::messageevent::MessageEvent; @@ -40,14 +40,14 @@ pub struct Worker { #[ignore_malloc_size_of = "Defined in std"] /// Sender to the Receiver associated with the DedicatedWorkerGlobalScope /// this Worker created. - sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, + sender: Sender<DedicatedWorkerScriptMsg>, #[ignore_malloc_size_of = "Arc"] closing: Arc<AtomicBool>, terminated: Cell<bool>, } impl Worker { - fn new_inherited(sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, + fn new_inherited(sender: Sender<DedicatedWorkerScriptMsg>, closing: Arc<AtomicBool>) -> Worker { Worker { eventtarget: EventTarget::new_inherited(), @@ -58,7 +58,7 @@ impl Worker { } pub fn new(global: &GlobalScope, - sender: Sender<(TrustedWorkerAddress, WorkerScriptMsg)>, + sender: Sender<DedicatedWorkerScriptMsg>, closing: Arc<AtomicBool>) -> DomRoot<Worker> { reflect_dom_object(Box::new(Worker::new_inherited(sender, closing)), global, @@ -148,7 +148,7 @@ impl WorkerMethods for Worker { // NOTE: step 9 of https://html.spec.whatwg.org/multipage/#dom-messageport-postmessage // indicates that a nonexistent communication channel should result in a silent error. - let _ = self.sender.send((address, WorkerScriptMsg::DOMMessage(data))); + let _ = self.sender.send(DedicatedWorkerScriptMsg::CommonWorker(address, WorkerScriptMsg::DOMMessage(data))); Ok(()) } diff --git a/components/script/dom/workerglobalscope.rs b/components/script/dom/workerglobalscope.rs index 5415492b0da..1718286e55a 100644 --- a/components/script/dom/workerglobalscope.rs +++ b/components/script/dom/workerglobalscope.rs @@ -407,8 +407,6 @@ impl WorkerGlobalScope { reports_chan.send(reports); }, } - - // FIXME(jdm): Should we do a microtask checkpoint here? } pub fn handle_fire_timer(&self, timer_id: TimerEventId) { diff --git a/components/script/lib.rs b/components/script/lib.rs index d7a57a28898..1ce71a02684 100644 --- a/components/script/lib.rs +++ b/components/script/lib.rs @@ -5,6 +5,7 @@ #![cfg_attr(feature = "unstable", feature(core_intrinsics))] #![cfg_attr(feature = "unstable", feature(on_unimplemented))] #![feature(const_fn)] +#![feature(drain_filter)] #![feature(mpsc_select)] #![feature(plugin)] #![feature(string_retain)] @@ -125,6 +126,7 @@ pub mod script_thread; mod serviceworker_manager; mod serviceworkerjob; mod stylesheet_loader; +mod task_queue; mod task_source; pub mod test; pub mod textinput; diff --git a/components/script/script_thread.rs b/components/script/script_thread.rs index 011a3f0106f..e6de4a60fbc 100644 --- a/components/script/script_thread.rs +++ b/components/script/script_thread.rs @@ -116,6 +116,7 @@ use std::sync::Arc; use std::sync::mpsc::{Receiver, Select, Sender, channel}; use std::thread; use style::thread_state::{self, ThreadState}; +use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; use task_source::dom_manipulation::DOMManipulationTaskSource; use task_source::file_reading::FileReadingTaskSource; use task_source::history_traversal::HistoryTraversalTaskSource; @@ -242,6 +243,52 @@ pub enum MainThreadScriptMsg { }, /// Dispatches a job queue. DispatchJobQueue { scope_url: ServoUrl }, + /// Wake-up call from the task queue. + WakeUp, +} + +impl QueuedTaskConversion for MainThreadScriptMsg { + fn task_category(&self) -> Option<&ScriptThreadEventCategory> { + let script_msg = match self { + MainThreadScriptMsg::Common(script_msg) => script_msg, + _ => return None, + }; + let category = match script_msg { + CommonScriptMsg::Task(category, _boxed, _pipeline_id) => category, + _ => return None, + }; + Some(&category) + } + + fn into_queued_task(self) -> Option<QueuedTask> { + let script_msg = match self { + MainThreadScriptMsg::Common(script_msg) => script_msg, + _ => return None, + }; + let (category, boxed, pipeline_id) = match script_msg { + CommonScriptMsg::Task(category, boxed, pipeline_id) => + (category, boxed, pipeline_id), + _ => return None, + }; + Some((None, category, boxed, pipeline_id)) + } + + fn from_queued_task(queued_task: QueuedTask) -> Self { + let (_worker, category, boxed, pipeline_id) = queued_task; + let script_msg = CommonScriptMsg::Task(category, boxed, pipeline_id); + MainThreadScriptMsg::Common(script_msg) + } + + fn wake_up_msg() -> Self { + MainThreadScriptMsg::WakeUp + } + + fn is_wake_up(&self) -> bool { + match self { + MainThreadScriptMsg::WakeUp => true, + _ => false, + } + } } impl OpaqueSender<CommonScriptMsg> for Box<ScriptChan + Send> { @@ -398,6 +445,8 @@ impl<'a> Iterator for DocumentsIter<'a> { type IncompleteParserContexts = Vec<(PipelineId, ParserContext)>; unsafe_no_jsmanaged_fields!(RefCell<IncompleteParserContexts>); +unsafe_no_jsmanaged_fields!(TaskQueue<MainThreadScriptMsg>); + #[derive(JSTraceable)] // ScriptThread instances are rooted on creation, so this is okay #[allow(unrooted_must_root)] @@ -423,8 +472,9 @@ pub struct ScriptThread { /// A handle to the bluetooth thread. bluetooth_thread: IpcSender<BluetoothRequest>, - /// The port on which the script thread receives messages (load URL, exit, etc.) - port: Receiver<MainThreadScriptMsg>, + /// A queue of tasks to be executed in this script-thread. + task_queue: TaskQueue<MainThreadScriptMsg>, + /// A channel to hand out to script thread-based entities that need to be able to enqueue /// events in the event queue. chan: MainThreadScriptChan, @@ -856,6 +906,8 @@ impl ScriptThread { let (image_cache_channel, image_cache_port) = channel(); + let task_queue = TaskQueue::new(port, chan.clone()); + ScriptThread { documents: DomRefCell::new(Documents::new()), window_proxies: DomRefCell::new(HashMap::new()), @@ -871,7 +923,7 @@ impl ScriptThread { resource_threads: state.resource_threads, bluetooth_thread: state.bluetooth_thread, - port: port, + task_queue, chan: MainThreadScriptChan(chan.clone()), dom_manipulation_task_sender: chan.clone(), @@ -968,7 +1020,7 @@ impl ScriptThread { debug!("Waiting for event."); let mut event = { let sel = Select::new(); - let mut script_port = sel.handle(&self.port); + let mut script_port = sel.handle(self.task_queue.select()); let mut control_port = sel.handle(&self.control_port); let mut timer_event_port = sel.handle(&self.timer_event_port); let mut devtools_port = sel.handle(&self.devtools_port); @@ -984,7 +1036,8 @@ impl ScriptThread { } let ret = sel.wait(); if ret == script_port.id() { - FromScript(self.port.recv().unwrap()) + self.task_queue.take_tasks(); + FromScript(self.task_queue.recv().unwrap()) } else if ret == control_port.id() { FromConstellation(self.control_port.recv().unwrap()) } else if ret == timer_event_port.id() { @@ -1077,7 +1130,7 @@ impl ScriptThread { // and check for more resize events. If there are no events pending, we'll move // on and execute the sequential non-resize events we've seen. match self.control_port.try_recv() { - Err(_) => match self.port.try_recv() { + Err(_) => match self.task_queue.try_recv() { Err(_) => match self.timer_event_port.try_recv() { Err(_) => match self.devtools_port.try_recv() { Err(_) => match self.image_cache_port.try_recv() { @@ -1233,6 +1286,7 @@ impl ScriptThread { MainThreadScriptMsg::WorkletLoaded(pipeline_id) => Some(pipeline_id), MainThreadScriptMsg::RegisterPaintWorklet { pipeline_id, .. } => Some(pipeline_id), MainThreadScriptMsg::DispatchJobQueue { .. } => None, + MainThreadScriptMsg::WakeUp => None, } }, MixedMessage::FromImageCache((pipeline_id, _)) => Some(pipeline_id), @@ -1404,6 +1458,7 @@ impl ScriptThread { MainThreadScriptMsg::DispatchJobQueue { scope_url } => { self.job_queue_map.run_job(scope_url, self) } + MainThreadScriptMsg::WakeUp => {}, } } diff --git a/components/script/task_queue.rs b/components/script/task_queue.rs new file mode 100644 index 00000000000..df72bb9a069 --- /dev/null +++ b/components/script/task_queue.rs @@ -0,0 +1,165 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Machinery for [task-queue](https://html.spec.whatwg.org/multipage/#task-queue). + +use dom::bindings::cell::DomRefCell; +use dom::worker::TrustedWorkerAddress; +use msg::constellation_msg::PipelineId; +use script_runtime::ScriptThreadEventCategory; +use std::cell::Cell; +use std::collections::{HashMap, VecDeque}; +use std::default::Default; +use std::sync::mpsc::{Receiver, Sender}; +use task::TaskBox; +use task_source::TaskSourceName; + + +pub type QueuedTask = (Option<TrustedWorkerAddress>, ScriptThreadEventCategory, Box<TaskBox>, Option<PipelineId>); + +/// Defining the operations used to convert from a msg T to a QueuedTask. +pub trait QueuedTaskConversion { + fn task_category(&self) -> Option<&ScriptThreadEventCategory>; + fn into_queued_task(self) -> Option<QueuedTask>; + fn from_queued_task(queued_task: QueuedTask) -> Self; + fn wake_up_msg() -> Self; + fn is_wake_up(&self) -> bool; +} + +pub struct TaskQueue<T> { + /// The original port on which the task-sources send tasks as messages. + port: Receiver<T>, + /// A sender to ensure the port doesn't block on select while there are throttled tasks. + wake_up_sender: Sender<T>, + /// A queue from which the event-loop can drain tasks. + msg_queue: DomRefCell<VecDeque<T>>, + /// A "business" counter, reset for each iteration of the event-loop + taken_task_counter: Cell<u64>, + /// Tasks that will be throttled for as long as we are "busy". + throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>> +} + +impl<T: QueuedTaskConversion> TaskQueue<T> { + pub fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> { + TaskQueue { + port, + wake_up_sender, + msg_queue: DomRefCell::new(VecDeque::new()), + taken_task_counter: Default::default(), + throttled: Default::default(), + } + } + + /// Process incoming tasks, immediately sending priority ones downstream, + /// and categorizing potential throttles. + fn process_incoming_tasks(&self) { + let mut non_throttled: Vec<T> = self.port + .try_iter() + .filter(|msg| !msg.is_wake_up()) + .collect(); + + let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{ + let category = match msg.task_category() { + Some(category) => category, + None => return false, + }; + match category { + ScriptThreadEventCategory::PerformanceTimelineTask => return true, + _ => { + // A task that will not be throttled, start counting "business" + self.taken_task_counter.set(self.taken_task_counter.get() + 1); + return false + }, + } + }).collect(); + + for msg in non_throttled { + // Immediately send non-throttled tasks for processing. + let _ = self.msg_queue.borrow_mut().push_back(msg); + } + + for msg in to_be_throttled { + // Categorize tasks per task queue. + let (worker, category, boxed, pipeline_id) = match msg.into_queued_task() { + Some((worker, category, boxed, pipeline_id)) => (worker, category, boxed, pipeline_id), + None => unreachable!("A message to be throttled should always be convertible into a queued task"), + }; + // FIXME: Add the task-source name directly to CommonScriptMsg::Task. + let task_source = match category { + ScriptThreadEventCategory::PerformanceTimelineTask => TaskSourceName::PerformanceTimeline, + _ => unreachable!(), + }; + let mut throttled_tasks = self.throttled.borrow_mut(); + throttled_tasks + .entry(task_source) + .or_insert(VecDeque::new()) + .push_back((worker, category, boxed, pipeline_id)); + } + } + + /// Reset the queue for a new iteration of the event-loop, + /// returning the port about whose readiness we want to be notified. + pub fn select(&self) -> &Receiver<T> { + // This is a new iteration of the event-loop, so we reset the "business" counter. + self.taken_task_counter.set(0); + // We want to be notified when the script-port is ready to receive. + // Hence that's the one we need to include in the select. + &self.port + } + + /// Take a message from the front of the queue, without waiting if empty. + pub fn recv(&self) -> Result<T, ()> { + self.msg_queue.borrow_mut().pop_front().ok_or(()) + } + + /// Same as recv. + pub fn try_recv(&self) -> Result<T, ()> { + self.recv() + } + + /// Drain the queue for the current iteration of the event-loop. + /// Holding-back throttles above a given high-water mark. + pub fn take_tasks(&self) { + // High-watermark: once reached, throttled tasks will be held-back. + const PER_ITERATION_MAX: u64 = 5; + // Always first check for new tasks, but don't reset 'taken_task_counter'. + self.process_incoming_tasks(); + let mut throttled = self.throttled.borrow_mut(); + let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum(); + let task_source_names = TaskSourceName::all(); + let mut task_source_cycler = task_source_names.iter().cycle(); + // "being busy", is defined as having more than x tasks for this loop's iteration. + // As long as we're not busy, and there are throttled tasks left: + loop { + let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX; + let none_left = throttled_length == 0; + match (max_reached, none_left) { + (_, true) => break, + (true, false) => { + // We have reached the high-watermark for this iteration of the event-loop, + // yet also have throttled messages left in the queue. + // Ensure the select wakes up in the next iteration of the event-loop + let _ = self.wake_up_sender.send(T::wake_up_msg()); + break; + }, + (false, false) => { + // Cycle through non-priority task sources, taking one throttled task from each. + let task_source = task_source_cycler.next().unwrap(); + let throttled_queue = match throttled.get_mut(&task_source) { + Some(queue) => queue, + None => continue, + }; + let queued_task = match throttled_queue.pop_front() { + Some(queued_task) => queued_task, + None => continue, + }; + let msg = T::from_queued_task(queued_task); + let _ = self.msg_queue.borrow_mut().push_back(msg); + self.taken_task_counter.set(self.taken_task_counter.get() + 1); + throttled_length = throttled_length - 1; + }, + } + } + } +} |