aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/task_queue.rs
diff options
context:
space:
mode:
authorbors-servo <lbergstrom+bors@mozilla.com>2018-09-12 13:33:45 -0400
committerGitHub <noreply@github.com>2018-09-12 13:33:45 -0400
commit910cc23a6e85cced43905f7615065b23bdb54b42 (patch)
tree3ceae959c0ebdf3f224c5c07f5b8c2cbd31dddf1 /components/script/task_queue.rs
parent9a83ab6297ddb62937fc54521b7fd4d19017e6b1 (diff)
parent2a996fbc8fef722b264389680cc55c25c46807d1 (diff)
downloadservo-910cc23a6e85cced43905f7615065b23bdb54b42.tar.gz
servo-910cc23a6e85cced43905f7615065b23bdb54b42.zip
Auto merge of #21325 - gterzian:crossbeam_integration, r=SimonSapin,jdm
Replace mpsc with crossbeam-channel Follow up on https://github.com/servo/servo/pull/19515 --- Selecting over multiple channels in `std::sync::mpsc` is not stable and likely never will be: https://github.com/rust-lang/rust/issues/27800#issuecomment-260136777 > It seems the only thing keeping `mpsc_select` around is Servo. crossbeam-channel is designed specifically to replace `std::sync::mpsc` and fix many of its shortcomings: https://github.com/stjepang/rfcs-crossbeam/blob/channel/text/2017-11-09-channel.md This is to be landed together with https://github.com/servo/ipc-channel/pull/183. <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/21325) <!-- Reviewable:end -->
Diffstat (limited to 'components/script/task_queue.rs')
-rw-r--r--components/script/task_queue.rs35
1 files changed, 20 insertions, 15 deletions
diff --git a/components/script/task_queue.rs b/components/script/task_queue.rs
index 7093474e1ed..e7e2117b2f5 100644
--- a/components/script/task_queue.rs
+++ b/components/script/task_queue.rs
@@ -8,10 +8,10 @@ use dom::bindings::cell::DomRefCell;
use dom::worker::TrustedWorkerAddress;
use msg::constellation_msg::PipelineId;
use script_runtime::ScriptThreadEventCategory;
+use servo_channel::{Receiver, Sender, base_channel};
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::default::Default;
-use std::sync::mpsc::{Receiver, Sender};
use task::TaskBox;
use task_source::TaskSourceName;
@@ -59,13 +59,18 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
/// Process incoming tasks, immediately sending priority ones downstream,
/// and categorizing potential throttles.
- fn process_incoming_tasks(&self) {
- let mut non_throttled: Vec<T> = self.port
- .try_iter()
- .filter(|msg| !msg.is_wake_up())
- .collect();
+ fn process_incoming_tasks(&self, first_msg: T) {
+ let mut incoming = Vec::with_capacity(self.port.len() + 1);
+ if !first_msg.is_wake_up() {
+ incoming.push(first_msg);
+ }
+ while let Some(msg) = self.port.try_recv() {
+ if !msg.is_wake_up() {
+ incoming.push(msg);
+ }
+ }
- let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{
+ let to_be_throttled: Vec<T> = incoming.drain_filter(|msg|{
let task_source = match msg.task_source_name() {
Some(task_source) => task_source,
None => return false,
@@ -80,7 +85,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
}
}).collect();
- for msg in non_throttled {
+ for msg in incoming {
// Immediately send non-throttled tasks for processing.
let _ = self.msg_queue.borrow_mut().push_back(msg);
}
@@ -101,31 +106,31 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
/// Reset the queue for a new iteration of the event-loop,
/// returning the port about whose readiness we want to be notified.
- pub fn select(&self) -> &Receiver<T> {
+ pub fn select(&self) -> &base_channel::Receiver<T> {
// This is a new iteration of the event-loop, so we reset the "business" counter.
self.taken_task_counter.set(0);
// We want to be notified when the script-port is ready to receive.
// Hence that's the one we need to include in the select.
- &self.port
+ self.port.select()
}
/// Take a message from the front of the queue, without waiting if empty.
- pub fn recv(&self) -> Result<T, ()> {
- self.msg_queue.borrow_mut().pop_front().ok_or(())
+ pub fn recv(&self) -> Option<T> {
+ self.msg_queue.borrow_mut().pop_front()
}
/// Same as recv.
- pub fn try_recv(&self) -> Result<T, ()> {
+ pub fn try_recv(&self) -> Option<T> {
self.recv()
}
/// Drain the queue for the current iteration of the event-loop.
/// Holding-back throttles above a given high-water mark.
- pub fn take_tasks(&self) {
+ pub fn take_tasks(&self, first_msg: T) {
// High-watermark: once reached, throttled tasks will be held-back.
const PER_ITERATION_MAX: u64 = 5;
// Always first check for new tasks, but don't reset 'taken_task_counter'.
- self.process_incoming_tasks();
+ self.process_incoming_tasks(first_msg);
let mut throttled = self.throttled.borrow_mut();
let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
let task_source_names = TaskSourceName::all();