| author | Emilio Cobos Álvarez <ecoal95@gmail.com> | 2016-10-07 12:22:06 +0200 |
|---|---|---|
| committer | Emilio Cobos Álvarez <ecoal95@gmail.com> | 2016-11-14 21:24:19 +0100 |
| commit | 73917cce83d2225b51b29c374d861d71ec69435f (patch) | |
| tree | ac8c6b562ca3e88a7d853ddcab88d114362559e7 /components | |
| parent | b7eb36fa84e6c6c77727ea2cd02c57f6750dc7af (diff) | |
style: Use rayon instead of our custom work queue.
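
With this change the style and layout traversals no longer schedule work on the hand-rolled work-stealing queue in components/style/workqueue.rs; they drive a rayon ThreadPool and re-spawn discovered children as scoped tasks. The following is a minimal, self-contained sketch of that pattern using the rayon 0.5 API as it appears in the patch (scoped spawns need the `unstable` cargo feature, which the patch enables); `process_chunk` and the integer work items are placeholders for the real traversal functions and UnsafeNode/UnsafeFlow handles.

```rust
extern crate rayon;

const CHUNK_SIZE: usize = 64;

// Placeholder traversal: each chunk processes its items and re-spawns any
// discovered children in CHUNK_SIZE-sized chunks on the same scope.
fn process_chunk<'scope>(items: Box<[u32]>, scope: &rayon::Scope<'scope>) {
    let mut discovered_children = vec![];
    for item in items.iter() {
        // ... per-item work would go here; this stand-in just generates a
        // couple of fake children for small items ...
        if *item < 8 {
            discovered_children.push(*item * 2);
            discovered_children.push(*item * 2 + 1);
        }
    }
    for chunk in discovered_children.chunks(CHUNK_SIZE) {
        let nodes = chunk.to_vec().into_boxed_slice();
        // rayon's work-stealing scheduler balances these tasks across the
        // pool, replacing the explicit deque pushes of the old WorkQueue.
        scope.spawn(move |scope| process_chunk(nodes, scope));
    }
}

fn main() {
    let configuration = rayon::Configuration::new().set_num_threads(4);
    let pool = rayon::ThreadPool::new(configuration).expect("failed to create pool");
    pool.install(|| {
        rayon::scope(|scope| {
            process_chunk(vec![1].into_boxed_slice(), scope);
        });
    });
}
```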
Diffstat (limited to 'components')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | components/layout/Cargo.toml | 1 |
| -rw-r--r-- | components/layout/lib.rs | 1 |
| -rw-r--r-- | components/layout/parallel.rs | 90 |
| -rw-r--r-- | components/layout_thread/Cargo.toml | 1 |
| -rw-r--r-- | components/layout_thread/lib.rs | 29 |
| -rw-r--r-- | components/servo/Cargo.lock | 6 |
| -rw-r--r-- | components/style/Cargo.toml | 15 |
| -rw-r--r-- | components/style/gecko/data.rs | 15 |
| -rw-r--r-- | components/style/gecko/wrapper.rs | 3 |
| -rw-r--r-- | components/style/lib.rs | 5 |
| -rw-r--r-- | components/style/parallel.rs | 76 |
| -rw-r--r-- | components/style/thread_state.rs | 3 |
| -rw-r--r-- | components/style/workqueue.rs | 385 |
13 files changed, 110 insertions, 520 deletions
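
Two changes recur throughout the hunks below: fields of type Option<WorkQueue<...>> become Option<rayon::ThreadPool>, and the explicit shutdown() calls go away because dropping the pool is enough. A trimmed-down sketch of that lifecycle, assuming the rayon 0.5 builder API used in the patch; the struct here only mimics the real LayoutThread/PerDocumentStyleData fields and is not the actual type.

```rust
extern crate rayon;

// Stand-in for LayoutThread / PerDocumentStyleDataImpl, reduced to the
// field this patch changes.
struct LayoutThread {
    /// The workers used for parallel operation, if more than one thread
    /// was requested.
    parallel_traversal: Option<rayon::ThreadPool>,
}

impl LayoutThread {
    fn new(layout_threads: usize) -> LayoutThread {
        let parallel_traversal = if layout_threads != 1 {
            let configuration =
                rayon::Configuration::new().set_num_threads(layout_threads);
            rayon::ThreadPool::new(configuration).ok()
        } else {
            None
        };
        LayoutThread { parallel_traversal: parallel_traversal }
    }

    /// With rayon there is no explicit shutdown() call; dropping the
    /// ThreadPool replaces it.
    fn exit_now(&mut self) {
        let _ = self.parallel_traversal.take();
    }
}

fn main() {
    let mut thread = LayoutThread::new(4);
    // ... parallel traversals would run on thread.parallel_traversal here ...
    thread.exit_now();
    assert!(thread.parallel_traversal.is_none());
}
```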
diff --git a/components/layout/Cargo.toml b/components/layout/Cargo.toml index 62f6cf47ecc..59a5ad9f8cd 100644 --- a/components/layout/Cargo.toml +++ b/components/layout/Cargo.toml @@ -31,6 +31,7 @@ parking_lot = "0.3.3" plugins = {path = "../plugins"} profile_traits = {path = "../profile_traits"} range = {path = "../range"} +rayon = "0.5" script_layout_interface = {path = "../script_layout_interface"} script_traits = {path = "../script_traits"} selectors = "0.14" diff --git a/components/layout/lib.rs b/components/layout/lib.rs index a39e7af2eb2..8fa25f2f069 100644 --- a/components/layout/lib.rs +++ b/components/layout/lib.rs @@ -44,6 +44,7 @@ extern crate plugins as servo_plugins; extern crate profile_traits; #[macro_use] extern crate range; +extern crate rayon; extern crate script_layout_interface; extern crate script_traits; extern crate serde; diff --git a/components/layout/parallel.rs b/components/layout/parallel.rs index 29e53bf9077..03aaa512d3e 100644 --- a/components/layout/parallel.rs +++ b/components/layout/parallel.rs @@ -12,12 +12,11 @@ use context::{LayoutContext, SharedLayoutContext}; use flow::{self, Flow, MutableFlowUtils, PostorderFlowTraversal, PreorderFlowTraversal}; use flow_ref::FlowRef; use profile_traits::time::{self, TimerMetadata, profile}; +use rayon; use std::mem; use std::sync::atomic::{AtomicIsize, Ordering}; use style::dom::UnsafeNode; -use style::parallel::{CHUNK_SIZE, WorkQueueData}; -use style::parallel::run_queue_with_custom_work_data_type; -use style::workqueue::{WorkQueue, WorkUnit, WorkerProxy}; +use style::parallel::CHUNK_SIZE; use traversal::{AssignISizes, BubbleISizes}; use traversal::AssignBSizes; use util::opts; @@ -50,10 +49,8 @@ pub fn borrowed_flow_to_unsafe_flow(flow: &Flow) -> UnsafeFlow { } } -pub type UnsafeFlowList = (Box<Vec<UnsafeNode>>, usize); - -pub type ChunkedFlowTraversalFunction = - extern "Rust" fn(UnsafeFlowList, &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>); +pub type ChunkedFlowTraversalFunction<'scope> = + extern "Rust" fn(Box<[UnsafeFlow]>, &'scope SharedLayoutContext, &rayon::Scope<'scope>); pub type FlowTraversalFunction = extern "Rust" fn(UnsafeFlow, &SharedLayoutContext); @@ -133,27 +130,35 @@ trait ParallelPostorderFlowTraversal : PostorderFlowTraversal { /// A parallel top-down flow traversal. trait ParallelPreorderFlowTraversal : PreorderFlowTraversal { - fn run_parallel(&self, - unsafe_flows: UnsafeFlowList, - proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>); + fn run_parallel<'scope>(&self, + unsafe_flows: &[UnsafeFlow], + layout_context: &'scope SharedLayoutContext, + scope: &rayon::Scope<'scope>); fn should_record_thread_ids(&self) -> bool; #[inline(always)] - fn run_parallel_helper(&self, - unsafe_flows: UnsafeFlowList, - proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>, - top_down_func: ChunkedFlowTraversalFunction, - bottom_up_func: FlowTraversalFunction) { - let mut discovered_child_flows = Vec::new(); - for unsafe_flow in *unsafe_flows.0 { + fn run_parallel_helper<'scope>(&self, + unsafe_flows: &[UnsafeFlow], + layout_context: &'scope SharedLayoutContext, + scope: &rayon::Scope<'scope>, + top_down_func: ChunkedFlowTraversalFunction<'scope>, + bottom_up_func: FlowTraversalFunction) + { + let mut discovered_child_flows = vec![]; + for unsafe_flow in unsafe_flows { let mut had_children = false; unsafe { // Get a real flow. 
- let flow: &mut Flow = mem::transmute(unsafe_flow); + let flow: &mut Flow = mem::transmute(*unsafe_flow); if self.should_record_thread_ids() { - flow::mut_base(flow).thread_id = proxy.worker_index(); + // FIXME(emilio): With the switch to rayon we can no longer + // access a thread id from here easily. Either instrument + // rayon (the unstable feature) to get a worker thread + // identifier, or remove all the layout tinting mode. + // + // flow::mut_base(flow).thread_id = proxy.worker_index(); } if self.should_process(flow) { @@ -170,25 +175,29 @@ trait ParallelPreorderFlowTraversal : PreorderFlowTraversal { // If there were no more children, start assigning block-sizes. if !had_children { - bottom_up_func(unsafe_flow, proxy.user_data()) + bottom_up_func(*unsafe_flow, layout_context) } } for chunk in discovered_child_flows.chunks(CHUNK_SIZE) { - proxy.push(WorkUnit { - fun: top_down_func, - data: (box chunk.iter().cloned().collect(), 0), + let nodes = chunk.iter().cloned().collect::<Vec<_>>().into_boxed_slice(); + + scope.spawn(move |scope| { + top_down_func(nodes, layout_context, scope); }); } } } impl<'a> ParallelPreorderFlowTraversal for AssignISizes<'a> { - fn run_parallel(&self, - unsafe_flows: UnsafeFlowList, - proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>) { + fn run_parallel<'scope>(&self, + unsafe_flows: &[UnsafeFlow], + layout_context: &'scope SharedLayoutContext, + scope: &rayon::Scope<'scope>) + { self.run_parallel_helper(unsafe_flows, - proxy, + layout_context, + scope, assign_inline_sizes, assign_block_sizes_and_store_overflow) } @@ -200,13 +209,13 @@ impl<'a> ParallelPreorderFlowTraversal for AssignISizes<'a> { impl<'a> ParallelPostorderFlowTraversal for AssignBSizes<'a> {} -fn assign_inline_sizes(unsafe_flows: UnsafeFlowList, - proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>) { - let shared_layout_context = proxy.user_data(); +fn assign_inline_sizes<'scope>(unsafe_flows: Box<[UnsafeFlow]>, + shared_layout_context: &'scope SharedLayoutContext, + scope: &rayon::Scope<'scope>) { let assign_inline_sizes_traversal = AssignISizes { shared_context: &shared_layout_context.style_context, }; - assign_inline_sizes_traversal.run_parallel(unsafe_flows, proxy) + assign_inline_sizes_traversal.run_parallel(&unsafe_flows, shared_layout_context, scope) } fn assign_block_sizes_and_store_overflow( @@ -224,20 +233,21 @@ pub fn traverse_flow_tree_preorder( profiler_metadata: Option<TimerMetadata>, time_profiler_chan: time::ProfilerChan, shared_layout_context: &SharedLayoutContext, - queue: &mut WorkQueue<SharedLayoutContext, WorkQueueData>) { + queue: &rayon::ThreadPool) { if opts::get().bubble_inline_sizes_separately { let layout_context = LayoutContext::new(shared_layout_context); let bubble_inline_sizes = BubbleISizes { layout_context: &layout_context }; root.traverse_postorder(&bubble_inline_sizes); } - run_queue_with_custom_work_data_type(queue, |queue| { - profile(time::ProfilerCategory::LayoutParallelWarmup, profiler_metadata, - time_profiler_chan, || { - queue.push(WorkUnit { - fun: assign_inline_sizes, - data: (box vec![borrowed_flow_to_unsafe_flow(root)], 0), - }) + let nodes = vec![borrowed_flow_to_unsafe_flow(root)].into_boxed_slice(); + + queue.install(move || { + rayon::scope(move |scope| { + profile(time::ProfilerCategory::LayoutParallelWarmup, + profiler_metadata, time_profiler_chan, move || { + assign_inline_sizes(nodes, &shared_layout_context, scope); + }); }); - }, shared_layout_context); + }); } diff --git a/components/layout_thread/Cargo.toml 
b/components/layout_thread/Cargo.toml index cc97c74697f..798abe3eb03 100644 --- a/components/layout_thread/Cargo.toml +++ b/components/layout_thread/Cargo.toml @@ -27,6 +27,7 @@ net_traits = {path = "../net_traits"} parking_lot = {version = "0.3.3", features = ["nightly"]} plugins = {path = "../plugins"} profile_traits = {path = "../profile_traits"} +rayon = "0.5" script = {path = "../script"} script_layout_interface = {path = "../script_layout_interface"} script_traits = {path = "../script_traits"} diff --git a/components/layout_thread/lib.rs b/components/layout_thread/lib.rs index 859183935b1..ce08a1ab010 100644 --- a/components/layout_thread/lib.rs +++ b/components/layout_thread/lib.rs @@ -34,6 +34,7 @@ extern crate net_traits; extern crate parking_lot; #[macro_use] extern crate profile_traits; +extern crate rayon; extern crate script; extern crate script_layout_interface; extern crate script_traits; @@ -107,14 +108,12 @@ use style::dom::{TDocument, TElement, TNode}; use style::error_reporting::{ParseErrorReporter, StdoutErrorReporter}; use style::logical_geometry::LogicalPoint; use style::media_queries::{Device, MediaType}; -use style::parallel::WorkQueueData; use style::parser::ParserContextExtraData; use style::selector_matching::Stylist; use style::servo::restyle_damage::{REFLOW, REFLOW_OUT_OF_FLOW, REPAINT, REPOSITION, STORE_OVERFLOW}; use style::stylesheets::{Origin, Stylesheet, UserAgentStylesheets}; use style::thread_state; use style::timer::Timer; -use style::workqueue::WorkQueue; use url::Url; use util::geometry::max_rect; use util::opts; @@ -173,7 +172,7 @@ pub struct LayoutThread { first_reflow: bool, /// The workers that we use for parallel operation. - parallel_traversal: Option<WorkQueue<SharedLayoutContext, WorkQueueData>>, + parallel_traversal: Option<rayon::ThreadPool>, /// Starts at zero, and increased by one every time a layout completes. /// This can be used to easily check for invalid stale data. @@ -383,7 +382,9 @@ impl LayoutThread { MediaType::Screen, opts::get().initial_window_size.to_f32() * ScaleFactor::new(1.0)); let parallel_traversal = if layout_threads != 1 { - WorkQueue::new("LayoutWorker", thread_state::LAYOUT, layout_threads).ok() + let configuration = + rayon::Configuration::new().set_num_threads(layout_threads); + rayon::ThreadPool::new(configuration).ok() } else { None }; @@ -711,19 +712,6 @@ impl LayoutThread { size: heap_size_of_local_context(), }); - // ... as do each of the LayoutWorkers, if present. - if let Some(ref traversal) = self.parallel_traversal { - let sizes = traversal.heap_size_of_tls(heap_size_of_local_context); - for (i, size) in sizes.iter().enumerate() { - reports.push(Report { - path: path![formatted_url, - format!("layout-worker-{}-local-context", i)], - kind: ReportKind::ExplicitJemallocHeapSize, - size: *size, - }); - } - } - reports_chan.send(reports); } @@ -773,9 +761,8 @@ impl LayoutThread { /// Shuts down the layout thread now. If there are any DOM nodes left, layout will now (safely) /// crash. fn exit_now(&mut self) { - if let Some(ref mut traversal) = self.parallel_traversal { - traversal.shutdown() - } + // Drop the rayon threadpool if present. + let _ = self.parallel_traversal.take(); } fn handle_add_stylesheet<'a, 'b>(&self, @@ -855,7 +842,7 @@ impl LayoutThread { /// This corresponds to `Reflow()` in Gecko and `layout()` in WebKit/Blink and should be /// benchmarked against those two. It is marked `#[inline(never)]` to aid profiling. 
#[inline(never)] - fn solve_constraints_parallel(traversal: &mut WorkQueue<SharedLayoutContext, WorkQueueData>, + fn solve_constraints_parallel(traversal: &rayon::ThreadPool, layout_root: &mut Flow, profiler_metadata: Option<TimerMetadata>, time_profiler_chan: time::ProfilerChan, diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock index ed74bf61ba8..a3b4c7731d9 100644 --- a/components/servo/Cargo.lock +++ b/components/servo/Cargo.lock @@ -1265,6 +1265,7 @@ dependencies = [ "plugins 0.0.1", "profile_traits 0.0.1", "range 0.0.1", + "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "script_layout_interface 0.0.1", "script_traits 0.0.1", "selectors 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1310,6 +1311,7 @@ dependencies = [ "parking_lot 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "plugins 0.0.1", "profile_traits 0.0.1", + "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "script 0.0.1", "script_layout_interface 0.0.1", "script_traits 0.0.1", @@ -2487,7 +2489,6 @@ dependencies = [ "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "cssparser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "encoding 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "euclid 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2496,18 +2497,17 @@ dependencies = [ "html5ever-atoms 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ordered-float 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "owning_ref 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "plugins 0.0.1", "quickersort 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "selectors 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.8.17 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/style/Cargo.toml b/components/style/Cargo.toml index 3494ea263c2..1374926b08b 100644 --- a/components/style/Cargo.toml +++ b/components/style/Cargo.toml @@ -13,11 +13,11 @@ path = "lib.rs" doctest = false [features] -gecko = ["nsstring_vendor"] +gecko = ["nsstring_vendor", "num_cpus", "rayon/unstable"] servo = ["serde/unstable", "serde", "serde_derive", "heapsize_derive", "style_traits/servo", "app_units/plugins", "servo_atoms", "html5ever-atoms", "cssparser/heap_size", 
"cssparser/serde-serialization", - "url/heap_size", "plugins"] + "url/heap_size", "plugins", "rayon/unstable"] testing = [] [dependencies] @@ -25,7 +25,6 @@ app_units = "0.3" bitflags = "0.7" cfg-if = "0.1.0" cssparser = "0.7" -deque = "0.3.1" encoding = "0.2" euclid = "0.10.1" fnv = "1.0" @@ -34,17 +33,16 @@ heapsize_derive = {version = "0.1", optional = true} html5ever-atoms = {version = "0.1", optional = true} lazy_static = "0.2" log = "0.3.5" -libc = "0.2" matches = "0.1" nsstring_vendor = {path = "gecko_bindings/nsstring_vendor", optional = true} num-integer = "0.1.32" num-traits = "0.1.32" -num_cpus = "1.1.0" ordered-float = "0.2.2" owning_ref = "0.2.2" parking_lot = "0.3.3" quickersort = "2.0.0" rand = "0.3" +rayon = "0.5" rustc-serialize = "0.3" selectors = "0.14" serde = {version = "0.8", optional = true} @@ -58,11 +56,12 @@ url = "1.2" util = {path = "../util"} plugins = {path = "../plugins", optional = true} +[dependencies.num_cpus] +optional = true +version = "1.0" + [target.'cfg(windows)'.dependencies] kernel32-sys = "0.2" -[target.'cfg(not(windows))'.dependencies] -libc = "0.2" - [build-dependencies] walkdir = "0.1" diff --git a/components/style/gecko/data.rs b/components/style/gecko/data.rs index 1cc360fdeb8..2f6ed96c71f 100644 --- a/components/style/gecko/data.rs +++ b/components/style/gecko/data.rs @@ -4,15 +4,14 @@ use animation::Animation; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; -use context::SharedStyleContext; use dom::OpaqueNode; use euclid::size::TypedSize2D; use gecko_bindings::bindings::RawServoStyleSet; use gecko_bindings::sugar::ownership::{HasBoxFFI, HasFFI, HasSimpleFFI}; use media_queries::{Device, MediaType}; use num_cpus; -use parallel::WorkQueueData; use parking_lot::RwLock; +use rayon; use selector_matching::Stylist; use std::cmp; use std::collections::HashMap; @@ -21,8 +20,6 @@ use std::sync::Arc; use std::sync::mpsc::{Receiver, Sender, channel}; use style_traits::ViewportPx; use stylesheets::Stylesheet; -use thread_state; -use workqueue::WorkQueue; pub struct PerDocumentStyleDataImpl { /// Rule processor. @@ -41,7 +38,7 @@ pub struct PerDocumentStyleDataImpl { pub expired_animations: Arc<RwLock<HashMap<OpaqueNode, Vec<Animation>>>>, // FIXME(bholley): This shouldn't be per-document. 
- pub work_queue: Option<WorkQueue<SharedStyleContext, WorkQueueData>>, + pub work_queue: Option<rayon::ThreadPool>, pub num_threads: usize, } @@ -76,7 +73,9 @@ impl PerDocumentStyleData { work_queue: if *NUM_THREADS <= 1 { None } else { - WorkQueue::new("StyleWorker", thread_state::LAYOUT, *NUM_THREADS).ok() + let configuration = + rayon::Configuration::new().set_num_threads(*NUM_THREADS); + rayon::ThreadPool::new(configuration).ok() }, num_threads: *NUM_THREADS, })) @@ -112,8 +111,6 @@ unsafe impl HasBoxFFI for PerDocumentStyleData {} impl Drop for PerDocumentStyleDataImpl { fn drop(&mut self) { - if let Some(ref mut queue) = self.work_queue { - queue.shutdown(); - } + let _ = self.work_queue.take(); } } diff --git a/components/style/gecko/wrapper.rs b/components/style/gecko/wrapper.rs index 9adac5f269f..cc40f22ae97 100644 --- a/components/style/gecko/wrapper.rs +++ b/components/style/gecko/wrapper.rs @@ -30,7 +30,6 @@ use gecko_bindings::bindings::Gecko_StoreStyleDifference; use gecko_bindings::structs; use gecko_bindings::structs::{NODE_HAS_DIRTY_DESCENDANTS_FOR_SERVO, NODE_IS_DIRTY_FOR_SERVO}; use gecko_bindings::structs::{nsIAtom, nsIContent, nsStyleContext}; -use libc::uintptr_t; use parking_lot::RwLock; use parser::ParserContextExtraData; use properties::{ComputedValues, parse_style_attribute}; @@ -114,7 +113,7 @@ impl<'ln> TNode for GeckoNode<'ln> { } fn opaque(&self) -> OpaqueNode { - let ptr: uintptr_t = self.0 as *const _ as uintptr_t; + let ptr: usize = self.0 as *const _ as usize; OpaqueNode(ptr) } diff --git a/components/style/lib.rs b/components/style/lib.rs index e7c764ba87c..64fe606d9c1 100644 --- a/components/style/lib.rs +++ b/components/style/lib.rs @@ -49,7 +49,6 @@ extern crate cfg_if; extern crate core; #[macro_use] extern crate cssparser; -extern crate deque; extern crate encoding; extern crate euclid; extern crate fnv; @@ -60,7 +59,6 @@ extern crate heapsize; #[allow(unused_extern_crates)] #[macro_use] extern crate lazy_static; -#[cfg(feature = "gecko")] extern crate libc; #[macro_use] extern crate log; #[allow(unused_extern_crates)] @@ -74,7 +72,7 @@ extern crate ordered_float; extern crate owning_ref; extern crate parking_lot; extern crate quickersort; -extern crate rand; +extern crate rayon; extern crate rustc_serialize; extern crate selectors; #[cfg(feature = "servo")] @@ -131,7 +129,6 @@ pub mod traversal; #[allow(non_camel_case_types)] pub mod values; pub mod viewport; -pub mod workqueue; use std::fmt; use std::sync::Arc; diff --git a/components/style/parallel.rs b/components/style/parallel.rs index 6740c4f379c..e3f4eaf50b1 100644 --- a/components/style/parallel.rs +++ b/components/style/parallel.rs @@ -6,45 +6,18 @@ //! //! This code is highly unsafe. Keep this file small and easy to audit. 
-#![allow(unsafe_code)] - use dom::{OpaqueNode, StylingMode, TElement, TNode, UnsafeNode}; -use std::mem; +use rayon; use std::sync::atomic::Ordering; use traversal::{STYLE_SHARING_CACHE_HITS, STYLE_SHARING_CACHE_MISSES}; use traversal::DomTraversalContext; use util::opts; -use workqueue::{WorkQueue, WorkUnit, WorkerProxy}; - -#[allow(dead_code)] -fn static_assertion(node: UnsafeNode) { - unsafe { - let _: UnsafeNodeList = mem::transmute(node); - } -} - -pub type UnsafeNodeList = (Box<Vec<UnsafeNode>>, OpaqueNode); pub const CHUNK_SIZE: usize = 64; -pub struct WorkQueueData(usize, usize); - -pub fn run_queue_with_custom_work_data_type<To, F, SharedContext: Sync>( - queue: &mut WorkQueue<SharedContext, WorkQueueData>, - callback: F, - shared: &SharedContext) - where To: 'static + Send, F: FnOnce(&mut WorkQueue<SharedContext, To>) -{ - let queue: &mut WorkQueue<SharedContext, To> = unsafe { - mem::transmute(queue) - }; - callback(queue); - queue.run(shared); -} - pub fn traverse_dom<N, C>(root: N, - queue_data: &C::SharedContext, - queue: &mut WorkQueue<C::SharedContext, WorkQueueData>) + shared_context: &C::SharedContext, + queue: &rayon::ThreadPool) where N: TNode, C: DomTraversalContext<N> { @@ -53,12 +26,15 @@ pub fn traverse_dom<N, C>(root: N, STYLE_SHARING_CACHE_HITS.store(0, Ordering::SeqCst); STYLE_SHARING_CACHE_MISSES.store(0, Ordering::SeqCst); } - run_queue_with_custom_work_data_type(queue, |queue| { - queue.push(WorkUnit { - fun: top_down_dom::<N, C>, - data: (Box::new(vec![root.to_unsafe()]), root.opaque()), + + let nodes = vec![root.to_unsafe()].into_boxed_slice(); + let root = root.opaque(); + queue.install(|| { + rayon::scope(|scope| { + let nodes = nodes; + top_down_dom::<N, C>(&nodes, root, scope, shared_context); }); - }, queue_data); + }); if opts::get().style_sharing_stats { let hits = STYLE_SHARING_CACHE_HITS.load(Ordering::SeqCst); @@ -72,14 +48,18 @@ pub fn traverse_dom<N, C>(root: N, /// A parallel top-down DOM traversal. #[inline(always)] -fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, - proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>) - where N: TNode, C: DomTraversalContext<N> +#[allow(unsafe_code)] +fn top_down_dom<'a, 'scope, N, C>(unsafe_nodes: &'a [UnsafeNode], + root: OpaqueNode, + scope: &'a rayon::Scope<'scope>, + shared_context: &'scope C::SharedContext) + where N: TNode, + C: DomTraversalContext<N>, { - let context = C::new(proxy.user_data(), unsafe_nodes.1); + let context = C::new(shared_context, root); let mut discovered_child_nodes = vec![]; - for unsafe_node in *unsafe_nodes.0 { + for unsafe_node in unsafe_nodes { // Get a real layout node. let node = unsafe { N::from_unsafe(&unsafe_node) }; @@ -98,7 +78,7 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, if context.needs_postorder_traversal() { if children_to_process == 0 { // If there were no more children, start walking back up. - bottom_up_dom::<N, C>(unsafe_nodes.1, unsafe_node, proxy) + bottom_up_dom::<N, C>(root, *unsafe_node, shared_context) } else { // Otherwise record the number of children to process when the // time comes. 
@@ -112,10 +92,11 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, context.local_context().style_sharing_candidate_cache.borrow_mut().clear(); for chunk in discovered_child_nodes.chunks(CHUNK_SIZE) { - proxy.push(WorkUnit { - fun: top_down_dom::<N, C>, - data: (Box::new(chunk.iter().cloned().collect()), unsafe_nodes.1), - }); + let nodes = chunk.iter().cloned().collect::<Vec<_>>().into_boxed_slice(); + scope.spawn(move |scope| { + let nodes = nodes; + top_down_dom::<N, C>(&nodes, root, scope, shared_context) + }) } } @@ -130,13 +111,14 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList, /// /// The only communication between siblings is that they both /// fetch-and-subtract the parent's children count. +#[allow(unsafe_code)] fn bottom_up_dom<N, C>(root: OpaqueNode, unsafe_node: UnsafeNode, - proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>) + shared_context: &C::SharedContext) where N: TNode, C: DomTraversalContext<N> { - let context = C::new(proxy.user_data(), root); + let context = C::new(shared_context, root); // Get a real layout node. let mut node = unsafe { N::from_unsafe(&unsafe_node) }; diff --git a/components/style/thread_state.rs b/components/style/thread_state.rs index 12e52425f55..b0fbd5f4294 100644 --- a/components/style/thread_state.rs +++ b/components/style/thread_state.rs @@ -72,7 +72,8 @@ mod imp { pub fn get() -> ThreadState { let state = STATE.with(|ref k| { match *k.borrow() { - None => panic!("Thread state not initialized"), + // This is one of the layout threads, that use rayon. + None => super::LAYOUT | super::IN_WORKER, Some(s) => s, } }); diff --git a/components/style/workqueue.rs b/components/style/workqueue.rs deleted file mode 100644 index fc4f66ea120..00000000000 --- a/components/style/workqueue.rs +++ /dev/null @@ -1,385 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! A work queue for scheduling units of work across threads in a fork-join fashion. -//! -//! Data associated with queues is simply a pair of unsigned integers. It is expected that a -//! higher-level API on top of this could allow safe fork-join parallelism. - -#![allow(unsafe_code)] - -#[cfg(windows)] -extern crate kernel32; -#[cfg(not(windows))] -extern crate libc; - -use deque::{self, Abort, Data, Empty, Stealer, Worker}; -use rand::{Rng, XorShiftRng, weak_rng}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{Receiver, Sender, channel}; -use std::thread; -use thread_state; - -/// A unit of work. -/// -/// # Type parameters -/// -/// - `QueueData`: global custom data for the entire work queue. -/// - `WorkData`: custom data specific to each unit of work. -pub struct WorkUnit<QueueData, WorkData: Send> { - /// The function to execute. - pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>), - /// Arbitrary data. - pub data: WorkData, -} - -/// Messages from the supervisor to the worker. -enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> { - /// Tells the worker to start work. - Start(Worker<WorkUnit<QueueData, WorkData>>, *const AtomicUsize, *const QueueData), - /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`. - Stop, - /// Tells the worker to measure the heap size of its TLS using the supplied function. - HeapSizeOfTLS(fn() -> usize), - /// Tells the worker thread to terminate. 
- Exit, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {} - -/// Messages to the supervisor. -enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> { - Finished, - HeapSizeOfTLS(usize), - ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>), -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {} - -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> { - /// The communication channel to the workers. - chan: Sender<WorkerMsg<QueueData, WorkData>>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<WorkUnit<QueueData, WorkData>>>, - /// The thief end of the work-stealing deque. - thief: Stealer<WorkUnit<QueueData, WorkData>>, -} - -/// Information specific to each worker thread that the thread keeps. -struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> { - /// The index of this worker. - index: usize, - /// The communication port from the supervisor. - port: Receiver<WorkerMsg<QueueData, WorkData>>, - /// The communication channel on which messages are sent to the supervisor. - chan: Sender<SupervisorMsg<QueueData, WorkData>>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>, - /// The random number generator for this worker. - rng: XorShiftRng, -} - -unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {} - -const SPINS_UNTIL_BACKOFF: u32 = 128; -const BACKOFF_INCREMENT_IN_US: u32 = 5; -const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6; - -#[cfg(not(windows))] -fn sleep_microseconds(usec: u32) { - unsafe { - libc::usleep(usec); - } -} - -#[cfg(windows)] -fn sleep_microseconds(_: u32) { - unsafe { - kernel32::Sleep(0); - } -} - -impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> { - /// The main logic. This function starts up the worker and listens for - /// messages. - fn start(&mut self) { - let deque_index_mask = (self.other_deques.len() as u32).next_power_of_two() - 1; - loop { - // Wait for a start message. - let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() { - WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data), - WorkerMsg::Stop => panic!("unexpected stop message"), - WorkerMsg::Exit => return, - WorkerMsg::HeapSizeOfTLS(f) => { - self.chan.send(SupervisorMsg::HeapSizeOfTLS(f())).unwrap(); - continue; - } - }; - - let mut back_off_sleep = 0 as u32; - - // We're off! - 'outer: loop { - let work_unit; - match deque.pop() { - Some(work) => work_unit = work, - None => { - // Become a thief. - let mut i = 0; - loop { - // Don't just use `rand % len` because that's slow on ARM. - let mut victim; - loop { - victim = self.rng.next_u32() & deque_index_mask; - if (victim as usize) < self.other_deques.len() { - break - } - } - - match self.other_deques[victim as usize].steal() { - Empty | Abort => { - // Continue. 
- } - Data(work) => { - work_unit = work; - back_off_sleep = 0 as u32; - break - } - } - - if i > SPINS_UNTIL_BACKOFF { - if back_off_sleep >= BACKOFF_INCREMENT_IN_US * - BACKOFFS_UNTIL_CONTROL_CHECK { - match self.port.try_recv() { - Ok(WorkerMsg::Stop) => break 'outer, - Ok(WorkerMsg::Exit) => return, - Ok(_) => panic!("unexpected message"), - _ => {} - } - } - - sleep_microseconds(back_off_sleep); - - back_off_sleep += BACKOFF_INCREMENT_IN_US; - i = 0 - } else { - i += 1 - } - } - } - } - - // At this point, we have some work. Perform it. - let mut proxy = WorkerProxy { - worker: &mut deque, - ref_count: ref_count, - // queue_data is kept alive in the stack frame of - // WorkQueue::run until we send the - // SupervisorMsg::ReturnDeque message below. - queue_data: unsafe { &*queue_data }, - worker_index: self.index as u8, - }; - (work_unit.fun)(work_unit.data, &mut proxy); - - // The work is done. Now decrement the count of outstanding work items. If this was - // the last work unit in the queue, then send a message on the channel. - unsafe { - if (*ref_count).fetch_sub(1, Ordering::Release) == 1 { - self.chan.send(SupervisorMsg::Finished).unwrap() - } - } - } - - // Give the deque back to the supervisor. - self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap() - } - } -} - -/// A handle to the work queue that individual work units have. -pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> { - worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>, - ref_count: *const AtomicUsize, - queue_data: &'a QueueData, - worker_index: u8, -} - -impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData, WorkData> { - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - unsafe { - drop((*self.ref_count).fetch_add(1, Ordering::Relaxed)); - } - self.worker.push(work_unit); - } - - /// Retrieves the queue user data. - #[inline] - pub fn user_data(&self) -> &'a QueueData { - self.queue_data - } - - /// Retrieves the index of the worker. - #[inline] - pub fn worker_index(&self) -> u8 { - self.worker_index - } -} - -/// A work queue on which units of work can be submitted. -pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> { - /// Information about each of the workers. - workers: Vec<WorkerInfo<QueueData, WorkData>>, - /// A port on which deques can be received from the workers. - port: Receiver<SupervisorMsg<QueueData, WorkData>>, - /// The amount of work that has been enqueued. - work_count: usize, -} - -impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> { - /// Creates a new work queue and spawns all the threads associated with - /// it. - pub fn new(thread_name: &'static str, - state: thread_state::ThreadState, - thread_count: usize) -> Result<WorkQueue<QueueData, WorkData>, ()> { - // Set up data structures. - let (supervisor_chan, supervisor_port) = channel(); - let mut infos = Vec::with_capacity(thread_count); - let mut threads = Vec::with_capacity(thread_count); - for i in 0..thread_count { - let (worker_chan, worker_port) = channel(); - let (worker, thief) = deque::new(); - infos.push(WorkerInfo { - chan: worker_chan, - deque: Some(worker), - thief: thief, - }); - threads.push(WorkerThread { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: vec!(), - rng: weak_rng(), - }); - } - - // Connect workers to one another. 
- for (i, mut thread) in threads.iter_mut().enumerate() { - for (j, info) in infos.iter().enumerate() { - if i != j { - thread.other_deques.push(info.thief.clone()) - } - } - assert!(thread.other_deques.len() == thread_count - 1) - } - - // Spawn threads. - let mut thread_handles = vec![]; - for (i, thread) in threads.into_iter().enumerate() { - let handle = thread::Builder::new() - .name(format!("{} worker {}/{}", thread_name, i + 1, thread_count)) - .spawn(move || { - thread_state::initialize(state | thread_state::IN_WORKER); - let mut thread = thread; - thread.start() - }); - match handle { - Ok(handle) => { - thread_handles.push(handle); - } - Err(err) => { - warn!("Failed spawning thread: {:?}", err); - break; - } - } - } - - if thread_handles.len() != thread_count { - // At least one worker thread failed to be created, just close the - // rest of them, and return an error. - for (i, handle) in thread_handles.into_iter().enumerate() { - let _ = infos[i].chan.send(WorkerMsg::Exit); - let _ = handle.join(); - } - - return Err(()); - } - - Ok(WorkQueue { - workers: infos, - port: supervisor_port, - work_count: 0, - }) - } - - /// Enqueues a block into the work queue. - #[inline] - pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) { - let deque = &mut self.workers[0].deque; - match *deque { - None => { - panic!("tried to push a block but we don't have the deque?!") - } - Some(ref mut deque) => deque.push(work_unit), - } - self.work_count += 1 - } - - /// Synchronously runs all the enqueued tasks and waits for them to complete. - pub fn run(&mut self, data: &QueueData) { - // Tell the workers to start. - let work_count = AtomicUsize::new(self.work_count); - for worker in &mut self.workers { - worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(), - &work_count, - data)).unwrap() - } - - // Wait for the work to finish. - drop(self.port.recv()); - self.work_count = 0; - - // Tell everyone to stop. - for worker in &self.workers { - worker.chan.send(WorkerMsg::Stop).unwrap() - } - - // Get our deques back. - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque), - SupervisorMsg::HeapSizeOfTLS(_) => panic!("unexpected HeapSizeOfTLS message"), - SupervisorMsg::Finished => panic!("unexpected finished message!"), - } - } - } - - /// Synchronously measure memory usage of any thread-local storage. - pub fn heap_size_of_tls(&self, f: fn() -> usize) -> Vec<usize> { - // Tell the workers to measure themselves. - for worker in &self.workers { - worker.chan.send(WorkerMsg::HeapSizeOfTLS(f)).unwrap() - } - - // Wait for the workers to finish measuring themselves. - let mut sizes = vec![]; - for _ in 0..self.workers.len() { - match self.port.recv().unwrap() { - SupervisorMsg::HeapSizeOfTLS(size) => { - sizes.push(size); - } - _ => panic!("unexpected message!"), - } - } - sizes - } - - pub fn shutdown(&mut self) { - for worker in &self.workers { - worker.chan.send(WorkerMsg::Exit).unwrap() - } - } -} |
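
Note that the sibling hand-off used by the bottom-up (postorder) traversals does not depend on the queue implementation at all, which is why it survives the switch: as the comment in components/style/parallel.rs says, siblings only communicate by fetch-and-subtracting the parent's children count. A standalone sketch of that hand-off follows; the atomic counter matches the AtomicIsize already used in layout, while the memory ordering and names here are illustrative.

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

/// Called by each child when its own work is done. fetch_sub returns the
/// previous value, so exactly one sibling (the last to finish) observes 1
/// and takes responsibility for continuing the postorder walk on the parent.
/// (SeqCst is used here as a conservative ordering for the sketch.)
fn finish_child(parent_children_to_process: &AtomicIsize) -> bool {
    parent_children_to_process.fetch_sub(1, Ordering::SeqCst) == 1
}

fn main() {
    let children_to_process = AtomicIsize::new(3);
    let mut proceeded_to_parent = 0;
    for _ in 0..3 {
        if finish_child(&children_to_process) {
            proceeded_to_parent += 1;
        }
    }
    // Only the last child walks up to the parent.
    assert_eq!(proceeded_to_parent, 1);
}
```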