diff options
author | Patrick Walton <pcwalton@mimiga.net> | 2014-01-23 13:43:03 -0800 |
---|---|---|
committer | Patrick Walton <pcwalton@mimiga.net> | 2014-01-24 20:50:30 -0800 |
commit | 18a2050a64cd6f320cc59cb490a69b0e895f11d3 (patch) | |
tree | 0a27b979054bded5489300728f0ebcb6f5e4ed8d /src/components/util/workqueue.rs | |
parent | 86c29d253a6ffada3488cb08d0154d8901ec252e (diff) | |
download | servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.tar.gz servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.zip |
layout: Port parallel layout over to a generic "work queue"
infrastructure.
The work queue accepts abstract generic "work units", which in this case
are layout operations. The same speedups have been observed.
Diffstat (limited to 'src/components/util/workqueue.rs')
-rw-r--r-- | src/components/util/workqueue.rs | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/src/components/util/workqueue.rs b/src/components/util/workqueue.rs new file mode 100644 index 00000000000..0254b0c573f --- /dev/null +++ b/src/components/util/workqueue.rs @@ -0,0 +1,295 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! A work queue for scheduling units of work across threads in a fork-join fashion. +//! +//! Data associated with queues is simply a pair of unsigned integers. It is expected that a +//! higher-level API on top of this could allow safe fork-join parallelism. + +use native; +use std::cast; +use std::rand::{Rng, XorShiftRng}; +use std::rand; +use std::sync::atomics::{AtomicUint, SeqCst}; +use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; +use std::unstable::intrinsics; + +/// A unit of work. +/// +/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the +/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents +/// custom data specific to each unit of work. +pub struct WorkUnit<QUD,WUD> { + /// The function to execute. + fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>), + /// Arbitrary data. + data: WUD, +} + +/// Messages from the supervisor to the worker. +enum WorkerMsg<QUD,WUD> { + /// Tells the worker to start work. + StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD), + + /// Tells the worker to stop. It can be restarted again with a `StartMsg`. + StopMsg, + + /// Tells the worker thread to terminate. + ExitMsg, +} + +/// Messages to the supervisor. +enum SupervisorMsg<QUD,WUD> { + FinishedMsg, + ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>), +} + +/// Information that the supervisor thread keeps about the worker threads. +struct WorkerInfo<QUD,WUD> { + /// The communication channel to the workers. + chan: Chan<WorkerMsg<QUD,WUD>>, + /// The buffer pool for this deque. + pool: BufferPool<WorkUnit<QUD,WUD>>, + /// The worker end of the deque, if we have it. + deque: Option<Worker<WorkUnit<QUD,WUD>>>, + /// The thief end of the work-stealing deque. + thief: Stealer<WorkUnit<QUD,WUD>>, +} + +/// Information specific to each worker thread that the thread keeps. +struct WorkerThread<QUD,WUD> { + /// The index of this worker. + index: uint, + /// The communication port from the supervisor. + port: Port<WorkerMsg<QUD,WUD>>, + /// The communication channel on which messages are sent to the supervisor. + chan: SharedChan<SupervisorMsg<QUD,WUD>>, + /// The thief end of the work-stealing deque for all other workers. + other_deques: ~[Stealer<WorkUnit<QUD,WUD>>], + /// The random number generator for this worker. + rng: XorShiftRng, +} + +static SPIN_COUNT: uint = 1000; + +impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> { + /// The main logic. This function starts up the worker and listens for + /// messages. + pub fn start(&mut self) { + loop { + // Wait for a start message. + let (mut deque, ref_count, queue_data) = match self.port.recv() { + StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data), + StopMsg => fail!("unexpected stop message"), + ExitMsg => return, + }; + + // We're off! + // + // FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug. + loop { + // FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue + // cross-crate. + let mut work_unit = unsafe { + intrinsics::uninit() + }; + match deque.pop() { + Some(work) => work_unit = work, + None => { + // Become a thief. + let mut i = 0; + let mut should_continue = true; + loop { + let victim = (self.rng.next_u32() as uint) % self.other_deques.len(); + match self.other_deques[victim].steal() { + Empty | Abort => { + // Continue. + } + Data(work) => { + work_unit = work; + break + } + } + + if i == SPIN_COUNT { + match self.port.try_recv() { + Some(StopMsg) => { + should_continue = false; + break + } + Some(ExitMsg) => return, + Some(_) => fail!("unexpected message"), + None => {} + } + + i = 0 + } else { + i += 1 + } + } + + if !should_continue { + break + } + } + } + + // At this point, we have some work. Perform it. + let mut proxy = WorkerProxy { + worker: &mut deque, + ref_count: ref_count, + queue_data: queue_data, + }; + (work_unit.fun)(work_unit.data, &mut proxy); + + // The work is done. Now decrement the count of outstanding work items. If this was + // the last work unit in the queue, then send a message on the channel. + unsafe { + if (*ref_count).fetch_sub(1, SeqCst) == 1 { + self.chan.send(FinishedMsg) + } + } + } + + // Give the deque back to the supervisor. + self.chan.send(ReturnDequeMsg(self.index, deque)) + } + } +} + +/// A handle to the work queue that individual work units have. +pub struct WorkerProxy<'a,QUD,WUD> { + priv worker: &'a mut Worker<WorkUnit<QUD,WUD>>, + priv ref_count: *mut AtomicUint, + priv queue_data: *QUD, +} + +impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> { + /// Enqueues a block into the work queue. + #[inline] + pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { + unsafe { + drop((*self.ref_count).fetch_add(1, SeqCst)); + } + self.worker.push(work_unit); + } + + /// Retrieves the queue user data. + #[inline] + pub fn user_data<'a>(&'a self) -> &'a QUD { + unsafe { + cast::transmute(self.queue_data) + } + } +} + +/// A work queue on which units of work can be submitted. +pub struct WorkQueue<QUD,WUD> { + /// Information about each of the workers. + priv workers: ~[WorkerInfo<QUD,WUD>], + /// A port on which deques can be received from the workers. + priv port: Port<SupervisorMsg<QUD,WUD>>, + /// The amount of work that has been enqueued. + priv work_count: uint, + /// Arbitrary user data. + data: QUD, +} + +impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> { + /// Creates a new work queue and spawns all the threads associated with + /// it. + pub fn new(thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> { + // Set up data structures. + let (supervisor_port, supervisor_chan) = SharedChan::new(); + let (mut infos, mut threads) = (~[], ~[]); + for i in range(0, thread_count) { + let (worker_port, worker_chan) = Chan::new(); + let mut pool = BufferPool::new(); + let (worker, thief) = pool.deque(); + infos.push(WorkerInfo { + chan: worker_chan, + pool: pool, + deque: Some(worker), + thief: thief, + }); + threads.push(WorkerThread { + index: i, + port: worker_port, + chan: supervisor_chan.clone(), + other_deques: ~[], + rng: rand::weak_rng(), + }); + } + + // Connect workers to one another. + for i in range(0, thread_count) { + for j in range(0, thread_count) { + if i != j { + threads[i].other_deques.push(infos[j].thief.clone()) + } + } + assert!(threads[i].other_deques.len() == thread_count - 1) + } + + // Spawn threads. + for thread in threads.move_iter() { + native::task::spawn(proc() { + let mut thread = thread; + thread.start() + }) + } + + WorkQueue { + workers: infos, + port: supervisor_port, + work_count: 0, + data: user_data, + } + } + + /// Enqueues a block into the work queue. + #[inline] + pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { + match self.workers[0].deque { + None => { + fail!("tried to push a block but we don't have the deque?!") + } + Some(ref mut deque) => deque.push(work_unit), + } + self.work_count += 1 + } + + /// Synchronously runs all the enqueued tasks and waits for them to complete. + pub fn run(&mut self) { + // Tell the workers to start. + let mut work_count = AtomicUint::new(self.work_count); + for worker in self.workers.mut_iter() { + worker.chan.send(StartMsg(worker.deque.take_unwrap(), &mut work_count, &self.data)) + } + + // Wait for the work to finish. + drop(self.port.recv()); + self.work_count = 0; + + // Tell everyone to stop. + for worker in self.workers.iter() { + worker.chan.send(StopMsg) + } + + // Get our deques back. + for _ in range(0, self.workers.len()) { + match self.port.recv() { + ReturnDequeMsg(index, deque) => self.workers[index].deque = Some(deque), + FinishedMsg => fail!("unexpected finished message!"), + } + } + } + + pub fn shutdown(&mut self) { + for worker in self.workers.iter() { + worker.chan.send(ExitMsg) + } + } +} + |