diff options
Diffstat (limited to 'components/style/workqueue.rs')
-rw-r--r-- | components/style/workqueue.rs | 385 |
1 files changed, 0 insertions, 385 deletions
diff --git a/components/style/workqueue.rs b/components/style/workqueue.rs deleted file mode 100644 index fc4f66ea120..00000000000 --- a/components/style/workqueue.rs +++ /dev/null @@ -1,385 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! A work queue for scheduling units of work across threads in a fork-join fashion. -//! -//! Data associated with queues is simply a pair of unsigned integers. It is expected that a -//! higher-level API on top of this could allow safe fork-join parallelism. - -#![allow(unsafe_code)] - -#[cfg(windows)] -extern crate kernel32; -#[cfg(not(windows))] -extern crate libc; - -use deque::{self, Abort, Data, Empty, Stealer, Worker}; -use rand::{Rng, XorShiftRng, weak_rng}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{Receiver, Sender, channel}; -use std::thread; -use thread_state; - -/// A unit of work. -/// -/// # Type parameters -/// -/// - `QueueData`: global custom data for the entire work queue. -/// - `WorkData`: custom data specific to each unit of work. -pub struct WorkUnit<QueueData, WorkData: Send> { - /// The function to execute. - pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>), - /// Arbitrary data. - pub data: WorkData, -} - -/// Messages from the supervisor to the worker. -enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> { - /// Tells the worker to start work. - Start(Worker<WorkUnit<QueueData, WorkData>>, *const AtomicUsize, *const QueueData), - /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`. - Stop, - /// Tells the worker to measure the heap size of its TLS using the supplied function. - HeapSizeOfTLS(fn() -> usize), - /// Tells the worker thread to terminate. - Exit, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {} - -/// Messages to the supervisor. -enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> { - Finished, - HeapSizeOfTLS(usize), - ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>), -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {} - -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> { - /// The communication channel to the workers. - chan: Sender<WorkerMsg<QueueData, WorkData>>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<WorkUnit<QueueData, WorkData>>>, - /// The thief end of the work-stealing deque. - thief: Stealer<WorkUnit<QueueData, WorkData>>, -} - -/// Information specific to each worker thread that the thread keeps. -struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> { - /// The index of this worker. - index: usize, - /// The communication port from the supervisor. - port: Receiver<WorkerMsg<QueueData, WorkData>>, - /// The communication channel on which messages are sent to the supervisor. - chan: Sender<SupervisorMsg<QueueData, WorkData>>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>, - /// The random number generator for this worker. - rng: XorShiftRng, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {} - -const SPINS_UNTIL_BACKOFF: u32 = 128; -const BACKOFF_INCREMENT_IN_US: u32 = 5; -const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6; - -#[cfg(not(windows))] -fn sleep_microseconds(usec: u32) { - unsafe { - libc::usleep(usec); - } -} - -#[cfg(windows)] -fn sleep_microseconds(_: u32) { - unsafe { - kernel32::Sleep(0); - } -} - -impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> { - /// The main logic. This function starts up the worker and listens for - /// messages. - fn start(&mut self) { - let deque_index_mask = (self.other_deques.len() as u32).next_power_of_two() - 1; - loop { - // Wait for a start message. - let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() { - WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data), - WorkerMsg::Stop => panic!("unexpected stop message"), - WorkerMsg::Exit => return, - WorkerMsg::HeapSizeOfTLS(f) => { - self.chan.send(SupervisorMsg::HeapSizeOfTLS(f())).unwrap(); - continue; - } - }; - - let mut back_off_sleep = 0 as u32; - - // We're off! - 'outer: loop { - let work_unit; - match deque.pop() { - Some(work) => work_unit = work, - None => { - // Become a thief. - let mut i = 0; - loop { - // Don't just use `rand % len` because that's slow on ARM. - let mut victim; - loop { - victim = self.rng.next_u32() & deque_index_mask; - if (victim as usize) < self.other_deques.len() { - break - } - } - - match self.other_deques[victim as usize].steal() { - Empty | Abort => { - // Continue. - } - Data(work) => { - work_unit = work; - back_off_sleep = 0 as u32; - break - } - } - - if i > SPINS_UNTIL_BACKOFF { - if back_off_sleep >= BACKOFF_INCREMENT_IN_US * - BACKOFFS_UNTIL_CONTROL_CHECK { - match self.port.try_recv() { - Ok(WorkerMsg::Stop) => break 'outer, - Ok(WorkerMsg::Exit) => return, - Ok(_) => panic!("unexpected message"), - _ => {} - } - } - - sleep_microseconds(back_off_sleep); - - back_off_sleep += BACKOFF_INCREMENT_IN_US; - i = 0 - } else { - i += 1 - } - } - } - } - - // At this point, we have some work. Perform it. - let mut proxy = WorkerProxy { - worker: &mut deque, - ref_count: ref_count, - // queue_data is kept alive in the stack frame of - // WorkQueue::run until we send the - // SupervisorMsg::ReturnDeque message below. - queue_data: unsafe { &*queue_data }, - worker_index: self.index as u8, - }; - (work_unit.fun)(work_unit.data, &mut proxy); - - // The work is done. Now decrement the count of outstanding work items. If this was - // the last work unit in the queue, then send a message on the channel. - unsafe { - if (*ref_count).fetch_sub(1, Ordering::Release) == 1 { - self.chan.send(SupervisorMsg::Finished).unwrap() - } - } - } - - // Give the deque back to the supervisor. - self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap() - } - } -} - -/// A handle to the work queue that individual work units have. -pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> { - worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>, - ref_count: *const AtomicUsize, - queue_data: &'a QueueData, - worker_index: u8, -} - -impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData, WorkData> { - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - unsafe { - drop((*self.ref_count).fetch_add(1, Ordering::Relaxed)); - } - self.worker.push(work_unit); - } - - /// Retrieves the queue user data. - #[inline] - pub fn user_data(&self) -> &'a QueueData { - self.queue_data - } - - /// Retrieves the index of the worker. - #[inline] - pub fn worker_index(&self) -> u8 { - self.worker_index - } -} - -/// A work queue on which units of work can be submitted. -pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> { - /// Information about each of the workers. - workers: Vec<WorkerInfo<QueueData, WorkData>>, - /// A port on which deques can be received from the workers. - port: Receiver<SupervisorMsg<QueueData, WorkData>>, - /// The amount of work that has been enqueued. - work_count: usize, -} - -impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> { - /// Creates a new work queue and spawns all the threads associated with - /// it. - pub fn new(thread_name: &'static str, - state: thread_state::ThreadState, - thread_count: usize) -> Result<WorkQueue<QueueData, WorkData>, ()> { - // Set up data structures. - let (supervisor_chan, supervisor_port) = channel(); - let mut infos = Vec::with_capacity(thread_count); - let mut threads = Vec::with_capacity(thread_count); - for i in 0..thread_count { - let (worker_chan, worker_port) = channel(); - let (worker, thief) = deque::new(); - infos.push(WorkerInfo { - chan: worker_chan, - deque: Some(worker), - thief: thief, - }); - threads.push(WorkerThread { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: vec!(), - rng: weak_rng(), - }); - } - - // Connect workers to one another. - for (i, mut thread) in threads.iter_mut().enumerate() { - for (j, info) in infos.iter().enumerate() { - if i != j { - thread.other_deques.push(info.thief.clone()) - } - } - assert!(thread.other_deques.len() == thread_count - 1) - } - - // Spawn threads. - let mut thread_handles = vec![]; - for (i, thread) in threads.into_iter().enumerate() { - let handle = thread::Builder::new() - .name(format!("{} worker {}/{}", thread_name, i + 1, thread_count)) - .spawn(move || { - thread_state::initialize(state | thread_state::IN_WORKER); - let mut thread = thread; - thread.start() - }); - match handle { - Ok(handle) => { - thread_handles.push(handle); - } - Err(err) => { - warn!("Failed spawning thread: {:?}", err); - break; - } - } - } - - if thread_handles.len() != thread_count { - // At least one worker thread failed to be created, just close the - // rest of them, and return an error. - for (i, handle) in thread_handles.into_iter().enumerate() { - let _ = infos[i].chan.send(WorkerMsg::Exit); - let _ = handle.join(); - } - - return Err(()); - } - - Ok(WorkQueue { - workers: infos, - port: supervisor_port, - work_count: 0, - }) - } - - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - let deque = &mut self.workers[0].deque; - match *deque { - None => { - panic!("tried to push a block but we don't have the deque?!") - } - Some(ref mut deque) => deque.push(work_unit), - } - self.work_count += 1 - } - - /// Synchronously runs all the enqueued tasks and waits for them to complete. - pub fn run(&mut self, data: &QueueData) { - // Tell the workers to start. - let work_count = AtomicUsize::new(self.work_count); - for worker in &mut self.workers { - worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), - &work_count, - data)).unwrap() - } - - // Wait for the work to finish. - drop(self.port.recv()); - self.work_count = 0; - - // Tell everyone to stop. - for worker in &self.workers { - worker.chan.send(WorkerMsg::Stop).unwrap() - } - - // Get our deques back. - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque), - SupervisorMsg::HeapSizeOfTLS(_) => panic!("unexpected HeapSizeOfTLS message"), - SupervisorMsg::Finished => panic!("unexpected finished message!"), - } - } - } - - /// Synchronously measure memory usage of any thread-local storage. - pub fn heap_size_of_tls(&self, f: fn() -> usize) -> Vec<usize> { - // Tell the workers to measure themselves. - for worker in &self.workers { - worker.chan.send(WorkerMsg::HeapSizeOfTLS(f)).unwrap() - } - - // Wait for the workers to finish measuring themselves. - let mut sizes = vec![]; - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::HeapSizeOfTLS(size) => { - sizes.push(size); - } - _ => panic!("unexpected message!"), - } - } - sizes - } - - pub fn shutdown(&mut self) { - for worker in &self.workers { - worker.chan.send(WorkerMsg::Exit).unwrap() - } - } -} |