diff options
author | bors-servo <lbergstrom+bors@mozilla.com> | 2018-09-12 13:33:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-12 13:33:45 -0400 |
commit | 910cc23a6e85cced43905f7615065b23bdb54b42 (patch) | |
tree | 3ceae959c0ebdf3f224c5c07f5b8c2cbd31dddf1 /components/script/task_queue.rs | |
parent | 9a83ab6297ddb62937fc54521b7fd4d19017e6b1 (diff) | |
parent | 2a996fbc8fef722b264389680cc55c25c46807d1 (diff) | |
download | servo-910cc23a6e85cced43905f7615065b23bdb54b42.tar.gz servo-910cc23a6e85cced43905f7615065b23bdb54b42.zip |
Auto merge of #21325 - gterzian:crossbeam_integration, r=SimonSapin,jdm
Replace mpsc with crossbeam-channel
Follow up on https://github.com/servo/servo/pull/19515
---
Selecting over multiple channels in `std::sync::mpsc` is not stable and likely never will be:
https://github.com/rust-lang/rust/issues/27800#issuecomment-260136777
> It seems the only thing keeping `mpsc_select` around is Servo.
crossbeam-channel is designed specifically to replace `std::sync::mpsc` and fix many of its shortcomings:
https://github.com/stjepang/rfcs-crossbeam/blob/channel/text/2017-11-09-channel.md
This is to be landed together with https://github.com/servo/ipc-channel/pull/183.
<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/21325)
<!-- Reviewable:end -->
Diffstat (limited to 'components/script/task_queue.rs')
-rw-r--r-- | components/script/task_queue.rs | 35 |
1 files changed, 20 insertions, 15 deletions
diff --git a/components/script/task_queue.rs b/components/script/task_queue.rs index 7093474e1ed..e7e2117b2f5 100644 --- a/components/script/task_queue.rs +++ b/components/script/task_queue.rs @@ -8,10 +8,10 @@ use dom::bindings::cell::DomRefCell; use dom::worker::TrustedWorkerAddress; use msg::constellation_msg::PipelineId; use script_runtime::ScriptThreadEventCategory; +use servo_channel::{Receiver, Sender, base_channel}; use std::cell::Cell; use std::collections::{HashMap, VecDeque}; use std::default::Default; -use std::sync::mpsc::{Receiver, Sender}; use task::TaskBox; use task_source::TaskSourceName; @@ -59,13 +59,18 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { /// Process incoming tasks, immediately sending priority ones downstream, /// and categorizing potential throttles. - fn process_incoming_tasks(&self) { - let mut non_throttled: Vec<T> = self.port - .try_iter() - .filter(|msg| !msg.is_wake_up()) - .collect(); + fn process_incoming_tasks(&self, first_msg: T) { + let mut incoming = Vec::with_capacity(self.port.len() + 1); + if !first_msg.is_wake_up() { + incoming.push(first_msg); + } + while let Some(msg) = self.port.try_recv() { + if !msg.is_wake_up() { + incoming.push(msg); + } + } - let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{ + let to_be_throttled: Vec<T> = incoming.drain_filter(|msg|{ let task_source = match msg.task_source_name() { Some(task_source) => task_source, None => return false, @@ -80,7 +85,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { } }).collect(); - for msg in non_throttled { + for msg in incoming { // Immediately send non-throttled tasks for processing. let _ = self.msg_queue.borrow_mut().push_back(msg); } @@ -101,31 +106,31 @@ impl<T: QueuedTaskConversion> TaskQueue<T> { /// Reset the queue for a new iteration of the event-loop, /// returning the port about whose readiness we want to be notified. - pub fn select(&self) -> &Receiver<T> { + pub fn select(&self) -> &base_channel::Receiver<T> { // This is a new iteration of the event-loop, so we reset the "business" counter. self.taken_task_counter.set(0); // We want to be notified when the script-port is ready to receive. // Hence that's the one we need to include in the select. - &self.port + self.port.select() } /// Take a message from the front of the queue, without waiting if empty. - pub fn recv(&self) -> Result<T, ()> { - self.msg_queue.borrow_mut().pop_front().ok_or(()) + pub fn recv(&self) -> Option<T> { + self.msg_queue.borrow_mut().pop_front() } /// Same as recv. - pub fn try_recv(&self) -> Result<T, ()> { + pub fn try_recv(&self) -> Option<T> { self.recv() } /// Drain the queue for the current iteration of the event-loop. /// Holding-back throttles above a given high-water mark. - pub fn take_tasks(&self) { + pub fn take_tasks(&self, first_msg: T) { // High-watermark: once reached, throttled tasks will be held-back. const PER_ITERATION_MAX: u64 = 5; // Always first check for new tasks, but don't reset 'taken_task_counter'. - self.process_incoming_tasks(); + self.process_incoming_tasks(first_msg); let mut throttled = self.throttled.borrow_mut(); let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum(); let task_source_names = TaskSourceName::all(); |