diff options
Diffstat (limited to 'components/util/workqueue.rs')
-rw-r--r-- | components/util/workqueue.rs | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/components/util/workqueue.rs b/components/util/workqueue.rs index ee14a4d1a50..c83902526fd 100644 --- a/components/util/workqueue.rs +++ b/components/util/workqueue.rs @@ -15,6 +15,7 @@ use rand::{Rng, XorShiftRng}; use std::mem; use std::rand::weak_rng; use std::sync::atomic::{AtomicUint, Ordering}; +use std::sync::mpsc::{channel, Sender, Receiver}; use deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; /// A unit of work. @@ -31,7 +32,7 @@ pub struct WorkUnit<QueueData, WorkData> { } /// Messages from the supervisor to the worker. -enum WorkerMsg<QueueData, WorkData> { +enum WorkerMsg<QueueData: 'static, WorkData: 'static> { /// Tells the worker to start work. Start(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *const QueueData), /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`. @@ -40,14 +41,18 @@ enum WorkerMsg<QueueData, WorkData> { Exit, } +unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerMsg<QueueData, WorkData> {} + /// Messages to the supervisor. -enum SupervisorMsg<QueueData, WorkData> { +enum SupervisorMsg<QueueData: 'static, WorkData: 'static> { Finished, ReturnDeque(uint, Worker<WorkUnit<QueueData, WorkData>>), } +unsafe impl<QueueData: 'static, WorkData: 'static> Send for SupervisorMsg<QueueData, WorkData> {} + /// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo<QueueData, WorkData> { +struct WorkerInfo<QueueData: 'static, WorkData: 'static> { /// The communication channel to the workers. chan: Sender<WorkerMsg<QueueData, WorkData>>, /// The worker end of the deque, if we have it. @@ -57,7 +62,7 @@ struct WorkerInfo<QueueData, WorkData> { } /// Information specific to each worker thread that the thread keeps. -struct WorkerThread<QueueData, WorkData> { +struct WorkerThread<QueueData: 'static, WorkData: 'static> { /// The index of this worker. index: uint, /// The communication port from the supervisor. @@ -70,6 +75,8 @@ struct WorkerThread<QueueData, WorkData> { rng: XorShiftRng, } +unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerThread<QueueData, WorkData> {} + static SPIN_COUNT: u32 = 128; static SPINS_UNTIL_BACKOFF: u32 = 100; static BACKOFF_INCREMENT_IN_US: u32 = 5; @@ -80,7 +87,7 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> { fn start(&mut self) { loop { // Wait for a start message. - let (mut deque, ref_count, queue_data) = match self.port.recv() { + let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() { WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data), WorkerMsg::Stop => panic!("unexpected stop message"), WorkerMsg::Exit => return, @@ -158,13 +165,13 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> { // the last work unit in the queue, then send a message on the channel. unsafe { if (*ref_count).fetch_sub(1, Ordering::SeqCst) == 1 { - self.chan.send(SupervisorMsg::Finished) + self.chan.send(SupervisorMsg::Finished).unwrap() } } } // Give the deque back to the supervisor. - self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)) + self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap() } } } @@ -196,7 +203,7 @@ impl<'a, QueueData: 'static, WorkData: Send> WorkerProxy<'a, QueueData, WorkData } /// A work queue on which units of work can be submitted. -pub struct WorkQueue<QueueData, WorkData> { +pub struct WorkQueue<QueueData: 'static, WorkData: 'static> { /// Information about each of the workers. workers: Vec<WorkerInfo<QueueData, WorkData>>, /// A port on which deques can be received from the workers. @@ -250,7 +257,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> { spawn_named( format!("{} worker {}/{}", task_name, i+1, thread_count), - proc() { + move || { task_state::initialize(state | task_state::IN_WORKER); let mut thread = thread; thread.start() @@ -283,7 +290,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> { // Tell the workers to start. let mut work_count = AtomicUint::new(self.work_count); for worker in self.workers.iter_mut() { - worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), &mut work_count, &self.data)) + worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), &mut work_count, &self.data)).unwrap() } // Wait for the work to finish. @@ -292,12 +299,12 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> { // Tell everyone to stop. for worker in self.workers.iter() { - worker.chan.send(WorkerMsg::Stop) + worker.chan.send(WorkerMsg::Stop).unwrap() } // Get our deques back. for _ in range(0, self.workers.len()) { - match self.port.recv() { + match self.port.recv().unwrap() { SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque), SupervisorMsg::Finished => panic!("unexpected finished message!"), } @@ -306,7 +313,7 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> { pub fn shutdown(&mut self) { for worker in self.workers.iter() { - worker.chan.send(WorkerMsg::Exit) + worker.chan.send(WorkerMsg::Exit).unwrap() } } } |