diff options
author | Emilio Cobos Álvarez <ecoal95@gmail.com> | 2016-10-07 12:22:06 +0200 |
---|---|---|
committer | Emilio Cobos Álvarez <ecoal95@gmail.com> | 2016-11-14 21:24:19 +0100 |
commit | 73917cce83d2225b51b29c374d861d71ec69435f (patch) | |
tree | ac8c6b562ca3e88a7d853ddcab88d114362559e7 /components/style | |
parent | b7eb36fa84e6c6c77727ea2cd02c57f6750dc7af (diff) | |
download | servo-73917cce83d2225b51b29c374d861d71ec69435f.tar.gz servo-73917cce83d2225b51b29c374d861d71ec69435f.zip |
style: Use rayon instead of our custom work queue.
Diffstat (limited to 'components/style')
-rw-r--r-- | components/style/Cargo.toml | 15 | ||||
-rw-r--r-- | components/style/gecko/data.rs | 15 | ||||
-rw-r--r-- | components/style/gecko/wrapper.rs | 3 | ||||
-rw-r--r-- | components/style/lib.rs | 5 | ||||
-rw-r--r-- | components/style/parallel.rs | 76 | ||||
-rw-r--r-- | components/style/thread_state.rs | 3 | ||||
-rw-r--r-- | components/style/workqueue.rs | 385 |
7 files changed, 46 insertions, 456 deletions
diff --git a/components/style/Cargo.toml b/components/style/Cargo.toml index 3494ea263c2..1374926b08b 100644 --- a/components/style/Cargo.toml +++ b/components/style/Cargo.toml @@ -13,11 +13,11 @@ path = "lib.rs" doctest = false [features] -gecko = ["nsstring_vendor"] +gecko = ["nsstring_vendor", "num_cpus", "rayon/unstable"] servo = ["serde/unstable", "serde", "serde_derive", "heapsize_derive", "style_traits/servo", "app_units/plugins", "servo_atoms", "html5ever-atoms", "cssparser/heap_size", "cssparser/serde-serialization", - "url/heap_size", "plugins"] + "url/heap_size", "plugins", "rayon/unstable"] testing = [] [dependencies] @@ -25,7 +25,6 @@ app_units = "0.3" bitflags = "0.7" cfg-if = "0.1.0" cssparser = "0.7" -deque = "0.3.1" encoding = "0.2" euclid = "0.10.1" fnv = "1.0" @@ -34,17 +33,16 @@ heapsize_derive = {version = "0.1", optional = true} html5ever-atoms = {version = "0.1", optional = true} lazy_static = "0.2" log = "0.3.5" -libc = "0.2" matches = "0.1" nsstring_vendor = {path = "gecko_bindings/nsstring_vendor", optional = true} num-integer = "0.1.32" num-traits = "0.1.32" -num_cpus = "1.1.0" ordered-float = "0.2.2" owning_ref = "0.2.2" parking_lot = "0.3.3" quickersort = "2.0.0" rand = "0.3" +rayon = "0.5" rustc-serialize = "0.3" selectors = "0.14" serde = {version = "0.8", optional = true} @@ -58,11 +56,12 @@ url = "1.2" util = {path = "../util"} plugins = {path = "../plugins", optional = true} +[dependencies.num_cpus] +optional = true +version = "1.0" + [target.'cfg(windows)'.dependencies] kernel32-sys = "0.2" -[target.'cfg(not(windows))'.dependencies] -libc = "0.2" - [build-dependencies] walkdir = "0.1" diff --git a/components/style/gecko/data.rs b/components/style/gecko/data.rs index 1cc360fdeb8..2f6ed96c71f 100644 --- a/components/style/gecko/data.rs +++ b/components/style/gecko/data.rs @@ -4,15 +4,14 @@ use animation::Animation; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; -use context::SharedStyleContext; use dom::OpaqueNode; use euclid::size::TypedSize2D; use gecko_bindings::bindings::RawServoStyleSet; use gecko_bindings::sugar::ownership::{HasBoxFFI, HasFFI, HasSimpleFFI}; use media_queries::{Device, MediaType}; use num_cpus; -use parallel::WorkQueueData; use parking_lot::RwLock; +use rayon; use selector_matching::Stylist; use std::cmp; use std::collections::HashMap; @@ -21,8 +20,6 @@ use std::sync::Arc; use std::sync::mpsc::{Receiver, Sender, channel}; use style_traits::ViewportPx; use stylesheets::Stylesheet; -use thread_state; -use workqueue::WorkQueue; pub struct PerDocumentStyleDataImpl { /// Rule processor. @@ -41,7 +38,7 @@ pub struct PerDocumentStyleDataImpl { pub expired_animations: Arc<RwLock<HashMap<OpaqueNode, Vec<Animation>>>>, // FIXME(bholley): This shouldn't be per-document. - pub work_queue: Option<WorkQueue<SharedStyleContext, WorkQueueData>>, + pub work_queue: Option<rayon::ThreadPool>, pub num_threads: usize, } @@ -76,7 +73,9 @@ impl PerDocumentStyleData { work_queue: if *NUM_THREADS <= 1 { None } else { - WorkQueue::new("StyleWorker", thread_state::LAYOUT, *NUM_THREADS).ok() + let configuration = + rayon::Configuration::new().set_num_threads(*NUM_THREADS); + rayon::ThreadPool::new(configuration).ok() }, num_threads: *NUM_THREADS, })) @@ -112,8 +111,6 @@ unsafe impl HasBoxFFI for PerDocumentStyleData {} impl Drop for PerDocumentStyleDataImpl { fn drop(&mut self) { - if let Some(ref mut queue) = self.work_queue { - queue.shutdown(); - } + let _ = self.work_queue.take(); } } diff --git a/components/style/gecko/wrapper.rs b/components/style/gecko/wrapper.rs index 9adac5f269f..cc40f22ae97 100644 --- a/components/style/gecko/wrapper.rs +++ b/components/style/gecko/wrapper.rs @@ -30,7 +30,6 @@ use gecko_bindings::bindings::Gecko_StoreStyleDifference; use gecko_bindings::structs; use gecko_bindings::structs::{NODE_HAS_DIRTY_DESCENDANTS_FOR_SERVO, NODE_IS_DIRTY_FOR_SERVO}; use gecko_bindings::structs::{nsIAtom, nsIContent, nsStyleContext}; -use libc::uintptr_t; use parking_lot::RwLock; use parser::ParserContextExtraData; use properties::{ComputedValues, parse_style_attribute}; @@ -114,7 +113,7 @@ impl<'ln> TNode for GeckoNode<'ln> { } fn opaque(&self) -> OpaqueNode { - let ptr: uintptr_t = self.0 as *const _ as uintptr_t; + let ptr: usize = self.0 as *const _ as usize; OpaqueNode(ptr) } diff --git a/components/style/lib.rs b/components/style/lib.rs index e7c764ba87c..64fe606d9c1 100644 --- a/components/style/lib.rs +++ b/components/style/lib.rs @@ -49,7 +49,6 @@ extern crate cfg_if; extern crate core; #[macro_use] extern crate cssparser; -extern crate deque; extern crate encoding; extern crate euclid; extern crate fnv; @@ -60,7 +59,6 @@ extern crate heapsize; #[allow(unused_extern_crates)] #[macro_use] extern crate lazy_static; -#[cfg(feature = "gecko")] extern crate libc; #[macro_use] extern crate log; #[allow(unused_extern_crates)] @@ -74,7 +72,7 @@ extern crate ordered_float; extern crate owning_ref; extern crate parking_lot; extern crate quickersort; -extern crate rand; +extern crate rayon; extern crate rustc_serialize; extern crate selectors; #[cfg(feature = "servo")] @@ -131,7 +129,6 @@ pub mod traversal; #[allow(non_camel_case_types)] pub mod values; pub mod viewport; -pub mod workqueue; use std::fmt; use std::sync::Arc; diff --git a/components/style/parallel.rs b/components/style/parallel.rs index 6740c4f379c..e3f4eaf50b1 100644 --- a/components/style/parallel.rs +++ b/components/style/parallel.rs @@ -6,45 +6,18 @@ //! //! This code is highly unsafe. Keep this file small and easy to audit. -#![allow(unsafe_code)] - use dom::{OpaqueNode, StylingMode, TElement, TNode, UnsafeNode}; -use std::mem; +use rayon; use std::sync::atomic::Ordering; use traversal::{STYLE_SHARING_CACHE_HITS, STYLE_SHARING_CACHE_MISSES}; use traversal::DomTraversalContext; use util::opts; -use workqueue::{WorkQueue, WorkUnit, WorkerProxy}; - -#[allow(dead_code)] -fn static_assertion(node: UnsafeNode) { - unsafe { - let _: UnsafeNodeList = mem::transmute(node); - } -} - -pub type UnsafeNodeList = (Box<Vec<UnsafeNode>>, OpaqueNode); pub const CHUNK_SIZE: usize = 64; -pub struct WorkQueueData(usize, usize); - -pub fn run_queue_with_custom_work_data_type<To, F, SharedContext: Sync>( - queue: &mut WorkQueue<SharedContext, WorkQueueData>, - callback: F, - shared: &SharedContext) - where To: 'static + Send, F: FnOnce(&mut WorkQueue<SharedContext, To>) -{ - let queue: &mut WorkQueue<SharedContext, To> = unsafe { - mem::transmute(queue) - }; - callback(queue); - queue.run(shared); -} - pub fn traverse_dom<N, C>(root: N, - queue_data: &C::SharedContext, - queue: &mut WorkQueue<C::SharedContext, WorkQueueData>) + shared_context: &C::SharedContext, + queue: &rayon::ThreadPool) where N: TNode, C: DomTraversalContext<N> { @@ -53,12 +26,15 @@ pub fn traverse_dom<N, C>(root: N, STYLE_SHARING_CACHE_HITS.store(0, Ordering::SeqCst); STYLE_SHARING_CACHE_MISSES.store(0, Ordering::SeqCst); } - run_queue_with_custom_work_data_type(queue, |queue| { - queue.push(WorkUnit { - fun: top_down_dom::<N, C>, - data: (Box::new(vec![root.to_unsafe()]), root.opaque()), + + let nodes = vec![root.to_unsafe()].into_boxed_slice(); + let root = root.opaque(); + queue.install(|| { + rayon::scope(|scope| { + let nodes = nodes; + top_down_dom::<N, C>(&nodes, root, scope, shared_context); }); - }, queue_data); + }); if opts::get().style_sharing_stats { let hits = STYLE_SHARING_CACHE_HITS.load(Ordering::SeqCst); @@ -72,14 +48,18 @@ pub fn traverse_dom<N, C>(root: N, /// A parallel top-down DOM traversal. #[inline(always)] -fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, - proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>) - where N: TNode, C: DomTraversalContext<N> +#[allow(unsafe_code)] +fn top_down_dom<'a, 'scope, N, C>(unsafe_nodes: &'a [UnsafeNode], + root: OpaqueNode, + scope: &'a rayon::Scope<'scope>, + shared_context: &'scope C::SharedContext) + where N: TNode, + C: DomTraversalContext<N>, { - let context = C::new(proxy.user_data(), unsafe_nodes.1); + let context = C::new(shared_context, root); let mut discovered_child_nodes = vec![]; - for unsafe_node in *unsafe_nodes.0 { + for unsafe_node in unsafe_nodes { // Get a real layout node. let node = unsafe { N::from_unsafe(&unsafe_node) }; @@ -98,7 +78,7 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, if context.needs_postorder_traversal() { if children_to_process == 0 { // If there were no more children, start walking back up. - bottom_up_dom::<N, C>(unsafe_nodes.1, unsafe_node, proxy) + bottom_up_dom::<N, C>(root, *unsafe_node, shared_context) } else { // Otherwise record the number of children to process when the // time comes. @@ -112,10 +92,11 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, context.local_context().style_sharing_candidate_cache.borrow_mut().clear(); for chunk in discovered_child_nodes.chunks(CHUNK_SIZE) { - proxy.push(WorkUnit { - fun: top_down_dom::<N, C>, - data: (Box::new(chunk.iter().cloned().collect()), unsafe_nodes.1), - }); + let nodes = chunk.iter().cloned().collect::<Vec<_>>().into_boxed_slice(); + scope.spawn(move |scope| { + let nodes = nodes; + top_down_dom::<N, C>(&nodes, root, scope, shared_context) + }) } } @@ -130,13 +111,14 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, /// /// The only communication between siblings is that they both /// fetch-and-subtract the parent's children count. +#[allow(unsafe_code)] fn bottom_up_dom<N, C>(root: OpaqueNode, unsafe_node: UnsafeNode, - proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>) + shared_context: &C::SharedContext) where N: TNode, C: DomTraversalContext<N> { - let context = C::new(proxy.user_data(), root); + let context = C::new(shared_context, root); // Get a real layout node. let mut node = unsafe { N::from_unsafe(&unsafe_node) }; diff --git a/components/style/thread_state.rs b/components/style/thread_state.rs index 12e52425f55..b0fbd5f4294 100644 --- a/components/style/thread_state.rs +++ b/components/style/thread_state.rs @@ -72,7 +72,8 @@ mod imp { pub fn get() -> ThreadState { let state = STATE.with(|ref k| { match *k.borrow() { - None => panic!("Thread state not initialized"), + // This is one of the layout threads, that use rayon. + None => super::LAYOUT | super::IN_WORKER, Some(s) => s, } }); diff --git a/components/style/workqueue.rs b/components/style/workqueue.rs deleted file mode 100644 index fc4f66ea120..00000000000 --- a/components/style/workqueue.rs +++ /dev/null @@ -1,385 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! A work queue for scheduling units of work across threads in a fork-join fashion. -//! -//! Data associated with queues is simply a pair of unsigned integers. It is expected that a -//! higher-level API on top of this could allow safe fork-join parallelism. - -#![allow(unsafe_code)] - -#[cfg(windows)] -extern crate kernel32; -#[cfg(not(windows))] -extern crate libc; - -use deque::{self, Abort, Data, Empty, Stealer, Worker}; -use rand::{Rng, XorShiftRng, weak_rng}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{Receiver, Sender, channel}; -use std::thread; -use thread_state; - -/// A unit of work. -/// -/// # Type parameters -/// -/// - `QueueData`: global custom data for the entire work queue. -/// - `WorkData`: custom data specific to each unit of work. -pub struct WorkUnit<QueueData, WorkData: Send> { - /// The function to execute. - pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>), - /// Arbitrary data. - pub data: WorkData, -} - -/// Messages from the supervisor to the worker. -enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> { - /// Tells the worker to start work. - Start(Worker<WorkUnit<QueueData, WorkData>>, *const AtomicUsize, *const QueueData), - /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`. - Stop, - /// Tells the worker to measure the heap size of its TLS using the supplied function. - HeapSizeOfTLS(fn() -> usize), - /// Tells the worker thread to terminate. - Exit, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {} - -/// Messages to the supervisor. -enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> { - Finished, - HeapSizeOfTLS(usize), - ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>), -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {} - -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> { - /// The communication channel to the workers. - chan: Sender<WorkerMsg<QueueData, WorkData>>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<WorkUnit<QueueData, WorkData>>>, - /// The thief end of the work-stealing deque. - thief: Stealer<WorkUnit<QueueData, WorkData>>, -} - -/// Information specific to each worker thread that the thread keeps. -struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> { - /// The index of this worker. - index: usize, - /// The communication port from the supervisor. - port: Receiver<WorkerMsg<QueueData, WorkData>>, - /// The communication channel on which messages are sent to the supervisor. - chan: Sender<SupervisorMsg<QueueData, WorkData>>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>, - /// The random number generator for this worker. - rng: XorShiftRng, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {} - -const SPINS_UNTIL_BACKOFF: u32 = 128; -const BACKOFF_INCREMENT_IN_US: u32 = 5; -const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6; - -#[cfg(not(windows))] -fn sleep_microseconds(usec: u32) { - unsafe { - libc::usleep(usec); - } -} - -#[cfg(windows)] -fn sleep_microseconds(_: u32) { - unsafe { - kernel32::Sleep(0); - } -} - -impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> { - /// The main logic. This function starts up the worker and listens for - /// messages. - fn start(&mut self) { - let deque_index_mask = (self.other_deques.len() as u32).next_power_of_two() - 1; - loop { - // Wait for a start message. - let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() { - WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data), - WorkerMsg::Stop => panic!("unexpected stop message"), - WorkerMsg::Exit => return, - WorkerMsg::HeapSizeOfTLS(f) => { - self.chan.send(SupervisorMsg::HeapSizeOfTLS(f())).unwrap(); - continue; - } - }; - - let mut back_off_sleep = 0 as u32; - - // We're off! - 'outer: loop { - let work_unit; - match deque.pop() { - Some(work) => work_unit = work, - None => { - // Become a thief. - let mut i = 0; - loop { - // Don't just use `rand % len` because that's slow on ARM. - let mut victim; - loop { - victim = self.rng.next_u32() & deque_index_mask; - if (victim as usize) < self.other_deques.len() { - break - } - } - - match self.other_deques[victim as usize].steal() { - Empty | Abort => { - // Continue. - } - Data(work) => { - work_unit = work; - back_off_sleep = 0 as u32; - break - } - } - - if i > SPINS_UNTIL_BACKOFF { - if back_off_sleep >= BACKOFF_INCREMENT_IN_US * - BACKOFFS_UNTIL_CONTROL_CHECK { - match self.port.try_recv() { - Ok(WorkerMsg::Stop) => break 'outer, - Ok(WorkerMsg::Exit) => return, - Ok(_) => panic!("unexpected message"), - _ => {} - } - } - - sleep_microseconds(back_off_sleep); - - back_off_sleep += BACKOFF_INCREMENT_IN_US; - i = 0 - } else { - i += 1 - } - } - } - } - - // At this point, we have some work. Perform it. - let mut proxy = WorkerProxy { - worker: &mut deque, - ref_count: ref_count, - // queue_data is kept alive in the stack frame of - // WorkQueue::run until we send the - // SupervisorMsg::ReturnDeque message below. - queue_data: unsafe { &*queue_data }, - worker_index: self.index as u8, - }; - (work_unit.fun)(work_unit.data, &mut proxy); - - // The work is done. Now decrement the count of outstanding work items. If this was - // the last work unit in the queue, then send a message on the channel. - unsafe { - if (*ref_count).fetch_sub(1, Ordering::Release) == 1 { - self.chan.send(SupervisorMsg::Finished).unwrap() - } - } - } - - // Give the deque back to the supervisor. - self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap() - } - } -} - -/// A handle to the work queue that individual work units have. -pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> { - worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>, - ref_count: *const AtomicUsize, - queue_data: &'a QueueData, - worker_index: u8, -} - -impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData, WorkData> { - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - unsafe { - drop((*self.ref_count).fetch_add(1, Ordering::Relaxed)); - } - self.worker.push(work_unit); - } - - /// Retrieves the queue user data. - #[inline] - pub fn user_data(&self) -> &'a QueueData { - self.queue_data - } - - /// Retrieves the index of the worker. - #[inline] - pub fn worker_index(&self) -> u8 { - self.worker_index - } -} - -/// A work queue on which units of work can be submitted. -pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> { - /// Information about each of the workers. - workers: Vec<WorkerInfo<QueueData, WorkData>>, - /// A port on which deques can be received from the workers. - port: Receiver<SupervisorMsg<QueueData, WorkData>>, - /// The amount of work that has been enqueued. - work_count: usize, -} - -impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> { - /// Creates a new work queue and spawns all the threads associated with - /// it. - pub fn new(thread_name: &'static str, - state: thread_state::ThreadState, - thread_count: usize) -> Result<WorkQueue<QueueData, WorkData>, ()> { - // Set up data structures. - let (supervisor_chan, supervisor_port) = channel(); - let mut infos = Vec::with_capacity(thread_count); - let mut threads = Vec::with_capacity(thread_count); - for i in 0..thread_count { - let (worker_chan, worker_port) = channel(); - let (worker, thief) = deque::new(); - infos.push(WorkerInfo { - chan: worker_chan, - deque: Some(worker), - thief: thief, - }); - threads.push(WorkerThread { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: vec!(), - rng: weak_rng(), - }); - } - - // Connect workers to one another. - for (i, mut thread) in threads.iter_mut().enumerate() { - for (j, info) in infos.iter().enumerate() { - if i != j { - thread.other_deques.push(info.thief.clone()) - } - } - assert!(thread.other_deques.len() == thread_count - 1) - } - - // Spawn threads. - let mut thread_handles = vec![]; - for (i, thread) in threads.into_iter().enumerate() { - let handle = thread::Builder::new() - .name(format!("{} worker {}/{}", thread_name, i + 1, thread_count)) - .spawn(move || { - thread_state::initialize(state | thread_state::IN_WORKER); - let mut thread = thread; - thread.start() - }); - match handle { - Ok(handle) => { - thread_handles.push(handle); - } - Err(err) => { - warn!("Failed spawning thread: {:?}", err); - break; - } - } - } - - if thread_handles.len() != thread_count { - // At least one worker thread failed to be created, just close the - // rest of them, and return an error. - for (i, handle) in thread_handles.into_iter().enumerate() { - let _ = infos[i].chan.send(WorkerMsg::Exit); - let _ = handle.join(); - } - - return Err(()); - } - - Ok(WorkQueue { - workers: infos, - port: supervisor_port, - work_count: 0, - }) - } - - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - let deque = &mut self.workers[0].deque; - match *deque { - None => { - panic!("tried to push a block but we don't have the deque?!") - } - Some(ref mut deque) => deque.push(work_unit), - } - self.work_count += 1 - } - - /// Synchronously runs all the enqueued tasks and waits for them to complete. - pub fn run(&mut self, data: &QueueData) { - // Tell the workers to start. - let work_count = AtomicUsize::new(self.work_count); - for worker in &mut self.workers { - worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), - &work_count, - data)).unwrap() - } - - // Wait for the work to finish. - drop(self.port.recv()); - self.work_count = 0; - - // Tell everyone to stop. - for worker in &self.workers { - worker.chan.send(WorkerMsg::Stop).unwrap() - } - - // Get our deques back. - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque), - SupervisorMsg::HeapSizeOfTLS(_) => panic!("unexpected HeapSizeOfTLS message"), - SupervisorMsg::Finished => panic!("unexpected finished message!"), - } - } - } - - /// Synchronously measure memory usage of any thread-local storage. - pub fn heap_size_of_tls(&self, f: fn() -> usize) -> Vec<usize> { - // Tell the workers to measure themselves. - for worker in &self.workers { - worker.chan.send(WorkerMsg::HeapSizeOfTLS(f)).unwrap() - } - - // Wait for the workers to finish measuring themselves. - let mut sizes = vec![]; - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::HeapSizeOfTLS(size) => { - sizes.push(size); - } - _ => panic!("unexpected message!"), - } - } - sizes - } - - pub fn shutdown(&mut self) { - for worker in &self.workers { - worker.chan.send(WorkerMsg::Exit).unwrap() - } - } -} |