diff options
Diffstat (limited to 'components/script/task_queue.rs')
-rw-r--r-- | components/script/task_queue.rs | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/components/script/task_queue.rs b/components/script/task_queue.rs new file mode 100644 index 00000000000..df72bb9a069 --- /dev/null +++ b/components/script/task_queue.rs @@ -0,0 +1,165 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Machinery for [task-queue](https://html.spec.whatwg.org/multipage/#task-queue). + +use dom::bindings::cell::DomRefCell; +use dom::worker::TrustedWorkerAddress; +use msg::constellation_msg::PipelineId; +use script_runtime::ScriptThreadEventCategory; +use std::cell::Cell; +use std::collections::{HashMap, VecDeque}; +use std::default::Default; +use std::sync::mpsc::{Receiver, Sender}; +use task::TaskBox; +use task_source::TaskSourceName; + + +pub type QueuedTask = (Option<TrustedWorkerAddress>, ScriptThreadEventCategory, Box<TaskBox>, Option<PipelineId>); + +/// Defining the operations used to convert from a msg T to a QueuedTask. +pub trait QueuedTaskConversion { + fn task_category(&self) -> Option<&ScriptThreadEventCategory>; + fn into_queued_task(self) -> Option<QueuedTask>; + fn from_queued_task(queued_task: QueuedTask) -> Self; + fn wake_up_msg() -> Self; + fn is_wake_up(&self) -> bool; +} + +pub struct TaskQueue<T> { + /// The original port on which the task-sources send tasks as messages. + port: Receiver<T>, + /// A sender to ensure the port doesn't block on select while there are throttled tasks. + wake_up_sender: Sender<T>, + /// A queue from which the event-loop can drain tasks. + msg_queue: DomRefCell<VecDeque<T>>, + /// A "business" counter, reset for each iteration of the event-loop + taken_task_counter: Cell<u64>, + /// Tasks that will be throttled for as long as we are "busy". + throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>> +} + +impl<T: QueuedTaskConversion> TaskQueue<T> { + pub fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> { + TaskQueue { + port, + wake_up_sender, + msg_queue: DomRefCell::new(VecDeque::new()), + taken_task_counter: Default::default(), + throttled: Default::default(), + } + } + + /// Process incoming tasks, immediately sending priority ones downstream, + /// and categorizing potential throttles. + fn process_incoming_tasks(&self) { + let mut non_throttled: Vec<T> = self.port + .try_iter() + .filter(|msg| !msg.is_wake_up()) + .collect(); + + let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{ + let category = match msg.task_category() { + Some(category) => category, + None => return false, + }; + match category { + ScriptThreadEventCategory::PerformanceTimelineTask => return true, + _ => { + // A task that will not be throttled, start counting "business" + self.taken_task_counter.set(self.taken_task_counter.get() + 1); + return false + }, + } + }).collect(); + + for msg in non_throttled { + // Immediately send non-throttled tasks for processing. + let _ = self.msg_queue.borrow_mut().push_back(msg); + } + + for msg in to_be_throttled { + // Categorize tasks per task queue. + let (worker, category, boxed, pipeline_id) = match msg.into_queued_task() { + Some((worker, category, boxed, pipeline_id)) => (worker, category, boxed, pipeline_id), + None => unreachable!("A message to be throttled should always be convertible into a queued task"), + }; + // FIXME: Add the task-source name directly to CommonScriptMsg::Task. + let task_source = match category { + ScriptThreadEventCategory::PerformanceTimelineTask => TaskSourceName::PerformanceTimeline, + _ => unreachable!(), + }; + let mut throttled_tasks = self.throttled.borrow_mut(); + throttled_tasks + .entry(task_source) + .or_insert(VecDeque::new()) + .push_back((worker, category, boxed, pipeline_id)); + } + } + + /// Reset the queue for a new iteration of the event-loop, + /// returning the port about whose readiness we want to be notified. + pub fn select(&self) -> &Receiver<T> { + // This is a new iteration of the event-loop, so we reset the "business" counter. + self.taken_task_counter.set(0); + // We want to be notified when the script-port is ready to receive. + // Hence that's the one we need to include in the select. + &self.port + } + + /// Take a message from the front of the queue, without waiting if empty. + pub fn recv(&self) -> Result<T, ()> { + self.msg_queue.borrow_mut().pop_front().ok_or(()) + } + + /// Same as recv. + pub fn try_recv(&self) -> Result<T, ()> { + self.recv() + } + + /// Drain the queue for the current iteration of the event-loop. + /// Holding-back throttles above a given high-water mark. + pub fn take_tasks(&self) { + // High-watermark: once reached, throttled tasks will be held-back. + const PER_ITERATION_MAX: u64 = 5; + // Always first check for new tasks, but don't reset 'taken_task_counter'. + self.process_incoming_tasks(); + let mut throttled = self.throttled.borrow_mut(); + let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum(); + let task_source_names = TaskSourceName::all(); + let mut task_source_cycler = task_source_names.iter().cycle(); + // "being busy", is defined as having more than x tasks for this loop's iteration. + // As long as we're not busy, and there are throttled tasks left: + loop { + let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX; + let none_left = throttled_length == 0; + match (max_reached, none_left) { + (_, true) => break, + (true, false) => { + // We have reached the high-watermark for this iteration of the event-loop, + // yet also have throttled messages left in the queue. + // Ensure the select wakes up in the next iteration of the event-loop + let _ = self.wake_up_sender.send(T::wake_up_msg()); + break; + }, + (false, false) => { + // Cycle through non-priority task sources, taking one throttled task from each. + let task_source = task_source_cycler.next().unwrap(); + let throttled_queue = match throttled.get_mut(&task_source) { + Some(queue) => queue, + None => continue, + }; + let queued_task = match throttled_queue.pop_front() { + Some(queued_task) => queued_task, + None => continue, + }; + let msg = T::from_queued_task(queued_task); + let _ = self.msg_queue.borrow_mut().push_back(msg); + self.taken_task_counter.set(self.taken_task_counter.get() + 1); + throttled_length = throttled_length - 1; + }, + } + } + } +} |