aboutsummaryrefslogtreecommitdiffstats
path: root/src/components/util/workqueue.rs
diff options
context:
space:
mode:
authorPatrick Walton <pcwalton@mimiga.net>2014-01-23 13:43:03 -0800
committerPatrick Walton <pcwalton@mimiga.net>2014-01-24 20:50:30 -0800
commit18a2050a64cd6f320cc59cb490a69b0e895f11d3 (patch)
tree0a27b979054bded5489300728f0ebcb6f5e4ed8d /src/components/util/workqueue.rs
parent86c29d253a6ffada3488cb08d0154d8901ec252e (diff)
downloadservo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.tar.gz
servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.zip
layout: Port parallel layout over to a generic "work queue"
infrastructure. The work queue accepts abstract generic "work units", which in this case are layout operations. The same speedups have been observed.
Diffstat (limited to 'src/components/util/workqueue.rs')
-rw-r--r--src/components/util/workqueue.rs295
1 files changed, 295 insertions, 0 deletions
diff --git a/src/components/util/workqueue.rs b/src/components/util/workqueue.rs
new file mode 100644
index 00000000000..0254b0c573f
--- /dev/null
+++ b/src/components/util/workqueue.rs
@@ -0,0 +1,295 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! A work queue for scheduling units of work across threads in a fork-join fashion.
+//!
+//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
+//! higher-level API on top of this could allow safe fork-join parallelism.
+
+use native;
+use std::cast;
+use std::rand::{Rng, XorShiftRng};
+use std::rand;
+use std::sync::atomics::{AtomicUint, SeqCst};
+use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
+use std::unstable::intrinsics;
+
+/// A unit of work.
+///
+/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the
+/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents
+/// custom data specific to each unit of work.
+pub struct WorkUnit<QUD,WUD> {
+    /// The function to execute.
+    ///
+    /// A bare function pointer rather than a closure — presumably so work units carry no
+    /// captured environment and remain sendable across threads (TODO confirm against the
+    /// era's `Send` rules for closures).
+    fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>),
+    /// Arbitrary data.
+    data: WUD,
+}
+
+/// Messages from the supervisor to the worker.
+enum WorkerMsg<QUD,WUD> {
+    /// Tells the worker to start work.
+    ///
+    /// Carries the worker end of this thread's deque, a raw pointer to the shared count of
+    /// outstanding work units (which lives on `WorkQueue::run`'s stack frame), and a raw
+    /// pointer to the queue-wide user data (borrowed from the `WorkQueue` itself). Both raw
+    /// pointers are only safe to use while `run` is blocked waiting on the workers.
+    StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD),
+
+    /// Tells the worker to stop. It can be restarted again with a `StartMsg`.
+    StopMsg,
+
+    /// Tells the worker thread to terminate.
+    ExitMsg,
+}
+
+/// Messages to the supervisor.
+enum SupervisorMsg<QUD,WUD> {
+    /// Sent by the worker that completes the last outstanding work unit (i.e. the one that
+    /// decrements the shared counter from 1 to 0).
+    FinishedMsg,
+    /// Returns a worker's deque to the supervisor after a `StopMsg`; the `uint` is the
+    /// worker's index so the supervisor can restore it to the right `WorkerInfo` slot.
+    ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>),
+}
+
+/// Information that the supervisor thread keeps about the worker threads.
+struct WorkerInfo<QUD,WUD> {
+    /// The communication channel to the workers.
+    chan: Chan<WorkerMsg<QUD,WUD>>,
+    /// The buffer pool for this deque.
+    pool: BufferPool<WorkUnit<QUD,WUD>>,
+    /// The worker end of the deque, if we have it.
+    ///
+    /// `None` while the deque is on loan to the worker thread, i.e. between sending a
+    /// `StartMsg` and receiving the corresponding `ReturnDequeMsg`.
+    deque: Option<Worker<WorkUnit<QUD,WUD>>>,
+    /// The thief end of the work-stealing deque.
+    thief: Stealer<WorkUnit<QUD,WUD>>,
+}
+
+/// Information specific to each worker thread that the thread keeps.
+struct WorkerThread<QUD,WUD> {
+    /// The index of this worker.
+    index: uint,
+    /// The communication port from the supervisor.
+    port: Port<WorkerMsg<QUD,WUD>>,
+    /// The communication channel on which messages are sent to the supervisor.
+    chan: SharedChan<SupervisorMsg<QUD,WUD>>,
+    /// The thief end of the work-stealing deque for all other workers.
+    ///
+    /// Holds exactly `thread_count - 1` stealers — one per sibling, never our own deque
+    /// (asserted in `WorkQueue::new`).
+    other_deques: ~[Stealer<WorkUnit<QUD,WUD>>],
+    /// The random number generator for this worker.
+    ///
+    /// Used only to pick a random victim to steal from; a weak (non-cryptographic) RNG
+    /// suffices.
+    rng: XorShiftRng,
+}
+
+/// Number of consecutive failed steal attempts a worker makes before polling its supervisor
+/// port for a `StopMsg`/`ExitMsg` (see the thief loop in `WorkerThread::start`).
+static SPIN_COUNT: uint = 1000;
+
+impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> {
+    /// The main logic. This function starts up the worker and listens for
+    /// messages.
+    ///
+    /// Lifecycle: block until a `StartMsg` arrives, then repeatedly pop work from our own
+    /// deque — or steal from a random sibling when ours is empty — until a `StopMsg` ends
+    /// the run, at which point the deque is handed back to the supervisor and we wait for
+    /// the next `StartMsg`. An `ExitMsg` terminates the thread.
+    pub fn start(&mut self) {
+        loop {
+            // Wait for a start message.
+            let (mut deque, ref_count, queue_data) = match self.port.recv() {
+                StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
+                StopMsg => fail!("unexpected stop message"),
+                ExitMsg => return,
+            };
+
+            // We're off!
+            //
+            // FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug.
+            loop {
+                // FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue
+                // cross-crate.
+                //
+                // NOTE(review): if the thief loop below exits via the `should_continue ==
+                // false` path, `work_unit` is still uninitialized when it goes out of scope;
+                // dropping uninitialized memory is undefined behavior if `WUD` has a
+                // destructor. Verify all `WUD` instantiations are trivially droppable, or
+                // restructure to avoid `uninit`.
+                let mut work_unit = unsafe {
+                    intrinsics::uninit()
+                };
+                match deque.pop() {
+                    Some(work) => work_unit = work,
+                    None => {
+                        // Become a thief.
+                        //
+                        // NOTE(review): the victim selection below takes a modulus by
+                        // `other_deques.len()`, which is zero when the queue was created
+                        // with `thread_count == 1` — confirm callers never do that.
+                        let mut i = 0;
+                        let mut should_continue = true;
+                        loop {
+                            let victim = (self.rng.next_u32() as uint) % self.other_deques.len();
+                            match self.other_deques[victim].steal() {
+                                Empty | Abort => {
+                                    // Continue.
+                                }
+                                Data(work) => {
+                                    work_unit = work;
+                                    break
+                                }
+                            }
+
+                            // Every SPIN_COUNT failed steals, poll the supervisor so a
+                            // `StopMsg` can end an unproductive stealing spree.
+                            if i == SPIN_COUNT {
+                                match self.port.try_recv() {
+                                    Some(StopMsg) => {
+                                        should_continue = false;
+                                        break
+                                    }
+                                    Some(ExitMsg) => return,
+                                    Some(_) => fail!("unexpected message"),
+                                    None => {}
+                                }
+
+                                i = 0
+                            } else {
+                                i += 1
+                            }
+                        }
+
+                        if !should_continue {
+                            break
+                        }
+                    }
+                }
+
+                // At this point, we have some work. Perform it.
+                let mut proxy = WorkerProxy {
+                    worker: &mut deque,
+                    ref_count: ref_count,
+                    queue_data: queue_data,
+                };
+                (work_unit.fun)(work_unit.data, &mut proxy);
+
+                // The work is done. Now decrement the count of outstanding work items. If this was
+                // the last work unit in the queue, then send a message on the channel.
+                unsafe {
+                    if (*ref_count).fetch_sub(1, SeqCst) == 1 {
+                        self.chan.send(FinishedMsg)
+                    }
+                }
+            }
+
+            // Give the deque back to the supervisor.
+            self.chan.send(ReturnDequeMsg(self.index, deque))
+        }
+    }
+}
+
+/// A handle to the work queue that individual work units have.
+///
+/// Passed to every work unit's function; lets the unit enqueue further work (the "fork" half
+/// of fork-join) and read the queue-wide user data.
+pub struct WorkerProxy<'a,QUD,WUD> {
+    /// The worker end of this thread's deque, borrowed for the duration of one work unit.
+    priv worker: &'a mut Worker<WorkUnit<QUD,WUD>>,
+    /// Shared count of outstanding work units; the pointee lives on `WorkQueue::run`'s stack.
+    priv ref_count: *mut AtomicUint,
+    /// Queue-wide user data; points at the `WorkQueue`'s `data` field.
+    priv queue_data: *QUD,
+}
+
+impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
+    /// Enqueues a block into the work queue.
+    ///
+    /// The outstanding-work count is incremented *before* the push, so a thief that pops
+    /// this unit always observes a count that already includes it.
+    #[inline]
+    pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
+        unsafe {
+            // `drop` discards the previous counter value returned by `fetch_add`.
+            drop((*self.ref_count).fetch_add(1, SeqCst));
+        }
+        self.worker.push(work_unit);
+    }
+
+    /// Retrieves the queue user data.
+    ///
+    /// Safety: converts the raw `*QUD` into a reference tied to this proxy's borrow. This is
+    /// sound only because the `WorkQueue` that owns the data stays blocked in `run` for as
+    /// long as any proxy exists.
+    #[inline]
+    pub fn user_data<'a>(&'a self) -> &'a QUD {
+        unsafe {
+            cast::transmute(self.queue_data)
+        }
+    }
+}
+
+/// A work queue on which units of work can be submitted.
+pub struct WorkQueue<QUD,WUD> {
+    /// Information about each of the workers.
+    priv workers: ~[WorkerInfo<QUD,WUD>],
+    /// A port on which deques can be received from the workers.
+    priv port: Port<SupervisorMsg<QUD,WUD>>,
+    /// The amount of work that has been enqueued.
+    ///
+    /// Counts only units pushed via `WorkQueue::push` before a run; work forked *during* a
+    /// run is tracked by the shared atomic counter instead. Reset to zero at the end of
+    /// each `run`.
+    priv work_count: uint,
+    /// Arbitrary user data.
+    data: QUD,
+}
+
+impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> {
+    /// Creates a new work queue and spawns all the threads associated with
+    /// it.
+    ///
+    /// `thread_count` is the number of native worker threads. Each worker gets its own
+    /// work-stealing deque plus thief handles to every *other* worker's deque.
+    ///
+    /// NOTE(review): `thread_count` should be at least 2 — with a single worker,
+    /// `other_deques` is empty and the thief loop in `WorkerThread::start` would take a
+    /// modulus by zero. TODO confirm callers guarantee this.
+    pub fn new(thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> {
+        // Set up data structures.
+        let (supervisor_port, supervisor_chan) = SharedChan::new();
+        let (mut infos, mut threads) = (~[], ~[]);
+        for i in range(0, thread_count) {
+            let (worker_port, worker_chan) = Chan::new();
+            let mut pool = BufferPool::new();
+            let (worker, thief) = pool.deque();
+            infos.push(WorkerInfo {
+                chan: worker_chan,
+                pool: pool,
+                deque: Some(worker),
+                thief: thief,
+            });
+            threads.push(WorkerThread {
+                index: i,
+                port: worker_port,
+                chan: supervisor_chan.clone(),
+                other_deques: ~[],
+                rng: rand::weak_rng(),
+            });
+        }
+
+        // Connect workers to one another: each worker receives a thief handle to every
+        // deque except its own.
+        for i in range(0, thread_count) {
+            for j in range(0, thread_count) {
+                if i != j {
+                    threads[i].other_deques.push(infos[j].thief.clone())
+                }
+            }
+            assert!(threads[i].other_deques.len() == thread_count - 1)
+        }
+
+        // Spawn threads.
+        for thread in threads.move_iter() {
+            native::task::spawn(proc() {
+                let mut thread = thread;
+                thread.start()
+            })
+        }
+
+        WorkQueue {
+            workers: infos,
+            port: supervisor_port,
+            work_count: 0,
+            data: user_data,
+        }
+    }
+
+    /// Enqueues a block into the work queue.
+    ///
+    /// All pre-run work lands on worker 0's deque; the other workers acquire it by stealing
+    /// once `run` starts.
+    #[inline]
+    pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
+        match self.workers[0].deque {
+            None => {
+                fail!("tried to push a block but we don't have the deque?!")
+            }
+            Some(ref mut deque) => deque.push(work_unit),
+        }
+        self.work_count += 1
+    }
+
+    /// Synchronously runs all the enqueued tasks and waits for them to complete.
+    ///
+    /// NOTE(review): if `run` is called when no work has been enqueued, the counter starts
+    /// at zero, no worker ever sends `FinishedMsg`, and the `recv` below blocks forever —
+    /// confirm callers always push at least one unit first.
+    pub fn run(&mut self) {
+        // Tell the workers to start.
+        //
+        // `work_count` lives on this stack frame; handing workers a raw pointer to it is
+        // sound only because this function blocks until every worker has returned its deque
+        // (and thus stopped touching the counter).
+        let mut work_count = AtomicUint::new(self.work_count);
+        for worker in self.workers.mut_iter() {
+            worker.chan.send(StartMsg(worker.deque.take_unwrap(), &mut work_count, &self.data))
+        }
+
+        // Wait for the work to finish. The first message must be `FinishedMsg`, because
+        // workers only send `ReturnDequeMsg` after the `StopMsg` we issue below.
+        drop(self.port.recv());
+        self.work_count = 0;
+
+        // Tell everyone to stop.
+        for worker in self.workers.iter() {
+            worker.chan.send(StopMsg)
+        }
+
+        // Get our deques back.
+        for _ in range(0, self.workers.len()) {
+            match self.port.recv() {
+                ReturnDequeMsg(index, deque) => self.workers[index].deque = Some(deque),
+                FinishedMsg => fail!("unexpected finished message!"),
+            }
+        }
+    }
+
+    /// Shuts down all worker threads by sending each an `ExitMsg`.
+    ///
+    /// Intended to be called between runs, while workers are blocked waiting for a
+    /// `StartMsg`; a worker that receives `ExitMsg` mid-steal returns without handing its
+    /// deque back.
+    pub fn shutdown(&mut self) {
+        for worker in self.workers.iter() {
+            worker.chan.send(ExitMsg)
+        }
+    }
+}
+