diff options
Diffstat (limited to 'src/components/util/workqueue.rs')
-rw-r--r-- | src/components/util/workqueue.rs | 291 |
1 files changed, 0 insertions, 291 deletions
diff --git a/src/components/util/workqueue.rs b/src/components/util/workqueue.rs deleted file mode 100644 index 5b27b4a5dab..00000000000 --- a/src/components/util/workqueue.rs +++ /dev/null @@ -1,291 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! A work queue for scheduling units of work across threads in a fork-join fashion. -//! -//! Data associated with queues is simply a pair of unsigned integers. It is expected that a -//! higher-level API on top of this could allow safe fork-join parallelism. - -use native::task::NativeTaskBuilder; -use rand::{Rng, XorShiftRng}; -use std::mem; -use std::rand::weak_rng; -use std::sync::atomics::{AtomicUint, SeqCst}; -use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; -use std::task::TaskBuilder; - -/// A unit of work. -/// -/// # Type parameters -/// -/// - `QueueData`: global custom data for the entire work queue. -/// - `WorkData`: custom data specific to each unit of work. -pub struct WorkUnit<QueueData, WorkData> { - /// The function to execute. - pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>), - /// Arbitrary data. - pub data: WorkData, -} - -/// Messages from the supervisor to the worker. -enum WorkerMsg<QueueData, WorkData> { - /// Tells the worker to start work. - StartMsg(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *const QueueData), - /// Tells the worker to stop. It can be restarted again with a `StartMsg`. - StopMsg, - /// Tells the worker thread to terminate. - ExitMsg, -} - -/// Messages to the supervisor. -enum SupervisorMsg<QueueData, WorkData> { - FinishedMsg, - ReturnDequeMsg(uint, Worker<WorkUnit<QueueData, WorkData>>), -} - -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo<QueueData, WorkData> { - /// The communication channel to the workers. - chan: Sender<WorkerMsg<QueueData, WorkData>>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<WorkUnit<QueueData, WorkData>>>, - /// The thief end of the work-stealing deque. - thief: Stealer<WorkUnit<QueueData, WorkData>>, -} - -/// Information specific to each worker thread that the thread keeps. -struct WorkerThread<QueueData, WorkData> { - /// The index of this worker. - index: uint, - /// The communication port from the supervisor. - port: Receiver<WorkerMsg<QueueData, WorkData>>, - /// The communication channel on which messages are sent to the supervisor. - chan: Sender<SupervisorMsg<QueueData, WorkData>>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>, - /// The random number generator for this worker. - rng: XorShiftRng, -} - -static SPIN_COUNT: uint = 1000; - -impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> { - /// The main logic. This function starts up the worker and listens for - /// messages. - fn start(&mut self) { - loop { - // Wait for a start message. - let (mut deque, ref_count, queue_data) = match self.port.recv() { - StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data), - StopMsg => fail!("unexpected stop message"), - ExitMsg => return, - }; - - // We're off! - // - // FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug. - loop { - // FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue - // cross-crate. - let mut work_unit = unsafe { - mem::uninitialized() - }; - match deque.pop() { - Some(work) => work_unit = work, - None => { - // Become a thief. - let mut i = 0; - let mut should_continue = true; - loop { - let victim = (self.rng.next_u32() as uint) % self.other_deques.len(); - match self.other_deques.get_mut(victim).steal() { - Empty | Abort => { - // Continue. - } - Data(work) => { - work_unit = work; - break - } - } - - if i == SPIN_COUNT { - match self.port.try_recv() { - Ok(StopMsg) => { - should_continue = false; - break - } - Ok(ExitMsg) => return, - Ok(_) => fail!("unexpected message"), - _ => {} - } - - i = 0 - } else { - i += 1 - } - } - - if !should_continue { - break - } - } - } - - // At this point, we have some work. Perform it. - let mut proxy = WorkerProxy { - worker: &mut deque, - ref_count: ref_count, - queue_data: queue_data, - }; - (work_unit.fun)(work_unit.data, &mut proxy); - - // The work is done. Now decrement the count of outstanding work items. If this was - // the last work unit in the queue, then send a message on the channel. - unsafe { - if (*ref_count).fetch_sub(1, SeqCst) == 1 { - self.chan.send(FinishedMsg) - } - } - } - - // Give the deque back to the supervisor. - self.chan.send(ReturnDequeMsg(self.index, deque)) - } - } -} - -/// A handle to the work queue that individual work units have. -pub struct WorkerProxy<'a, QueueData, WorkData> { - worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>, - ref_count: *mut AtomicUint, - queue_data: *const QueueData, -} - -impl<'a, QueueData, WorkData: Send> WorkerProxy<'a, QueueData, WorkData> { - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - unsafe { - drop((*self.ref_count).fetch_add(1, SeqCst)); - } - self.worker.push(work_unit); - } - - /// Retrieves the queue user data. - #[inline] - pub fn user_data<'a>(&'a self) -> &'a QueueData { - unsafe { - mem::transmute(self.queue_data) - } - } -} - -/// A work queue on which units of work can be submitted. -pub struct WorkQueue<QueueData, WorkData> { - /// Information about each of the workers. - workers: Vec<WorkerInfo<QueueData, WorkData>>, - /// A port on which deques can be received from the workers. - port: Receiver<SupervisorMsg<QueueData, WorkData>>, - /// The amount of work that has been enqueued. - work_count: uint, - /// Arbitrary user data. - pub data: QueueData, -} - -impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> { - /// Creates a new work queue and spawns all the threads associated with - /// it. - pub fn new(task_name: &'static str, thread_count: uint, user_data: QueueData) -> WorkQueue<QueueData, WorkData> { - // Set up data structures. - let (supervisor_chan, supervisor_port) = channel(); - let (mut infos, mut threads) = (vec!(), vec!()); - for i in range(0, thread_count) { - let (worker_chan, worker_port) = channel(); - let pool = BufferPool::new(); - let (worker, thief) = pool.deque(); - infos.push(WorkerInfo { - chan: worker_chan, - deque: Some(worker), - thief: thief, - }); - threads.push(WorkerThread { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: vec!(), - rng: weak_rng(), - }); - } - - // Connect workers to one another. - for i in range(0, thread_count) { - for j in range(0, thread_count) { - if i != j { - threads.get_mut(i).other_deques.push(infos[j].thief.clone()) - } - } - assert!(threads.get(i).other_deques.len() == thread_count - 1) - } - - // Spawn threads. - for thread in threads.move_iter() { - TaskBuilder::new().named(task_name).native().spawn(proc() { - let mut thread = thread; - thread.start() - }) - } - - WorkQueue { - workers: infos, - port: supervisor_port, - work_count: 0, - data: user_data, - } - } - - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - match self.workers.get_mut(0).deque { - None => { - fail!("tried to push a block but we don't have the deque?!") - } - Some(ref mut deque) => deque.push(work_unit), - } - self.work_count += 1 - } - - /// Synchronously runs all the enqueued tasks and waits for them to complete. - pub fn run(&mut self) { - // Tell the workers to start. - let mut work_count = AtomicUint::new(self.work_count); - for worker in self.workers.mut_iter() { - worker.chan.send(StartMsg(worker.deque.take_unwrap(), &mut work_count, &self.data)) - } - - // Wait for the work to finish. - drop(self.port.recv()); - self.work_count = 0; - - // Tell everyone to stop. - for worker in self.workers.iter() { - worker.chan.send(StopMsg) - } - - // Get our deques back. - for _ in range(0, self.workers.len()) { - match self.port.recv() { - ReturnDequeMsg(index, deque) => self.workers.get_mut(index).deque = Some(deque), - FinishedMsg => fail!("unexpected finished message!"), - } - } - } - - pub fn shutdown(&mut self) { - for worker in self.workers.iter() { - worker.chan.send(ExitMsg) - } - } -} - |