aboutsummaryrefslogtreecommitdiffstats
path: root/components/util/workqueue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/util/workqueue.rs')
-rw-r--r--components/util/workqueue.rs33
1 files changed, 20 insertions, 13 deletions
diff --git a/components/util/workqueue.rs b/components/util/workqueue.rs
index ee14a4d1a50..c83902526fd 100644
--- a/components/util/workqueue.rs
+++ b/components/util/workqueue.rs
@@ -15,6 +15,7 @@ use rand::{Rng, XorShiftRng};
use std::mem;
use std::rand::weak_rng;
use std::sync::atomic::{AtomicUint, Ordering};
+use std::sync::mpsc::{channel, Sender, Receiver};
use deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
/// A unit of work.
@@ -31,7 +32,7 @@ pub struct WorkUnit<QueueData, WorkData> {
}
/// Messages from the supervisor to the worker.
-enum WorkerMsg<QueueData, WorkData> {
+enum WorkerMsg<QueueData: 'static, WorkData: 'static> {
/// Tells the worker to start work.
Start(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *const QueueData),
/// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`.
@@ -40,14 +41,18 @@ enum WorkerMsg<QueueData, WorkData> {
Exit,
}
+unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerMsg<QueueData, WorkData> {}
+
/// Messages to the supervisor.
-enum SupervisorMsg<QueueData, WorkData> {
+enum SupervisorMsg<QueueData: 'static, WorkData: 'static> {
Finished,
ReturnDeque(uint, Worker<WorkUnit<QueueData, WorkData>>),
}
+unsafe impl<QueueData: 'static, WorkData: 'static> Send for SupervisorMsg<QueueData, WorkData> {}
+
/// Information that the supervisor thread keeps about the worker threads.
-struct WorkerInfo<QueueData, WorkData> {
+struct WorkerInfo<QueueData: 'static, WorkData: 'static> {
/// The communication channel to the workers.
chan: Sender<WorkerMsg<QueueData, WorkData>>,
/// The worker end of the deque, if we have it.
@@ -57,7 +62,7 @@ struct WorkerInfo<QueueData, WorkData> {
}
/// Information specific to each worker thread that the thread keeps.
-struct WorkerThread<QueueData, WorkData> {
+struct WorkerThread<QueueData: 'static, WorkData: 'static> {
/// The index of this worker.
index: uint,
/// The communication port from the supervisor.
@@ -70,6 +75,8 @@ struct WorkerThread<QueueData, WorkData> {
rng: XorShiftRng,
}
+unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerThread<QueueData, WorkData> {}
+
static SPIN_COUNT: u32 = 128;
static SPINS_UNTIL_BACKOFF: u32 = 100;
static BACKOFF_INCREMENT_IN_US: u32 = 5;
@@ -80,7 +87,7 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
fn start(&mut self) {
loop {
// Wait for a start message.
- let (mut deque, ref_count, queue_data) = match self.port.recv() {
+ let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() {
WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
WorkerMsg::Stop => panic!("unexpected stop message"),
WorkerMsg::Exit => return,
@@ -158,13 +165,13 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
// the last work unit in the queue, then send a message on the channel.
unsafe {
if (*ref_count).fetch_sub(1, Ordering::SeqCst) == 1 {
- self.chan.send(SupervisorMsg::Finished)
+ self.chan.send(SupervisorMsg::Finished).unwrap()
}
}
}
// Give the deque back to the supervisor.
- self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque))
+ self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap()
}
}
}
@@ -196,7 +203,7 @@ impl<'a, QueueData: 'static, WorkData: Send> WorkerProxy<'a, QueueData, WorkData
}
/// A work queue on which units of work can be submitted.
-pub struct WorkQueue<QueueData, WorkData> {
+pub struct WorkQueue<QueueData: 'static, WorkData: 'static> {
/// Information about each of the workers.
workers: Vec<WorkerInfo<QueueData, WorkData>>,
/// A port on which deques can be received from the workers.
@@ -250,7 +257,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
spawn_named(
format!("{} worker {}/{}", task_name, i+1, thread_count),
- proc() {
+ move || {
task_state::initialize(state | task_state::IN_WORKER);
let mut thread = thread;
thread.start()
@@ -283,7 +290,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
// Tell the workers to start.
let mut work_count = AtomicUint::new(self.work_count);
for worker in self.workers.iter_mut() {
- worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), &mut work_count, &self.data))
+ worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), &mut work_count, &self.data)).unwrap()
}
// Wait for the work to finish.
@@ -292,12 +299,12 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
// Tell everyone to stop.
for worker in self.workers.iter() {
- worker.chan.send(WorkerMsg::Stop)
+ worker.chan.send(WorkerMsg::Stop).unwrap()
}
// Get our deques back.
for _ in range(0, self.workers.len()) {
- match self.port.recv() {
+ match self.port.recv().unwrap() {
SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque),
SupervisorMsg::Finished => panic!("unexpected finished message!"),
}
@@ -306,7 +313,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
pub fn shutdown(&mut self) {
for worker in self.workers.iter() {
- worker.chan.send(WorkerMsg::Exit)
+ worker.chan.send(WorkerMsg::Exit).unwrap()
}
}
}