aboutsummaryrefslogtreecommitdiffstats
path: root/components
diff options
context:
space:
mode:
authorEmilio Cobos Álvarez <ecoal95@gmail.com>2016-10-07 12:22:06 +0200
committerEmilio Cobos Álvarez <ecoal95@gmail.com>2016-11-14 21:24:19 +0100
commit73917cce83d2225b51b29c374d861d71ec69435f (patch)
treeac8c6b562ca3e88a7d853ddcab88d114362559e7 /components
parentb7eb36fa84e6c6c77727ea2cd02c57f6750dc7af (diff)
downloadservo-73917cce83d2225b51b29c374d861d71ec69435f.tar.gz
servo-73917cce83d2225b51b29c374d861d71ec69435f.zip
style: Use rayon instead of our custom work queue.
Diffstat (limited to 'components')
-rw-r--r--components/layout/Cargo.toml1
-rw-r--r--components/layout/lib.rs1
-rw-r--r--components/layout/parallel.rs90
-rw-r--r--components/layout_thread/Cargo.toml1
-rw-r--r--components/layout_thread/lib.rs29
-rw-r--r--components/servo/Cargo.lock6
-rw-r--r--components/style/Cargo.toml15
-rw-r--r--components/style/gecko/data.rs15
-rw-r--r--components/style/gecko/wrapper.rs3
-rw-r--r--components/style/lib.rs5
-rw-r--r--components/style/parallel.rs76
-rw-r--r--components/style/thread_state.rs3
-rw-r--r--components/style/workqueue.rs385
13 files changed, 110 insertions, 520 deletions
diff --git a/components/layout/Cargo.toml b/components/layout/Cargo.toml
index 62f6cf47ecc..59a5ad9f8cd 100644
--- a/components/layout/Cargo.toml
+++ b/components/layout/Cargo.toml
@@ -31,6 +31,7 @@ parking_lot = "0.3.3"
plugins = {path = "../plugins"}
profile_traits = {path = "../profile_traits"}
range = {path = "../range"}
+rayon = "0.5"
script_layout_interface = {path = "../script_layout_interface"}
script_traits = {path = "../script_traits"}
selectors = "0.14"
diff --git a/components/layout/lib.rs b/components/layout/lib.rs
index a39e7af2eb2..8fa25f2f069 100644
--- a/components/layout/lib.rs
+++ b/components/layout/lib.rs
@@ -44,6 +44,7 @@ extern crate plugins as servo_plugins;
extern crate profile_traits;
#[macro_use]
extern crate range;
+extern crate rayon;
extern crate script_layout_interface;
extern crate script_traits;
extern crate serde;
diff --git a/components/layout/parallel.rs b/components/layout/parallel.rs
index 29e53bf9077..03aaa512d3e 100644
--- a/components/layout/parallel.rs
+++ b/components/layout/parallel.rs
@@ -12,12 +12,11 @@ use context::{LayoutContext, SharedLayoutContext};
use flow::{self, Flow, MutableFlowUtils, PostorderFlowTraversal, PreorderFlowTraversal};
use flow_ref::FlowRef;
use profile_traits::time::{self, TimerMetadata, profile};
+use rayon;
use std::mem;
use std::sync::atomic::{AtomicIsize, Ordering};
use style::dom::UnsafeNode;
-use style::parallel::{CHUNK_SIZE, WorkQueueData};
-use style::parallel::run_queue_with_custom_work_data_type;
-use style::workqueue::{WorkQueue, WorkUnit, WorkerProxy};
+use style::parallel::CHUNK_SIZE;
use traversal::{AssignISizes, BubbleISizes};
use traversal::AssignBSizes;
use util::opts;
@@ -50,10 +49,8 @@ pub fn borrowed_flow_to_unsafe_flow(flow: &Flow) -> UnsafeFlow {
}
}
-pub type UnsafeFlowList = (Box<Vec<UnsafeNode>>, usize);
-
-pub type ChunkedFlowTraversalFunction =
- extern "Rust" fn(UnsafeFlowList, &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>);
+pub type ChunkedFlowTraversalFunction<'scope> =
+ extern "Rust" fn(Box<[UnsafeFlow]>, &'scope SharedLayoutContext, &rayon::Scope<'scope>);
pub type FlowTraversalFunction = extern "Rust" fn(UnsafeFlow, &SharedLayoutContext);
@@ -133,27 +130,35 @@ trait ParallelPostorderFlowTraversal : PostorderFlowTraversal {
/// A parallel top-down flow traversal.
trait ParallelPreorderFlowTraversal : PreorderFlowTraversal {
- fn run_parallel(&self,
- unsafe_flows: UnsafeFlowList,
- proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>);
+ fn run_parallel<'scope>(&self,
+ unsafe_flows: &[UnsafeFlow],
+ layout_context: &'scope SharedLayoutContext,
+ scope: &rayon::Scope<'scope>);
fn should_record_thread_ids(&self) -> bool;
#[inline(always)]
- fn run_parallel_helper(&self,
- unsafe_flows: UnsafeFlowList,
- proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>,
- top_down_func: ChunkedFlowTraversalFunction,
- bottom_up_func: FlowTraversalFunction) {
- let mut discovered_child_flows = Vec::new();
- for unsafe_flow in *unsafe_flows.0 {
+ fn run_parallel_helper<'scope>(&self,
+ unsafe_flows: &[UnsafeFlow],
+ layout_context: &'scope SharedLayoutContext,
+ scope: &rayon::Scope<'scope>,
+ top_down_func: ChunkedFlowTraversalFunction<'scope>,
+ bottom_up_func: FlowTraversalFunction)
+ {
+ let mut discovered_child_flows = vec![];
+ for unsafe_flow in unsafe_flows {
let mut had_children = false;
unsafe {
// Get a real flow.
- let flow: &mut Flow = mem::transmute(unsafe_flow);
+ let flow: &mut Flow = mem::transmute(*unsafe_flow);
if self.should_record_thread_ids() {
- flow::mut_base(flow).thread_id = proxy.worker_index();
+ // FIXME(emilio): With the switch to rayon we can no longer
+ // access a thread id from here easily. Either instrument
+ // rayon (the unstable feature) to get a worker thread
+ // identifier, or remove all the layout tinting mode.
+ //
+ // flow::mut_base(flow).thread_id = proxy.worker_index();
}
if self.should_process(flow) {
@@ -170,25 +175,29 @@ trait ParallelPreorderFlowTraversal : PreorderFlowTraversal {
// If there were no more children, start assigning block-sizes.
if !had_children {
- bottom_up_func(unsafe_flow, proxy.user_data())
+ bottom_up_func(*unsafe_flow, layout_context)
}
}
for chunk in discovered_child_flows.chunks(CHUNK_SIZE) {
- proxy.push(WorkUnit {
- fun: top_down_func,
- data: (box chunk.iter().cloned().collect(), 0),
+ let nodes = chunk.iter().cloned().collect::<Vec<_>>().into_boxed_slice();
+
+ scope.spawn(move |scope| {
+ top_down_func(nodes, layout_context, scope);
});
}
}
}
impl<'a> ParallelPreorderFlowTraversal for AssignISizes<'a> {
- fn run_parallel(&self,
- unsafe_flows: UnsafeFlowList,
- proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>) {
+ fn run_parallel<'scope>(&self,
+ unsafe_flows: &[UnsafeFlow],
+ layout_context: &'scope SharedLayoutContext,
+ scope: &rayon::Scope<'scope>)
+ {
self.run_parallel_helper(unsafe_flows,
- proxy,
+ layout_context,
+ scope,
assign_inline_sizes,
assign_block_sizes_and_store_overflow)
}
@@ -200,13 +209,13 @@ impl<'a> ParallelPreorderFlowTraversal for AssignISizes<'a> {
impl<'a> ParallelPostorderFlowTraversal for AssignBSizes<'a> {}
-fn assign_inline_sizes(unsafe_flows: UnsafeFlowList,
- proxy: &mut WorkerProxy<SharedLayoutContext, UnsafeFlowList>) {
- let shared_layout_context = proxy.user_data();
+fn assign_inline_sizes<'scope>(unsafe_flows: Box<[UnsafeFlow]>,
+ shared_layout_context: &'scope SharedLayoutContext,
+ scope: &rayon::Scope<'scope>) {
let assign_inline_sizes_traversal = AssignISizes {
shared_context: &shared_layout_context.style_context,
};
- assign_inline_sizes_traversal.run_parallel(unsafe_flows, proxy)
+ assign_inline_sizes_traversal.run_parallel(&unsafe_flows, shared_layout_context, scope)
}
fn assign_block_sizes_and_store_overflow(
@@ -224,20 +233,21 @@ pub fn traverse_flow_tree_preorder(
profiler_metadata: Option<TimerMetadata>,
time_profiler_chan: time::ProfilerChan,
shared_layout_context: &SharedLayoutContext,
- queue: &mut WorkQueue<SharedLayoutContext, WorkQueueData>) {
+ queue: &rayon::ThreadPool) {
if opts::get().bubble_inline_sizes_separately {
let layout_context = LayoutContext::new(shared_layout_context);
let bubble_inline_sizes = BubbleISizes { layout_context: &layout_context };
root.traverse_postorder(&bubble_inline_sizes);
}
- run_queue_with_custom_work_data_type(queue, |queue| {
- profile(time::ProfilerCategory::LayoutParallelWarmup, profiler_metadata,
- time_profiler_chan, || {
- queue.push(WorkUnit {
- fun: assign_inline_sizes,
- data: (box vec![borrowed_flow_to_unsafe_flow(root)], 0),
- })
+ let nodes = vec![borrowed_flow_to_unsafe_flow(root)].into_boxed_slice();
+
+ queue.install(move || {
+ rayon::scope(move |scope| {
+ profile(time::ProfilerCategory::LayoutParallelWarmup,
+ profiler_metadata, time_profiler_chan, move || {
+ assign_inline_sizes(nodes, &shared_layout_context, scope);
+ });
});
- }, shared_layout_context);
+ });
}
diff --git a/components/layout_thread/Cargo.toml b/components/layout_thread/Cargo.toml
index cc97c74697f..798abe3eb03 100644
--- a/components/layout_thread/Cargo.toml
+++ b/components/layout_thread/Cargo.toml
@@ -27,6 +27,7 @@ net_traits = {path = "../net_traits"}
parking_lot = {version = "0.3.3", features = ["nightly"]}
plugins = {path = "../plugins"}
profile_traits = {path = "../profile_traits"}
+rayon = "0.5"
script = {path = "../script"}
script_layout_interface = {path = "../script_layout_interface"}
script_traits = {path = "../script_traits"}
diff --git a/components/layout_thread/lib.rs b/components/layout_thread/lib.rs
index 859183935b1..ce08a1ab010 100644
--- a/components/layout_thread/lib.rs
+++ b/components/layout_thread/lib.rs
@@ -34,6 +34,7 @@ extern crate net_traits;
extern crate parking_lot;
#[macro_use]
extern crate profile_traits;
+extern crate rayon;
extern crate script;
extern crate script_layout_interface;
extern crate script_traits;
@@ -107,14 +108,12 @@ use style::dom::{TDocument, TElement, TNode};
use style::error_reporting::{ParseErrorReporter, StdoutErrorReporter};
use style::logical_geometry::LogicalPoint;
use style::media_queries::{Device, MediaType};
-use style::parallel::WorkQueueData;
use style::parser::ParserContextExtraData;
use style::selector_matching::Stylist;
use style::servo::restyle_damage::{REFLOW, REFLOW_OUT_OF_FLOW, REPAINT, REPOSITION, STORE_OVERFLOW};
use style::stylesheets::{Origin, Stylesheet, UserAgentStylesheets};
use style::thread_state;
use style::timer::Timer;
-use style::workqueue::WorkQueue;
use url::Url;
use util::geometry::max_rect;
use util::opts;
@@ -173,7 +172,7 @@ pub struct LayoutThread {
first_reflow: bool,
/// The workers that we use for parallel operation.
- parallel_traversal: Option<WorkQueue<SharedLayoutContext, WorkQueueData>>,
+ parallel_traversal: Option<rayon::ThreadPool>,
/// Starts at zero, and increased by one every time a layout completes.
/// This can be used to easily check for invalid stale data.
@@ -383,7 +382,9 @@ impl LayoutThread {
MediaType::Screen,
opts::get().initial_window_size.to_f32() * ScaleFactor::new(1.0));
let parallel_traversal = if layout_threads != 1 {
- WorkQueue::new("LayoutWorker", thread_state::LAYOUT, layout_threads).ok()
+ let configuration =
+ rayon::Configuration::new().set_num_threads(layout_threads);
+ rayon::ThreadPool::new(configuration).ok()
} else {
None
};
@@ -711,19 +712,6 @@ impl LayoutThread {
size: heap_size_of_local_context(),
});
- // ... as do each of the LayoutWorkers, if present.
- if let Some(ref traversal) = self.parallel_traversal {
- let sizes = traversal.heap_size_of_tls(heap_size_of_local_context);
- for (i, size) in sizes.iter().enumerate() {
- reports.push(Report {
- path: path![formatted_url,
- format!("layout-worker-{}-local-context", i)],
- kind: ReportKind::ExplicitJemallocHeapSize,
- size: *size,
- });
- }
- }
-
reports_chan.send(reports);
}
@@ -773,9 +761,8 @@ impl LayoutThread {
/// Shuts down the layout thread now. If there are any DOM nodes left, layout will now (safely)
/// crash.
fn exit_now(&mut self) {
- if let Some(ref mut traversal) = self.parallel_traversal {
- traversal.shutdown()
- }
+ // Drop the rayon threadpool if present.
+ let _ = self.parallel_traversal.take();
}
fn handle_add_stylesheet<'a, 'b>(&self,
@@ -855,7 +842,7 @@ impl LayoutThread {
/// This corresponds to `Reflow()` in Gecko and `layout()` in WebKit/Blink and should be
/// benchmarked against those two. It is marked `#[inline(never)]` to aid profiling.
#[inline(never)]
- fn solve_constraints_parallel(traversal: &mut WorkQueue<SharedLayoutContext, WorkQueueData>,
+ fn solve_constraints_parallel(traversal: &rayon::ThreadPool,
layout_root: &mut Flow,
profiler_metadata: Option<TimerMetadata>,
time_profiler_chan: time::ProfilerChan,
diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock
index ed74bf61ba8..a3b4c7731d9 100644
--- a/components/servo/Cargo.lock
+++ b/components/servo/Cargo.lock
@@ -1265,6 +1265,7 @@ dependencies = [
"plugins 0.0.1",
"profile_traits 0.0.1",
"range 0.0.1",
+ "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"script_layout_interface 0.0.1",
"script_traits 0.0.1",
"selectors 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1310,6 +1311,7 @@ dependencies = [
"parking_lot 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"plugins 0.0.1",
"profile_traits 0.0.1",
+ "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"script 0.0.1",
"script_layout_interface 0.0.1",
"script_traits 0.0.1",
@@ -2487,7 +2489,6 @@ dependencies = [
"bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cssparser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"encoding 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"euclid 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2496,18 +2497,17 @@ dependencies = [
"html5ever-atoms 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
- "num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ordered-float 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"owning_ref 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"plugins 0.0.1",
"quickersort 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"selectors 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.8.17 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/components/style/Cargo.toml b/components/style/Cargo.toml
index 3494ea263c2..1374926b08b 100644
--- a/components/style/Cargo.toml
+++ b/components/style/Cargo.toml
@@ -13,11 +13,11 @@ path = "lib.rs"
doctest = false
[features]
-gecko = ["nsstring_vendor"]
+gecko = ["nsstring_vendor", "num_cpus", "rayon/unstable"]
servo = ["serde/unstable", "serde", "serde_derive", "heapsize_derive",
"style_traits/servo", "app_units/plugins", "servo_atoms", "html5ever-atoms",
"cssparser/heap_size", "cssparser/serde-serialization",
- "url/heap_size", "plugins"]
+ "url/heap_size", "plugins", "rayon/unstable"]
testing = []
[dependencies]
@@ -25,7 +25,6 @@ app_units = "0.3"
bitflags = "0.7"
cfg-if = "0.1.0"
cssparser = "0.7"
-deque = "0.3.1"
encoding = "0.2"
euclid = "0.10.1"
fnv = "1.0"
@@ -34,17 +33,16 @@ heapsize_derive = {version = "0.1", optional = true}
html5ever-atoms = {version = "0.1", optional = true}
lazy_static = "0.2"
log = "0.3.5"
-libc = "0.2"
matches = "0.1"
nsstring_vendor = {path = "gecko_bindings/nsstring_vendor", optional = true}
num-integer = "0.1.32"
num-traits = "0.1.32"
-num_cpus = "1.1.0"
ordered-float = "0.2.2"
owning_ref = "0.2.2"
parking_lot = "0.3.3"
quickersort = "2.0.0"
rand = "0.3"
+rayon = "0.5"
rustc-serialize = "0.3"
selectors = "0.14"
serde = {version = "0.8", optional = true}
@@ -58,11 +56,12 @@ url = "1.2"
util = {path = "../util"}
plugins = {path = "../plugins", optional = true}
+[dependencies.num_cpus]
+optional = true
+version = "1.0"
+
[target.'cfg(windows)'.dependencies]
kernel32-sys = "0.2"
-[target.'cfg(not(windows))'.dependencies]
-libc = "0.2"
-
[build-dependencies]
walkdir = "0.1"
diff --git a/components/style/gecko/data.rs b/components/style/gecko/data.rs
index 1cc360fdeb8..2f6ed96c71f 100644
--- a/components/style/gecko/data.rs
+++ b/components/style/gecko/data.rs
@@ -4,15 +4,14 @@
use animation::Animation;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
-use context::SharedStyleContext;
use dom::OpaqueNode;
use euclid::size::TypedSize2D;
use gecko_bindings::bindings::RawServoStyleSet;
use gecko_bindings::sugar::ownership::{HasBoxFFI, HasFFI, HasSimpleFFI};
use media_queries::{Device, MediaType};
use num_cpus;
-use parallel::WorkQueueData;
use parking_lot::RwLock;
+use rayon;
use selector_matching::Stylist;
use std::cmp;
use std::collections::HashMap;
@@ -21,8 +20,6 @@ use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender, channel};
use style_traits::ViewportPx;
use stylesheets::Stylesheet;
-use thread_state;
-use workqueue::WorkQueue;
pub struct PerDocumentStyleDataImpl {
/// Rule processor.
@@ -41,7 +38,7 @@ pub struct PerDocumentStyleDataImpl {
pub expired_animations: Arc<RwLock<HashMap<OpaqueNode, Vec<Animation>>>>,
// FIXME(bholley): This shouldn't be per-document.
- pub work_queue: Option<WorkQueue<SharedStyleContext, WorkQueueData>>,
+ pub work_queue: Option<rayon::ThreadPool>,
pub num_threads: usize,
}
@@ -76,7 +73,9 @@ impl PerDocumentStyleData {
work_queue: if *NUM_THREADS <= 1 {
None
} else {
- WorkQueue::new("StyleWorker", thread_state::LAYOUT, *NUM_THREADS).ok()
+ let configuration =
+ rayon::Configuration::new().set_num_threads(*NUM_THREADS);
+ rayon::ThreadPool::new(configuration).ok()
},
num_threads: *NUM_THREADS,
}))
@@ -112,8 +111,6 @@ unsafe impl HasBoxFFI for PerDocumentStyleData {}
impl Drop for PerDocumentStyleDataImpl {
fn drop(&mut self) {
- if let Some(ref mut queue) = self.work_queue {
- queue.shutdown();
- }
+ let _ = self.work_queue.take();
}
}
diff --git a/components/style/gecko/wrapper.rs b/components/style/gecko/wrapper.rs
index 9adac5f269f..cc40f22ae97 100644
--- a/components/style/gecko/wrapper.rs
+++ b/components/style/gecko/wrapper.rs
@@ -30,7 +30,6 @@ use gecko_bindings::bindings::Gecko_StoreStyleDifference;
use gecko_bindings::structs;
use gecko_bindings::structs::{NODE_HAS_DIRTY_DESCENDANTS_FOR_SERVO, NODE_IS_DIRTY_FOR_SERVO};
use gecko_bindings::structs::{nsIAtom, nsIContent, nsStyleContext};
-use libc::uintptr_t;
use parking_lot::RwLock;
use parser::ParserContextExtraData;
use properties::{ComputedValues, parse_style_attribute};
@@ -114,7 +113,7 @@ impl<'ln> TNode for GeckoNode<'ln> {
}
fn opaque(&self) -> OpaqueNode {
- let ptr: uintptr_t = self.0 as *const _ as uintptr_t;
+ let ptr: usize = self.0 as *const _ as usize;
OpaqueNode(ptr)
}
diff --git a/components/style/lib.rs b/components/style/lib.rs
index e7c764ba87c..64fe606d9c1 100644
--- a/components/style/lib.rs
+++ b/components/style/lib.rs
@@ -49,7 +49,6 @@ extern crate cfg_if;
extern crate core;
#[macro_use]
extern crate cssparser;
-extern crate deque;
extern crate encoding;
extern crate euclid;
extern crate fnv;
@@ -60,7 +59,6 @@ extern crate heapsize;
#[allow(unused_extern_crates)]
#[macro_use]
extern crate lazy_static;
-#[cfg(feature = "gecko")] extern crate libc;
#[macro_use]
extern crate log;
#[allow(unused_extern_crates)]
@@ -74,7 +72,7 @@ extern crate ordered_float;
extern crate owning_ref;
extern crate parking_lot;
extern crate quickersort;
-extern crate rand;
+extern crate rayon;
extern crate rustc_serialize;
extern crate selectors;
#[cfg(feature = "servo")]
@@ -131,7 +129,6 @@ pub mod traversal;
#[allow(non_camel_case_types)]
pub mod values;
pub mod viewport;
-pub mod workqueue;
use std::fmt;
use std::sync::Arc;
diff --git a/components/style/parallel.rs b/components/style/parallel.rs
index 6740c4f379c..e3f4eaf50b1 100644
--- a/components/style/parallel.rs
+++ b/components/style/parallel.rs
@@ -6,45 +6,18 @@
//!
//! This code is highly unsafe. Keep this file small and easy to audit.
-#![allow(unsafe_code)]
-
use dom::{OpaqueNode, StylingMode, TElement, TNode, UnsafeNode};
-use std::mem;
+use rayon;
use std::sync::atomic::Ordering;
use traversal::{STYLE_SHARING_CACHE_HITS, STYLE_SHARING_CACHE_MISSES};
use traversal::DomTraversalContext;
use util::opts;
-use workqueue::{WorkQueue, WorkUnit, WorkerProxy};
-
-#[allow(dead_code)]
-fn static_assertion(node: UnsafeNode) {
- unsafe {
- let _: UnsafeNodeList = mem::transmute(node);
- }
-}
-
-pub type UnsafeNodeList = (Box<Vec<UnsafeNode>>, OpaqueNode);
pub const CHUNK_SIZE: usize = 64;
-pub struct WorkQueueData(usize, usize);
-
-pub fn run_queue_with_custom_work_data_type<To, F, SharedContext: Sync>(
- queue: &mut WorkQueue<SharedContext, WorkQueueData>,
- callback: F,
- shared: &SharedContext)
- where To: 'static + Send, F: FnOnce(&mut WorkQueue<SharedContext, To>)
-{
- let queue: &mut WorkQueue<SharedContext, To> = unsafe {
- mem::transmute(queue)
- };
- callback(queue);
- queue.run(shared);
-}
-
pub fn traverse_dom<N, C>(root: N,
- queue_data: &C::SharedContext,
- queue: &mut WorkQueue<C::SharedContext, WorkQueueData>)
+ shared_context: &C::SharedContext,
+ queue: &rayon::ThreadPool)
where N: TNode,
C: DomTraversalContext<N>
{
@@ -53,12 +26,15 @@ pub fn traverse_dom<N, C>(root: N,
STYLE_SHARING_CACHE_HITS.store(0, Ordering::SeqCst);
STYLE_SHARING_CACHE_MISSES.store(0, Ordering::SeqCst);
}
- run_queue_with_custom_work_data_type(queue, |queue| {
- queue.push(WorkUnit {
- fun: top_down_dom::<N, C>,
- data: (Box::new(vec![root.to_unsafe()]), root.opaque()),
+
+ let nodes = vec![root.to_unsafe()].into_boxed_slice();
+ let root = root.opaque();
+ queue.install(|| {
+ rayon::scope(|scope| {
+ let nodes = nodes;
+ top_down_dom::<N, C>(&nodes, root, scope, shared_context);
});
- }, queue_data);
+ });
if opts::get().style_sharing_stats {
let hits = STYLE_SHARING_CACHE_HITS.load(Ordering::SeqCst);
@@ -72,14 +48,18 @@ pub fn traverse_dom<N, C>(root: N,
/// A parallel top-down DOM traversal.
#[inline(always)]
-fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList,
- proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>)
- where N: TNode, C: DomTraversalContext<N>
+#[allow(unsafe_code)]
+fn top_down_dom<'a, 'scope, N, C>(unsafe_nodes: &'a [UnsafeNode],
+ root: OpaqueNode,
+ scope: &'a rayon::Scope<'scope>,
+ shared_context: &'scope C::SharedContext)
+ where N: TNode,
+ C: DomTraversalContext<N>,
{
- let context = C::new(proxy.user_data(), unsafe_nodes.1);
+ let context = C::new(shared_context, root);
let mut discovered_child_nodes = vec![];
- for unsafe_node in *unsafe_nodes.0 {
+ for unsafe_node in unsafe_nodes {
// Get a real layout node.
let node = unsafe { N::from_unsafe(&unsafe_node) };
@@ -98,7 +78,7 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList,
if context.needs_postorder_traversal() {
if children_to_process == 0 {
// If there were no more children, start walking back up.
- bottom_up_dom::<N, C>(unsafe_nodes.1, unsafe_node, proxy)
+ bottom_up_dom::<N, C>(root, *unsafe_node, shared_context)
} else {
// Otherwise record the number of children to process when the
// time comes.
@@ -112,10 +92,11 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList,
context.local_context().style_sharing_candidate_cache.borrow_mut().clear();
for chunk in discovered_child_nodes.chunks(CHUNK_SIZE) {
- proxy.push(WorkUnit {
- fun: top_down_dom::<N, C>,
- data: (Box::new(chunk.iter().cloned().collect()), unsafe_nodes.1),
- });
+ let nodes = chunk.iter().cloned().collect::<Vec<_>>().into_boxed_slice();
+ scope.spawn(move |scope| {
+ let nodes = nodes;
+ top_down_dom::<N, C>(&nodes, root, scope, shared_context)
+ })
}
}
@@ -130,13 +111,14 @@ fn top_down_dom<N, C>(unsafe_nodes: UnsafeNodeList,
///
/// The only communication between siblings is that they both
/// fetch-and-subtract the parent's children count.
+#[allow(unsafe_code)]
fn bottom_up_dom<N, C>(root: OpaqueNode,
unsafe_node: UnsafeNode,
- proxy: &mut WorkerProxy<C::SharedContext, UnsafeNodeList>)
+ shared_context: &C::SharedContext)
where N: TNode,
C: DomTraversalContext<N>
{
- let context = C::new(proxy.user_data(), root);
+ let context = C::new(shared_context, root);
// Get a real layout node.
let mut node = unsafe { N::from_unsafe(&unsafe_node) };
diff --git a/components/style/thread_state.rs b/components/style/thread_state.rs
index 12e52425f55..b0fbd5f4294 100644
--- a/components/style/thread_state.rs
+++ b/components/style/thread_state.rs
@@ -72,7 +72,8 @@ mod imp {
pub fn get() -> ThreadState {
let state = STATE.with(|ref k| {
match *k.borrow() {
- None => panic!("Thread state not initialized"),
+ // This is one of the layout threads, that use rayon.
+ None => super::LAYOUT | super::IN_WORKER,
Some(s) => s,
}
});
diff --git a/components/style/workqueue.rs b/components/style/workqueue.rs
deleted file mode 100644
index fc4f66ea120..00000000000
--- a/components/style/workqueue.rs
+++ /dev/null
@@ -1,385 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-//! A work queue for scheduling units of work across threads in a fork-join fashion.
-//!
-//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
-//! higher-level API on top of this could allow safe fork-join parallelism.
-
-#![allow(unsafe_code)]
-
-#[cfg(windows)]
-extern crate kernel32;
-#[cfg(not(windows))]
-extern crate libc;
-
-use deque::{self, Abort, Data, Empty, Stealer, Worker};
-use rand::{Rng, XorShiftRng, weak_rng};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::{Receiver, Sender, channel};
-use std::thread;
-use thread_state;
-
-/// A unit of work.
-///
-/// # Type parameters
-///
-/// - `QueueData`: global custom data for the entire work queue.
-/// - `WorkData`: custom data specific to each unit of work.
-pub struct WorkUnit<QueueData, WorkData: Send> {
- /// The function to execute.
- pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
- /// Arbitrary data.
- pub data: WorkData,
-}
-
-/// Messages from the supervisor to the worker.
-enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> {
- /// Tells the worker to start work.
- Start(Worker<WorkUnit<QueueData, WorkData>>, *const AtomicUsize, *const QueueData),
- /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`.
- Stop,
- /// Tells the worker to measure the heap size of its TLS using the supplied function.
- HeapSizeOfTLS(fn() -> usize),
- /// Tells the worker thread to terminate.
- Exit,
-}
-
-unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {}
-
-/// Messages to the supervisor.
-enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> {
- Finished,
- HeapSizeOfTLS(usize),
- ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>),
-}
-
-unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {}
-
-/// Information that the supervisor thread keeps about the worker threads.
-struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> {
- /// The communication channel to the workers.
- chan: Sender<WorkerMsg<QueueData, WorkData>>,
- /// The worker end of the deque, if we have it.
- deque: Option<Worker<WorkUnit<QueueData, WorkData>>>,
- /// The thief end of the work-stealing deque.
- thief: Stealer<WorkUnit<QueueData, WorkData>>,
-}
-
-/// Information specific to each worker thread that the thread keeps.
-struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> {
- /// The index of this worker.
- index: usize,
- /// The communication port from the supervisor.
- port: Receiver<WorkerMsg<QueueData, WorkData>>,
- /// The communication channel on which messages are sent to the supervisor.
- chan: Sender<SupervisorMsg<QueueData, WorkData>>,
- /// The thief end of the work-stealing deque for all other workers.
- other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>,
- /// The random number generator for this worker.
- rng: XorShiftRng,
-}
-
-unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {}
-
-const SPINS_UNTIL_BACKOFF: u32 = 128;
-const BACKOFF_INCREMENT_IN_US: u32 = 5;
-const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6;
-
-#[cfg(not(windows))]
-fn sleep_microseconds(usec: u32) {
- unsafe {
- libc::usleep(usec);
- }
-}
-
-#[cfg(windows)]
-fn sleep_microseconds(_: u32) {
- unsafe {
- kernel32::Sleep(0);
- }
-}
-
-impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> {
- /// The main logic. This function starts up the worker and listens for
- /// messages.
- fn start(&mut self) {
- let deque_index_mask = (self.other_deques.len() as u32).next_power_of_two() - 1;
- loop {
- // Wait for a start message.
- let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() {
- WorkerMsg::Start(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
- WorkerMsg::Stop => panic!("unexpected stop message"),
- WorkerMsg::Exit => return,
- WorkerMsg::HeapSizeOfTLS(f) => {
- self.chan.send(SupervisorMsg::HeapSizeOfTLS(f())).unwrap();
- continue;
- }
- };
-
- let mut back_off_sleep = 0 as u32;
-
- // We're off!
- 'outer: loop {
- let work_unit;
- match deque.pop() {
- Some(work) => work_unit = work,
- None => {
- // Become a thief.
- let mut i = 0;
- loop {
- // Don't just use `rand % len` because that's slow on ARM.
- let mut victim;
- loop {
- victim = self.rng.next_u32() & deque_index_mask;
- if (victim as usize) < self.other_deques.len() {
- break
- }
- }
-
- match self.other_deques[victim as usize].steal() {
- Empty | Abort => {
- // Continue.
- }
- Data(work) => {
- work_unit = work;
- back_off_sleep = 0 as u32;
- break
- }
- }
-
- if i > SPINS_UNTIL_BACKOFF {
- if back_off_sleep >= BACKOFF_INCREMENT_IN_US *
- BACKOFFS_UNTIL_CONTROL_CHECK {
- match self.port.try_recv() {
- Ok(WorkerMsg::Stop) => break 'outer,
- Ok(WorkerMsg::Exit) => return,
- Ok(_) => panic!("unexpected message"),
- _ => {}
- }
- }
-
- sleep_microseconds(back_off_sleep);
-
- back_off_sleep += BACKOFF_INCREMENT_IN_US;
- i = 0
- } else {
- i += 1
- }
- }
- }
- }
-
- // At this point, we have some work. Perform it.
- let mut proxy = WorkerProxy {
- worker: &mut deque,
- ref_count: ref_count,
- // queue_data is kept alive in the stack frame of
- // WorkQueue::run until we send the
- // SupervisorMsg::ReturnDeque message below.
- queue_data: unsafe { &*queue_data },
- worker_index: self.index as u8,
- };
- (work_unit.fun)(work_unit.data, &mut proxy);
-
- // The work is done. Now decrement the count of outstanding work items. If this was
- // the last work unit in the queue, then send a message on the channel.
- unsafe {
- if (*ref_count).fetch_sub(1, Ordering::Release) == 1 {
- self.chan.send(SupervisorMsg::Finished).unwrap()
- }
- }
- }
-
- // Give the deque back to the supervisor.
- self.chan.send(SupervisorMsg::ReturnDeque(self.index, deque)).unwrap()
- }
- }
-}
-
-/// A handle to the work queue that individual work units have.
-pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> {
- worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
- ref_count: *const AtomicUsize,
- queue_data: &'a QueueData,
- worker_index: u8,
-}
-
-impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData, WorkData> {
- /// Enqueues a block into the work queue.
- #[inline]
- pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
- unsafe {
- drop((*self.ref_count).fetch_add(1, Ordering::Relaxed));
- }
- self.worker.push(work_unit);
- }
-
- /// Retrieves the queue user data.
- #[inline]
- pub fn user_data(&self) -> &'a QueueData {
- self.queue_data
- }
-
- /// Retrieves the index of the worker.
- #[inline]
- pub fn worker_index(&self) -> u8 {
- self.worker_index
- }
-}
-
-/// A work queue on which units of work can be submitted.
-pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> {
- /// Information about each of the workers.
- workers: Vec<WorkerInfo<QueueData, WorkData>>,
- /// A port on which deques can be received from the workers.
- port: Receiver<SupervisorMsg<QueueData, WorkData>>,
- /// The amount of work that has been enqueued.
- work_count: usize,
-}
-
-impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> {
- /// Creates a new work queue and spawns all the threads associated with
- /// it.
- pub fn new(thread_name: &'static str,
- state: thread_state::ThreadState,
- thread_count: usize) -> Result<WorkQueue<QueueData, WorkData>, ()> {
- // Set up data structures.
- let (supervisor_chan, supervisor_port) = channel();
- let mut infos = Vec::with_capacity(thread_count);
- let mut threads = Vec::with_capacity(thread_count);
- for i in 0..thread_count {
- let (worker_chan, worker_port) = channel();
- let (worker, thief) = deque::new();
- infos.push(WorkerInfo {
- chan: worker_chan,
- deque: Some(worker),
- thief: thief,
- });
- threads.push(WorkerThread {
- index: i,
- port: worker_port,
- chan: supervisor_chan.clone(),
- other_deques: vec!(),
- rng: weak_rng(),
- });
- }
-
- // Connect workers to one another.
- for (i, mut thread) in threads.iter_mut().enumerate() {
- for (j, info) in infos.iter().enumerate() {
- if i != j {
- thread.other_deques.push(info.thief.clone())
- }
- }
- assert!(thread.other_deques.len() == thread_count - 1)
- }
-
- // Spawn threads.
- let mut thread_handles = vec![];
- for (i, thread) in threads.into_iter().enumerate() {
- let handle = thread::Builder::new()
- .name(format!("{} worker {}/{}", thread_name, i + 1, thread_count))
- .spawn(move || {
- thread_state::initialize(state | thread_state::IN_WORKER);
- let mut thread = thread;
- thread.start()
- });
- match handle {
- Ok(handle) => {
- thread_handles.push(handle);
- }
- Err(err) => {
- warn!("Failed spawning thread: {:?}", err);
- break;
- }
- }
- }
-
- if thread_handles.len() != thread_count {
- // At least one worker thread failed to be created, just close the
- // rest of them, and return an error.
- for (i, handle) in thread_handles.into_iter().enumerate() {
- let _ = infos[i].chan.send(WorkerMsg::Exit);
- let _ = handle.join();
- }
-
- return Err(());
- }
-
- Ok(WorkQueue {
- workers: infos,
- port: supervisor_port,
- work_count: 0,
- })
- }
-
- /// Enqueues a block into the work queue.
- #[inline]
- pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
- let deque = &mut self.workers[0].deque;
- match *deque {
- None => {
- panic!("tried to push a block but we don't have the deque?!")
- }
- Some(ref mut deque) => deque.push(work_unit),
- }
- self.work_count += 1
- }
-
- /// Synchronously runs all the enqueued tasks and waits for them to complete.
- pub fn run(&mut self, data: &QueueData) {
- // Tell the workers to start.
- let work_count = AtomicUsize::new(self.work_count);
- for worker in &mut self.workers {
- worker.chan.send(WorkerMsg::Start(worker.deque.take().unwrap(),
- &work_count,
- data)).unwrap()
- }
-
- // Wait for the work to finish.
- drop(self.port.recv());
- self.work_count = 0;
-
- // Tell everyone to stop.
- for worker in &self.workers {
- worker.chan.send(WorkerMsg::Stop).unwrap()
- }
-
- // Get our deques back.
- for _ in 0..self.workers.len() {
- match self.port.recv().unwrap() {
- SupervisorMsg::ReturnDeque(index, deque) => self.workers[index].deque = Some(deque),
- SupervisorMsg::HeapSizeOfTLS(_) => panic!("unexpected HeapSizeOfTLS message"),
- SupervisorMsg::Finished => panic!("unexpected finished message!"),
- }
- }
- }
-
- /// Synchronously measure memory usage of any thread-local storage.
- pub fn heap_size_of_tls(&self, f: fn() -> usize) -> Vec<usize> {
- // Tell the workers to measure themselves.
- for worker in &self.workers {
- worker.chan.send(WorkerMsg::HeapSizeOfTLS(f)).unwrap()
- }
-
- // Wait for the workers to finish measuring themselves.
- let mut sizes = vec![];
- for _ in 0..self.workers.len() {
- match self.port.recv().unwrap() {
- SupervisorMsg::HeapSizeOfTLS(size) => {
- sizes.push(size);
- }
- _ => panic!("unexpected message!"),
- }
- }
- sizes
- }
-
- pub fn shutdown(&mut self) {
- for worker in &self.workers {
- worker.chan.send(WorkerMsg::Exit).unwrap()
- }
- }
-}