aboutsummaryrefslogtreecommitdiffstats
path: root/components/style/workqueue.rs
diff options
context:
space:
mode:
authorAnthony Ramine <n.oxyde@gmail.com>2016-07-04 17:07:45 +0200
committerAnthony Ramine <n.oxyde@gmail.com>2016-07-04 21:56:09 +0200
commitbf34fdde1fbee49bf518ef0dcbb40447fc35fe0f (patch)
tree1065799bd8dffa8d3137506183f8ec6f23d9e79e /components/style/workqueue.rs
parentb6ca1b9b3bdf6cba99e082426dec860f66c05073 (diff)
downloadservo-bf34fdde1fbee49bf518ef0dcbb40447fc35fe0f.tar.gz
servo-bf34fdde1fbee49bf518ef0dcbb40447fc35fe0f.zip
Move util::workqueue to style
Diffstat (limited to 'components/style/workqueue.rs')
-rw-r--r--components/style/workqueue.rs363
1 files changed, 363 insertions, 0 deletions
diff --git a/components/style/workqueue.rs b/components/style/workqueue.rs
new file mode 100644
index 00000000000..acee9faf5f8
--- /dev/null
+++ b/components/style/workqueue.rs
@@ -0,0 +1,363 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! A work queue for scheduling units of work across threads in a fork-join fashion.
+//!
+//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
+//! higher-level API on top of this could allow safe fork-join parallelism.
+
+#![allow(unsafe_code)]
+
+#[cfg(windows)]
+extern crate kernel32;
+
+use deque::{self, Abort, Data, Empty, Stealer, Worker};
+#[cfg(not(windows))]
+use libc::usleep;
+use rand::{Rng, XorShiftRng, weak_rng};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::{Receiver, Sender, channel};
+use util::thread::spawn_named;
+use util::thread_state;
+
+/// A unit of work.
+///
+/// # Type parameters
+///
+/// - `QueueData`: global custom data for the entire work queue.
+/// - `WorkData`: custom data specific to each unit of work.
+pub struct WorkUnit<QueueData, WorkData: Send> {
+ /// The function to execute.
+ pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
+ /// Arbitrary data.
+ pub data: WorkData,
+}
+
+/// Messages from the supervisor to the worker.
+enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> {
+ /// Tells the worker to start work.
+ Start(Worker<WorkUnit<QueueData, WorkData>>, *const AtomicUsize, *const QueueData),
+ /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`.
+ Stop,
+ /// Tells the worker to measure the heap size of its TLS using the supplied function.
+ HeapSizeOfTLS(fn() -> usize),
+ /// Tells the worker thread to terminate.
+ Exit,
+}
+
+unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {}
+
+/// Messages to the supervisor.
+enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> {
+ Finished,
+ HeapSizeOfTLS(usize),
+ ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>),
+}
+
+unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {}
+
+/// Information that the supervisor thread keeps about the worker threads.
+struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> {
+ /// The communication channel to the workers.
+ chan: Sender<WorkerMsg<QueueData, WorkData>>,
+ /// The worker end of the deque, if we have it.
+ deque: Option<Worker<WorkUnit<QueueData, WorkData>>>,
+ /// The thief end of the work-stealing deque.
+ thief: Stealer<WorkUnit<QueueData, WorkData>>,
+}
+
+/// Information specific to each worker thread that the thread keeps.
+struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> {
+ /// The index of this worker.
+ index: usize,
+ /// The communication port from the supervisor.
+ port: Receiver<WorkerMsg<QueueData, WorkData>>,
+ /// The communication channel on which messages are sent to the supervisor.
+ chan: Sender<SupervisorMsg<QueueData, WorkData>>,
+ /// The thief end of the work-stealing deque for all other workers.
+ other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>,
+ /// The random number generator for this worker.
+ rng: XorShiftRng,
+}
+
+unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {}
+
+const SPINS_UNTIL_BACKOFF: u32 = 128;
+const BACKOFF_INCREMENT_IN_US: u32 = 5;
+const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6;
+
+#[cfg(not(windows))]
+fn sleep_microseconds(usec: u32) {
+ unsafe {
+ usleep(usec);
+ }
+}
+
+#[cfg(windows)]
+fn sleep_microseconds(_: u32) {
+ unsafe {
+ kernel32::Sleep(0);
+ }
+}
+
+impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> {
+ /// The main logic. This function starts up the worker and listens for
+ /// messages.
+ fn start(&mut self) {
+ let deque_index_mask = (self.other_deques.len() as u32).next_power_of_two() - 1;
+ loop {
+ // Wait for a start message.
+ let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() {
+ WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
+ WorkerMsg::Stop => panic!("unexpected stop message"),
+ WorkerMsg::Exit => return,
+ WorkerMsg::HeapSizeOfTLS(f) => {
+ self.chan.send(SupervisorMsg::HeapSizeOfTLS(f())).unwrap();
+ continue;
+ }
+ };
+
+ let mut back_off_sleep = 0 as u32;
+
+ // We're off!
+ 'outer: loop {
+ let work_unit;
+ match deque.pop() {
+ Some(work) => work_unit = work,
+ None => {
+ // Become a thief.
+ let mut i = 0;
+ loop {
+ // Don't just use `rand % len` because that's slow on ARM.
+ let mut victim;
+ loop {
+ victim = self.rng.next_u32() & deque_index_mask;
+ if (victim as usize) < self.other_deques.len() {
+ break
+ }
+ }
+
+ match self.other_deques[victim as usize].steal() {
+ Empty | Abort => {
+ // Continue.
+ }
+ Data(work) => {
+ work_unit = work;
+ back_off_sleep = 0 as u32;
+ break
+ }
+ }
+
+ if i > SPINS_UNTIL_BACKOFF {
+ if back_off_sleep >= BACKOFF_INCREMENT_IN_US *
+ BACKOFFS_UNTIL_CONTROL_CHECK {
+ match self.port.try_recv() {
+ Ok(WorkerMsg::Stop) => break 'outer,
+ Ok(WorkerMsg::Exit) => return,
+ Ok(_) => panic!("unexpected message"),
+ _ => {}
+ }
+ }
+
+ sleep_microseconds(back_off_sleep);
+
+ back_off_sleep += BACKOFF_INCREMENT_IN_US;
+ i = 0
+ } else {
+ i += 1
+ }
+ }
+ }
+ }
+
+ // At this point, we have some work. Perform it.
+ let mut proxy = WorkerProxy {
+ worker: &mut deque,
+ ref_count: ref_count,
+ // queue_data is kept alive in the stack frame of
+ // WorkQueue::run until we send the
+ // SupervisorMsg::ReturnDeque message below.
+ queue_data: unsafe { &*queue_data },
+ worker_index: self.index as u8,
+ };
+ (work_unit.fun)(work_unit.data, &mut proxy);
+
+ // The work is done. Now decrement the count of outstanding work items. If this was
+ // the last work unit in the queue, then send a message on the channel.
+ unsafe {
+ if (*ref_count).fetch_sub(1, Ordering::Release) == 1 {
+ self.chan.send(SupervisorMsg::Finished).unwrap()
+ }
+ }
+ }
+
+ // Give the deque back to the supervisor.
+ self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap()
+ }
+ }
+}
+
+/// A handle to the work queue that individual work units have.
+pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> {
+ worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
+ ref_count: *const AtomicUsize,
+ queue_data: &'a QueueData,
+ worker_index: u8,
+}
+
+impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData, WorkData> {
+ /// Enqueues a block into the work queue.
+ #[inline]
+ pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
+ unsafe {
+ drop((*self.ref_count).fetch_add(1, Ordering::Relaxed));
+ }
+ self.worker.push(work_unit);
+ }
+
+ /// Retrieves the queue user data.
+ #[inline]
+ pub fn user_data(&self) -> &'a QueueData {
+ self.queue_data
+ }
+
+ /// Retrieves the index of the worker.
+ #[inline]
+ pub fn worker_index(&self) -> u8 {
+ self.worker_index
+ }
+}
+
+/// A work queue on which units of work can be submitted.
+pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> {
+ /// Information about each of the workers.
+ workers: Vec<WorkerInfo<QueueData, WorkData>>,
+ /// A port on which deques can be received from the workers.
+ port: Receiver<SupervisorMsg<QueueData, WorkData>>,
+ /// The amount of work that has been enqueued.
+ work_count: usize,
+}
+
+impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> {
+ /// Creates a new work queue and spawns all the threads associated with
+ /// it.
+ pub fn new(thread_name: &'static str,
+ state: thread_state::ThreadState,
+ thread_count: usize) -> WorkQueue<QueueData, WorkData> {
+ // Set up data structures.
+ let (supervisor_chan, supervisor_port) = channel();
+ let (mut infos, mut threads) = (vec!(), vec!());
+ for i in 0..thread_count {
+ let (worker_chan, worker_port) = channel();
+ let (worker, thief) = deque::new();
+ infos.push(WorkerInfo {
+ chan: worker_chan,
+ deque: Some(worker),
+ thief: thief,
+ });
+ threads.push(WorkerThread {
+ index: i,
+ port: worker_port,
+ chan: supervisor_chan.clone(),
+ other_deques: vec!(),
+ rng: weak_rng(),
+ });
+ }
+
+ // Connect workers to one another.
+ for (i, mut thread) in threads.iter_mut().enumerate() {
+ for (j, info) in infos.iter().enumerate() {
+ if i != j {
+ thread.other_deques.push(info.thief.clone())
+ }
+ }
+ assert!(thread.other_deques.len() == thread_count - 1)
+ }
+
+ // Spawn threads.
+ for (i, thread) in threads.into_iter().enumerate() {
+ spawn_named(
+ format!("{} worker {}/{}", thread_name, i + 1, thread_count),
+ move || {
+ thread_state::initialize(state | thread_state::IN_WORKER);
+ let mut thread = thread;
+ thread.start()
+ })
+ }
+
+ WorkQueue {
+ workers: infos,
+ port: supervisor_port,
+ work_count: 0,
+ }
+ }
+
+ /// Enqueues a block into the work queue.
+ #[inline]
+ pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
+ let deque = &mut self.workers[0].deque;
+ match *deque {
+ None => {
+ panic!("tried to push a block but we don't have the deque?!")
+ }
+ Some(ref mut deque) => deque.push(work_unit),
+ }
+ self.work_count += 1
+ }
+
+ /// Synchronously runs all the enqueued tasks and waits for them to complete.
+ pub fn run(&mut self, data: &QueueData) {
+ // Tell the workers to start.
+ let work_count = AtomicUsize::new(self.work_count);
+ for worker in &mut self.workers {
+ worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(),
+ &work_count,
+ data)).unwrap()
+ }
+
+ // Wait for the work to finish.
+ drop(self.port.recv());
+ self.work_count = 0;
+
+ // Tell everyone to stop.
+ for worker in &self.workers {
+ worker.chan.send(WorkerMsg::Stop).unwrap()
+ }
+
+ // Get our deques back.
+ for _ in 0..self.workers.len() {
+ match self.port.recv().unwrap() {
+ SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque),
+ SupervisorMsg::HeapSizeOfTLS(_) => panic!("unexpected HeapSizeOfTLS message"),
+ SupervisorMsg::Finished => panic!("unexpected finished message!"),
+ }
+ }
+ }
+
+ /// Synchronously measure memory usage of any thread-local storage.
+ pub fn heap_size_of_tls(&self, f: fn() -> usize) -> Vec<usize> {
+ // Tell the workers to measure themselves.
+ for worker in &self.workers {
+ worker.chan.send(WorkerMsg::HeapSizeOfTLS(f)).unwrap()
+ }
+
+ // Wait for the workers to finish measuring themselves.
+ let mut sizes = vec![];
+ for _ in 0..self.workers.len() {
+ match self.port.recv().unwrap() {
+ SupervisorMsg::HeapSizeOfTLS(size) => {
+ sizes.push(size);
+ }
+ _ => panic!("unexpected message!"),
+ }
+ }
+ sizes
+ }
+
+ pub fn shutdown(&mut self) {
+ for worker in &self.workers {
+ worker.chan.send(WorkerMsg::Exit).unwrap()
+ }
+ }
+}