aboutsummaryrefslogtreecommitdiffstats
path: root/src/components
diff options
context:
space:
mode:
authorPatrick Walton <pcwalton@mimiga.net>2014-01-23 13:43:03 -0800
committerPatrick Walton <pcwalton@mimiga.net>2014-01-24 20:50:30 -0800
commit18a2050a64cd6f320cc59cb490a69b0e895f11d3 (patch)
tree0a27b979054bded5489300728f0ebcb6f5e4ed8d /src/components
parent86c29d253a6ffada3488cb08d0154d8901ec252e (diff)
downloadservo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.tar.gz
servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.zip
layout: Port parallel layout over to a generic "work queue"
infrastructure. The work queue accepts abstract generic "work units", which in this case are layout operations. The same speedups have been observed.
Diffstat (limited to 'src/components')
-rw-r--r--src/components/main/compositing/compositor.rs3
-rw-r--r--src/components/main/layout/block.rs6
-rw-r--r--src/components/main/layout/box_.rs2
-rw-r--r--src/components/main/layout/construct.rs43
-rw-r--r--src/components/main/layout/context.rs41
-rw-r--r--src/components/main/layout/layout_task.rs79
-rw-r--r--src/components/main/layout/parallel.rs352
-rw-r--r--src/components/main/layout/text.rs17
-rwxr-xr-xsrc/components/main/servo.rc2
-rw-r--r--src/components/util/time.rs23
-rw-r--r--src/components/util/util.rc2
-rw-r--r--src/components/util/workqueue.rs295
12 files changed, 516 insertions, 349 deletions
diff --git a/src/components/main/compositing/compositor.rs b/src/components/main/compositing/compositor.rs
index 5d309f1b913..d6abea53368 100644
--- a/src/components/main/compositing/compositor.rs
+++ b/src/components/main/compositing/compositor.rs
@@ -198,6 +198,9 @@ impl IOCompositor {
// Drain compositor port, sometimes messages contain channels that are blocking
// another task from finishing (i.e. SetIds)
while self.port.try_recv().is_some() {}
+
+ // Tell the profiler to shut down.
+ self.profiler_chan.send(time::ExitMsg);
}
fn handle_message(&mut self) {
diff --git a/src/components/main/layout/block.rs b/src/components/main/layout/block.rs
index b7c945c0d93..5219fe9cd75 100644
--- a/src/components/main/layout/block.rs
+++ b/src/components/main/layout/block.rs
@@ -329,7 +329,7 @@ impl BlockFlow {
// top or bottom borders nor top or bottom padding, and it has a 'height' of either 0 or 'auto',
// and it does not contain a line box, and all of its in-flow children's margins (if any) collapse.
- let screen_height = ctx.shared.screen_size.height;
+ let screen_height = ctx.screen_size.height;
let mut height = if self.is_root {
// FIXME(pcwalton): The max is taken here so that you can scroll the page, but this is
@@ -627,7 +627,7 @@ impl Flow for BlockFlow {
if self.is_root {
debug!("Setting root position");
self.base.position.origin = Au::zero_point();
- self.base.position.size.width = ctx.shared.screen_size.width;
+ self.base.position.size.width = ctx.screen_size.width;
self.base.floats_in = FloatContext::new(self.base.num_floats);
self.base.flags_info.flags.set_inorder(false);
}
@@ -672,7 +672,7 @@ impl Flow for BlockFlow {
margin_bottom,
margin_left));
- let screen_size = ctx.shared.screen_size;
+ let screen_size = ctx.screen_size;
let (x, w) = box_.get_x_coord_and_new_width_if_fixed(screen_size.width,
screen_size.height,
width,
diff --git a/src/components/main/layout/box_.rs b/src/components/main/layout/box_.rs
index 297f70e5adf..66125587be9 100644
--- a/src/components/main/layout/box_.rs
+++ b/src/components/main/layout/box_.rs
@@ -1338,7 +1338,7 @@ impl Box {
iframe_box.pipeline_id,
iframe_box.subpage_id);
let msg = FrameRectMsg(iframe_box.pipeline_id, iframe_box.subpage_id, rect);
- layout_context.shared.constellation_chan.send(msg)
+ layout_context.constellation_chan.send(msg)
}
}
diff --git a/src/components/main/layout/construct.rs b/src/components/main/layout/construct.rs
index 80809899a89..bfc151fb012 100644
--- a/src/components/main/layout/construct.rs
+++ b/src/components/main/layout/construct.rs
@@ -32,6 +32,7 @@ use layout::text::TextRunScanner;
use layout::util::LayoutDataAccess;
use layout::wrapper::{LayoutNode, PostorderNodeMutTraversal};
+use gfx::font_context::FontContext;
use script::dom::element::{HTMLIframeElementTypeId, HTMLImageElementTypeId};
use script::dom::node::{CommentNodeTypeId, DoctypeNodeTypeId, DocumentFragmentNodeTypeId};
use script::dom::node::{DocumentNodeTypeId, ElementNodeTypeId, TextNodeTypeId};
@@ -209,14 +210,19 @@ pub struct FlowConstructor<'a> {
///
/// FIXME(pcwalton): This is going to have to be atomic; can't we do something better?
next_flow_id: RefCell<int>,
+
+ /// The font context.
+ font_context: ~FontContext,
}
impl<'fc> FlowConstructor<'fc> {
/// Creates a new flow constructor.
pub fn init<'a>(layout_context: &'a mut LayoutContext) -> FlowConstructor<'a> {
+ let font_context = ~FontContext::new(layout_context.font_context_info.clone());
FlowConstructor {
layout_context: layout_context,
next_flow_id: RefCell::new(0),
+ font_context: font_context,
}
}
@@ -235,7 +241,7 @@ impl<'fc> FlowConstructor<'fc> {
Some(url) => {
// FIXME(pcwalton): The fact that image boxes store the cache within them makes
// little sense to me.
- Some(ImageBoxInfo::new(&node, url, self.layout_context.shared.image_cache.clone()))
+ Some(ImageBoxInfo::new(&node, url, self.layout_context.image_cache.clone()))
}
}
}
@@ -262,20 +268,19 @@ impl<'fc> FlowConstructor<'fc> {
/// otherwise.
#[inline(always)]
fn flush_inline_boxes_to_flow(&mut self, boxes: ~[Box], flow: &mut ~Flow, node: LayoutNode) {
- if boxes.len() > 0 {
- let inline_base = BaseFlow::new(self.next_flow_id(), node);
-
- let mut inline_flow = ~InlineFlow::from_boxes(inline_base, boxes) as ~Flow;
-
- self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&inline_flow));
+ if boxes.len() == 0 {
+ return
+ }
- TextRunScanner::new().scan_for_runs(self.layout_context, inline_flow);
- let mut inline_flow = Some(inline_flow);
+ let inline_base = BaseFlow::new(self.next_flow_id(), node);
+ let mut inline_flow = ~InlineFlow::from_boxes(inline_base, boxes) as ~Flow;
+ self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&inline_flow));
+ TextRunScanner::new().scan_for_runs(self.font_context, inline_flow);
- self.layout_context.shared.leaf_set.access(|leaf_set| {
- flow.add_new_child(inline_flow.take_unwrap(), leaf_set)
- })
- }
+ let mut inline_flow = Some(inline_flow);
+ self.layout_context.leaf_set.access(|leaf_set| {
+ flow.add_new_child(inline_flow.take_unwrap(), leaf_set)
+ })
}
/// Creates an inline flow from a set of inline boxes, if present, and adds it as a child of
@@ -319,7 +324,7 @@ impl<'fc> FlowConstructor<'fc> {
flow,
node);
let mut kid_flow = Some(kid_flow);
- self.layout_context.shared.leaf_set.access(|leaf_set| {
+ self.layout_context.leaf_set.access(|leaf_set| {
flow.add_new_child(kid_flow.take_unwrap(), leaf_set)
})
}
@@ -362,7 +367,7 @@ impl<'fc> FlowConstructor<'fc> {
// Push the flow generated by the {ib} split onto our list of
// flows.
let mut kid_flow = Some(kid_flow);
- self.layout_context.shared.leaf_set.access(|leaf_set| {
+ self.layout_context.leaf_set.access(|leaf_set| {
flow.add_new_child(kid_flow.take_unwrap(), leaf_set)
})
}
@@ -391,7 +396,7 @@ impl<'fc> FlowConstructor<'fc> {
let box_ = self.build_box_for_node(node);
let mut flow = ~BlockFlow::from_box(base, box_, is_fixed) as ~Flow;
- self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&flow));
+ self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&flow));
self.build_children_of_block_flow(&mut flow, node);
flow
@@ -406,7 +411,7 @@ impl<'fc> FlowConstructor<'fc> {
let mut flow = ~BlockFlow::float_from_box(base, float_type, box_) as ~Flow;
- self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&flow));
+ self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&flow));
self.build_children_of_block_flow(&mut flow, node);
flow
@@ -484,7 +489,7 @@ impl<'fc> FlowConstructor<'fc> {
fn set_inline_info_for_inline_child(&mut self, boxes: &mut ~[Box], parent_node: LayoutNode) {
let parent_box = self.build_box_for_node(parent_node);
let font_style = parent_box.font_style();
- let font_group = self.layout_context.font_ctx.get_resolved_font_for_style(&font_style);
+ let font_group = self.font_context.get_resolved_font_for_style(&font_style);
let (font_ascent,font_descent) = font_group.borrow().with_mut( |fg| {
fg.fonts[0].borrow().with_mut( |font| {
(font.metrics.ascent,font.metrics.descent)
@@ -569,7 +574,7 @@ impl<'a> PostorderNodeMutTraversal for FlowConstructor<'a> {
// `display: none` contributes no flow construction result. Nuke the flow construction
// results of children.
(display::none, _, _) => {
- self.layout_context.shared.leaf_set.access(|leaf_set| {
+ self.layout_context.leaf_set.access(|leaf_set| {
for child in node.children() {
let mut old_result = child.swap_out_construction_result();
old_result.destroy(leaf_set)
diff --git a/src/components/main/layout/context.rs b/src/components/main/layout/context.rs
index 4b62d4236d1..fb797196e71 100644
--- a/src/components/main/layout/context.rs
+++ b/src/components/main/layout/context.rs
@@ -5,17 +5,26 @@
//! Data needed by the layout task.
use extra::arc::MutexArc;
+use green::task::GreenTask;
use layout::flow::LeafSet;
+use std::cast;
+use std::ptr;
+use std::rt::Runtime;
+use std::rt::local::Local;
+use std::rt::task::Task;
use geom::size::Size2D;
-use gfx::font_context::FontContext;
+use gfx::font_context::{FontContext, FontContextInfo};
use servo_msg::constellation_msg::ConstellationChan;
use servo_net::local_image_cache::LocalImageCache;
use servo_util::geometry::Au;
+#[thread_local]
+static mut FONT_CONTEXT: *mut FontContext = 0 as *mut FontContext;
+
/// Data shared by all layout workers.
#[deriving(Clone)]
-pub struct SharedLayoutInfo {
+pub struct LayoutContext {
/// The local image cache.
image_cache: MutexArc<LocalImageCache>,
@@ -27,14 +36,30 @@ pub struct SharedLayoutInfo {
/// The set of leaf flows.
leaf_set: MutexArc<LeafSet>,
+
+ /// Information needed to construct a font context.
+ font_context_info: FontContextInfo,
}
-/// Data specific to a layout worker.
-pub struct LayoutContext {
- /// Shared layout info.
- shared: SharedLayoutInfo,
+impl LayoutContext {
+ pub fn font_context<'a>(&'a mut self) -> &'a mut FontContext {
+ // Sanity check.
+ let mut task = Local::borrow(None::<Task>);
+ match task.get().maybe_take_runtime::<GreenTask>() {
+ Some(green) => {
+ task.get().put_runtime(green as ~Runtime);
+ fail!("can't call this on a green task!")
+ }
+ None => {}
+ }
- /// The current font context.
- font_ctx: ~FontContext,
+ unsafe {
+ if FONT_CONTEXT == ptr::mut_null() {
+ let context = ~FontContext::new(self.font_context_info.clone());
+ FONT_CONTEXT = cast::transmute(context)
+ }
+ cast::transmute(FONT_CONTEXT)
+ }
+ }
}
diff --git a/src/components/main/layout/layout_task.rs b/src/components/main/layout/layout_task.rs
index 4a10dac03c3..6ad34addd76 100644
--- a/src/components/main/layout/layout_task.rs
+++ b/src/components/main/layout/layout_task.rs
@@ -9,7 +9,7 @@ use css::matching::MatchMethods;
use css::select::new_stylist;
use css::node_style::StyledNode;
use layout::construct::{FlowConstructionResult, FlowConstructor, NoConstructionResult};
-use layout::context::{LayoutContext, SharedLayoutInfo};
+use layout::context::LayoutContext;
use layout::display_list_builder::{DisplayListBuilder, ToGfxColor};
use layout::extra::LayoutAuxMethods;
use layout::flow::{Flow, ImmutableFlowUtils, LeafSet, MutableFlowUtils, MutableOwnedFlowUtils};
@@ -17,7 +17,8 @@ use layout::flow::{PreorderFlowTraversal, PostorderFlowTraversal};
use layout::flow;
use layout::incremental::RestyleDamage;
use layout::parallel::{AssignHeightsAndStoreOverflowTraversalKind, BubbleWidthsTraversalKind};
-use layout::parallel::{ParallelPostorderFlowTraversal};
+use layout::parallel::{UnsafeFlow};
+use layout::parallel;
use layout::util::{LayoutDataAccess, OpaqueNode, LayoutDataWrapper};
use layout::wrapper::LayoutNode;
@@ -25,7 +26,7 @@ use extra::arc::{Arc, MutexArc, RWArc};
use geom::rect::Rect;
use geom::size::Size2D;
use gfx::display_list::{ClipDisplayItemClass, DisplayItem, DisplayItemIterator, DisplayList};
-use gfx::font_context::{FontContext, FontContextInfo};
+use gfx::font_context::FontContextInfo;
use gfx::opts::Opts;
use gfx::render_task::{RenderMsg, RenderChan, RenderLayer};
use gfx::{render_task, color};
@@ -46,10 +47,12 @@ use servo_util::geometry::Au;
use servo_util::time::{ProfilerChan, profile};
use servo_util::time;
use servo_util::task::spawn_named;
+use servo_util::workqueue::WorkQueue;
use std::cast::transmute;
use std::cast;
use std::cell::RefCell;
use std::comm::Port;
+use std::ptr;
use std::util;
use style::{AuthorOrigin, Stylesheet, Stylist};
@@ -91,7 +94,7 @@ pub struct LayoutTask {
stylist: RWArc<Stylist>,
/// The workers that we use for parallel operation.
- parallel_traversal: Option<ParallelPostorderFlowTraversal>,
+ parallel_traversal: Option<WorkQueue<*mut LayoutContext,UnsafeFlow>>,
/// The channel on which messages can be sent to the profiler.
profiler_chan: ProfilerChan,
@@ -142,12 +145,14 @@ impl PreorderFlowTraversal for PropagateDamageTraversal {
/// The bubble-widths traversal, the first part of layout computation. This computes preferred
/// and intrinsic widths and bubbles them up the tree.
-pub struct BubbleWidthsTraversal<'a>(&'a mut LayoutContext);
+pub struct BubbleWidthsTraversal<'a> {
+ layout_context: &'a mut LayoutContext,
+}
impl<'a> PostorderFlowTraversal for BubbleWidthsTraversal<'a> {
#[inline]
fn process(&mut self, flow: &mut Flow) -> bool {
- flow.bubble_widths(**self);
+ flow.bubble_widths(self.layout_context);
true
}
@@ -174,13 +179,15 @@ impl<'a> PreorderFlowTraversal for AssignWidthsTraversal<'a> {
/// The assign-heights-and-store-overflow traversal, the last (and most expensive) part of layout
/// computation. Determines the final heights for all layout objects, computes positions, and
/// computes overflow regions. In Gecko this corresponds to `FinishAndStoreOverflow`.
-pub struct AssignHeightsAndStoreOverflowTraversal<'a>(&'a mut LayoutContext);
+pub struct AssignHeightsAndStoreOverflowTraversal<'a> {
+ layout_context: &'a mut LayoutContext,
+}
impl<'a> PostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> {
#[inline]
fn process(&mut self, flow: &mut Flow) -> bool {
- flow.assign_height(**self);
- flow.store_overflow(**self);
+ flow.assign_height(self.layout_context);
+ flow.store_overflow(self.layout_context);
true
}
@@ -248,13 +255,8 @@ impl LayoutTask {
-> LayoutTask {
let local_image_cache = MutexArc::new(LocalImageCache(image_cache_task.clone()));
let screen_size = Size2D(Au(0), Au(0));
- let font_context_info = FontContextInfo {
- backend: opts.render_backend,
- needs_font_list: true,
- profiler_chan: profiler_chan.clone(),
- };
let parallel_traversal = if opts.layout_threads != 1 {
- Some(ParallelPostorderFlowTraversal::new(font_context_info, opts.layout_threads))
+ Some(WorkQueue::new(opts.layout_threads, ptr::mut_null()))
} else {
None
};
@@ -288,20 +290,18 @@ impl LayoutTask {
// Create a layout context for use in building display lists, hit testing, &c.
fn build_layout_context(&self) -> LayoutContext {
- let font_ctx = ~FontContext::new(FontContextInfo {
+ let font_context_info = FontContextInfo {
backend: self.opts.render_backend,
needs_font_list: true,
profiler_chan: self.profiler_chan.clone(),
- });
+ };
LayoutContext {
- shared: SharedLayoutInfo {
- image_cache: self.local_image_cache.clone(),
- screen_size: self.screen_size.clone(),
- constellation_chan: self.constellation_chan.clone(),
- leaf_set: self.leaf_set.clone(),
- },
- font_ctx: font_ctx,
+ image_cache: self.local_image_cache.clone(),
+ screen_size: self.screen_size.clone(),
+ constellation_chan: self.constellation_chan.clone(),
+ leaf_set: self.leaf_set.clone(),
+ font_context_info: font_context_info,
}
}
@@ -418,7 +418,12 @@ impl LayoutTask {
fn solve_constraints(&mut self,
layout_root: &mut Flow,
layout_context: &mut LayoutContext) {
- layout_root.traverse_postorder(&mut BubbleWidthsTraversal(layout_context));
+ {
+ let mut traversal = BubbleWidthsTraversal {
+ layout_context: layout_context,
+ };
+ layout_root.traverse_postorder(&mut traversal);
+ }
// FIXME(kmc): We want to prune nodes without the Reflow restyle damage
// bit, but FloatContext values can't be reused, so we need to
@@ -428,8 +433,12 @@ impl LayoutTask {
layout_root.traverse_preorder(&mut AssignWidthsTraversal(layout_context));
// FIXME(pcwalton): Prune this pass as well.
- layout_root.traverse_postorder(&mut AssignHeightsAndStoreOverflowTraversal(
- layout_context));
+ {
+ let mut traversal = AssignHeightsAndStoreOverflowTraversal {
+ layout_context: layout_context,
+ };
+ layout_root.traverse_postorder(&mut traversal);
+ }
}
/// Performs layout constraint solving in parallel.
@@ -443,9 +452,11 @@ impl LayoutTask {
match self.parallel_traversal {
None => fail!("solve_contraints_parallel() called with no parallel traversal ready"),
Some(ref mut traversal) => {
- traversal.start(BubbleWidthsTraversalKind,
- layout_context,
- self.profiler_chan.clone());
+ parallel::traverse_flow_tree(BubbleWidthsTraversalKind,
+ &self.leaf_set,
+ self.profiler_chan.clone(),
+ layout_context,
+ traversal);
// NOTE: this currently computes borders, so any pruning should separate that
// operation out.
@@ -453,9 +464,11 @@ impl LayoutTask {
// because this is a top-down traversal, unlike the others.
layout_root.traverse_preorder(&mut AssignWidthsTraversal(layout_context));
- traversal.start(AssignHeightsAndStoreOverflowTraversalKind,
- layout_context,
- self.profiler_chan.clone());
+ parallel::traverse_flow_tree(AssignHeightsAndStoreOverflowTraversalKind,
+ &self.leaf_set,
+ self.profiler_chan.clone(),
+ layout_context,
+ traversal);
}
}
}
diff --git a/src/components/main/layout/parallel.rs b/src/components/main/layout/parallel.rs
index 480ec886d8f..b38e6abc9b5 100644
--- a/src/components/main/layout/parallel.rs
+++ b/src/components/main/layout/parallel.rs
@@ -4,44 +4,29 @@
//! Implements parallel traversals over the flow tree.
-use layout::context::{LayoutContext, SharedLayoutInfo};
-use layout::flow::{Flow, PostorderFlowTraversal};
+use layout::context::LayoutContext;
+use layout::flow::{Flow, LeafSet, PostorderFlowTraversal};
use layout::flow;
use layout::layout_task::{AssignHeightsAndStoreOverflowTraversal, BubbleWidthsTraversal};
-use gfx::font_context::{FontContext, FontContextInfo};
-use native;
+use extra::arc::MutexArc;
use servo_util::time::{ProfilerChan, profile};
use servo_util::time;
+use servo_util::workqueue::{WorkQueue, WorkUnit, WorkerProxy};
use std::cast;
-use std::comm::SharedChan;
-use std::libc::c_void;
-use std::ptr;
use std::sync::atomics::{AtomicInt, Relaxed, SeqCst};
-use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
-use std::util;
-enum WorkerMsg {
- /// Tells the worker to start a traversal.
- StartMsg(Worker<UnsafeFlow>, TraversalKind, SharedLayoutInfo),
-
- /// Tells the worker to stop. It can be restarted again with a `StartMsg`.
- StopMsg,
-
- /// Tells the worker thread to terminate.
- ExitMsg,
+pub enum TraversalKind {
+ BubbleWidthsTraversalKind,
+ AssignHeightsAndStoreOverflowTraversalKind,
}
-enum SupervisorMsg {
- /// Sent once the last flow is processed.
- FinishedMsg,
+pub type UnsafeFlow = (uint, uint);
- /// Returns the deque to the supervisor.
- ReturnDequeMsg(uint, Worker<UnsafeFlow>),
+fn null_unsafe_flow() -> UnsafeFlow {
+ (0, 0)
}
-pub type UnsafeFlow = (*c_void, *c_void);
-
pub fn owned_flow_to_unsafe_flow(flow: *~Flow) -> UnsafeFlow {
unsafe {
cast::transmute_copy(&*flow)
@@ -54,10 +39,6 @@ pub fn mut_owned_flow_to_unsafe_flow(flow: *mut ~Flow) -> UnsafeFlow {
}
}
-fn null_unsafe_flow() -> UnsafeFlow {
- (ptr::null(), ptr::null())
-}
-
/// Information that we need stored in each flow.
pub struct FlowParallelInfo {
/// The number of children that still need work done.
@@ -75,270 +56,97 @@ impl FlowParallelInfo {
}
}
-/// Information that the supervisor thread keeps about the worker threads.
-struct WorkerInfo {
- /// The communication channel to the workers.
- chan: Chan<WorkerMsg>,
- /// The buffer pool for this deque.
- pool: BufferPool<UnsafeFlow>,
- /// The worker end of the deque, if we have it.
- deque: Option<Worker<UnsafeFlow>>,
- /// The thief end of the work-stealing deque.
- thief: Stealer<UnsafeFlow>,
-}
-
-/// Information that each worker needs to do its job.
-struct PostorderWorker {
- /// The font context.
- font_context: Option<~FontContext>,
- /// Communications for the worker.
- comm: PostorderWorkerComm,
-}
-
-/// Communication channels for postorder workers.
-struct PostorderWorkerComm {
- /// The index of this worker.
- index: uint,
- /// The communication port from the supervisor.
- port: Port<WorkerMsg>,
- /// The communication channel to the supervisor.
- chan: SharedChan<SupervisorMsg>,
- /// The thief end of the work-stealing deque for all other workers.
- other_deques: ~[Stealer<UnsafeFlow>],
-}
-
-/// The type of traversal we're performing.
-pub enum TraversalKind {
- BubbleWidthsTraversalKind,
- AssignHeightsAndStoreOverflowTraversalKind,
-}
-
-impl PostorderWorker {
- /// Starts up the worker and listens for messages.
- pub fn start(&mut self) {
+/// A parallel bottom-up flow traversal.
+trait ParallelPostorderFlowTraversal : PostorderFlowTraversal {
+ fn run_parallel(&mut self, mut unsafe_flow: UnsafeFlow) {
loop {
- // Wait for a start message.
- let (mut deque, kind, shared_layout_info) = match self.comm.port.recv() {
- StopMsg => fail!("unexpected stop message"),
- StartMsg(deque, kind, shared_layout_info) => (deque, kind, shared_layout_info),
- ExitMsg => return,
- };
+ unsafe {
+ // Get a real flow.
+ let flow: &mut ~Flow = cast::transmute(&unsafe_flow);
- // Set up our traversal context.
- let mut traversal = LayoutContext {
- shared: shared_layout_info,
- font_ctx: self.font_context.take_unwrap(),
- };
-
- // And we're off!
- 'outer: loop {
- let unsafe_flow;
- match deque.pop() {
- Some(the_flow) => unsafe_flow = the_flow,
- None => {
- // Become a thief.
- let mut i = 0;
- loop {
- if self.comm.other_deques.len() != 0 {
- match self.comm.other_deques[i].steal() {
- Empty => {
- // Try the next one.
- i += 1;
- if i >= self.comm.other_deques.len() {
- i = 0
- }
- }
- Abort => {
- // Continue.
- }
- Data(the_flow) => {
- unsafe_flow = the_flow;
- break
- }
- }
- }
-
- if i == 0 {
- match self.comm.port.try_recv() {
- Some(StopMsg) => break 'outer,
- Some(ExitMsg) => return,
- Some(_) => fail!("unexpected message!"),
- None => {}
- }
- }
- }
- }
+ // Perform the appropriate traversal.
+ if self.should_process(*flow) {
+ self.process(*flow);
}
- // OK, we've got some data. The rest of this is unsafe code.
- unsafe {
- // Get a real flow.
- let flow: &mut ~Flow = cast::transmute(&unsafe_flow);
-
- // Perform the appropriate traversal.
- match kind {
- BubbleWidthsTraversalKind => {
- let mut traversal = BubbleWidthsTraversal(&mut traversal);
- if traversal.should_process(*flow) {
- traversal.process(*flow);
- }
- }
- AssignHeightsAndStoreOverflowTraversalKind => {
- let mut traversal =
- AssignHeightsAndStoreOverflowTraversal(&mut traversal);
- if traversal.should_process(*flow) {
- traversal.process(*flow);
- }
- }
- }
+ let base = flow::mut_base(*flow);
- let base = flow::mut_base(*flow);
+ // Reset the count of children for the next layout traversal.
+ base.parallel.children_count.store(base.children.len() as int, Relaxed);
- // Reset the count of children for the next layout traversal.
- base.parallel.children_count.store(base.children.len() as int, Relaxed);
+ // Possibly enqueue the parent.
+ let unsafe_parent = base.parallel.parent;
+ if unsafe_parent == null_unsafe_flow() {
+ // We're done!
+ break
+ }
- // Possibly enqueue the parent.
- let unsafe_parent = base.parallel.parent;
- if unsafe_parent == null_unsafe_flow() {
- // We're done!
- self.comm.chan.send(FinishedMsg);
- } else {
- // No, we're not at the root yet. Then are we the last sibling of our
- // parent? If so, we can enqueue our parent; otherwise, we've gotta wait.
- let parent: &mut ~Flow = cast::transmute(&unsafe_parent);
- let parent_base = flow::mut_base(*parent);
- if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 {
- // We were the last child of our parent. Enqueue the parent.
- deque.push(unsafe_parent)
- }
- }
+ // No, we're not at the root yet. Then are we the last sibling of our parent? If
+ // so, we can continue on with our parent; otherwise, we've gotta wait.
+ let parent: &mut ~Flow = cast::transmute(&unsafe_parent);
+ let parent_base = flow::mut_base(*parent);
+ if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 {
+ // We were the last child of our parent. Reflow our parent.
+ unsafe_flow = unsafe_parent
+ } else {
+ // Stop.
+ break
}
}
-
- // Destroy the traversal and save the font context.
- let LayoutContext {
- font_ctx: font_context,
- ..
- } = traversal;
- self.font_context = Some(font_context);
-
- // Give the deque back to the supervisor.
- self.comm.chan.send(ReturnDequeMsg(self.comm.index, deque))
}
}
}
-/// A parallel bottom-up traversal.
-pub struct ParallelPostorderFlowTraversal {
- /// Information about each of the workers.
- workers: ~[WorkerInfo],
- /// A port on which information can be received from the workers.
- port: Port<SupervisorMsg>,
-}
+impl<'a> ParallelPostorderFlowTraversal for BubbleWidthsTraversal<'a> {}
-impl ParallelPostorderFlowTraversal {
- pub fn new(font_context_info: FontContextInfo, thread_count: uint)
- -> ParallelPostorderFlowTraversal {
- let (supervisor_port, supervisor_chan) = SharedChan::new();
- let (mut infos, mut comms) = (~[], ~[]);
- for i in range(0, thread_count) {
- let (worker_port, worker_chan) = Chan::new();
- let mut pool = BufferPool::new();
- let (worker, thief) = pool.deque();
- infos.push(WorkerInfo {
- chan: worker_chan,
- pool: pool,
- deque: Some(worker),
- thief: thief,
- });
- comms.push(PostorderWorkerComm {
- index: i,
- port: worker_port,
- chan: supervisor_chan.clone(),
- other_deques: ~[],
- });
- }
+impl<'a> ParallelPostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> {}
- for i in range(0, thread_count) {
- for j in range(0, thread_count) {
- if i != j {
- comms[i].other_deques.push(infos[j].thief.clone())
- }
- }
- assert!(comms[i].other_deques.len() == thread_count - 1)
- }
-
- for comm in comms.move_iter() {
- let font_context_info = font_context_info.clone();
- native::task::spawn(proc() {
- let mut worker = PostorderWorker {
- font_context: Some(~FontContext::new(font_context_info)),
- comm: comm,
- };
- worker.start()
- })
- }
+fn bubble_widths(unsafe_flow: UnsafeFlow, proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) {
+ let layout_context: &mut LayoutContext = unsafe {
+ cast::transmute(*proxy.user_data())
+ };
+ let mut bubble_widths_traversal = BubbleWidthsTraversal {
+ layout_context: layout_context,
+ };
+ bubble_widths_traversal.run_parallel(unsafe_flow)
+}
- ParallelPostorderFlowTraversal {
- workers: infos,
- port: supervisor_port,
- }
- }
+fn assign_heights_and_store_overflow(unsafe_flow: UnsafeFlow,
+ proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) {
+ let layout_context: &mut LayoutContext = unsafe {
+ cast::transmute(*proxy.user_data())
+ };
+ let mut assign_heights_traversal = AssignHeightsAndStoreOverflowTraversal {
+ layout_context: layout_context,
+ };
+ assign_heights_traversal.run_parallel(unsafe_flow)
+}
- /// TODO(pcwalton): This could be parallelized.
- fn warmup(&mut self, layout_context: &mut LayoutContext) {
- layout_context.shared.leaf_set.access(|leaf_set| {
- for &flow in leaf_set.iter() {
- match self.workers[0].deque {
- None => fail!("no deque!"),
- Some(ref mut deque) => {
- deque.push(flow);
- }
- }
- }
- });
+pub fn traverse_flow_tree(kind: TraversalKind,
+ leaf_set: &MutexArc<LeafSet>,
+ profiler_chan: ProfilerChan,
+ layout_context: &mut LayoutContext,
+ queue: &mut WorkQueue<*mut LayoutContext,UnsafeFlow>) {
+ unsafe {
+ queue.data = cast::transmute(layout_context)
}
- /// Traverses the given flow tree in parallel.
- pub fn start(&mut self,
- kind: TraversalKind,
- layout_context: &mut LayoutContext,
- profiler_chan: ProfilerChan) {
- profile(time::LayoutParallelWarmupCategory, profiler_chan, || self.warmup(layout_context));
-
- for worker in self.workers.mut_iter() {
- worker.chan.send(StartMsg(util::replace(&mut worker.deque, None).unwrap(),
- kind,
- layout_context.shared.clone()))
- }
-
- // Wait for them to finish.
- let _ = self.port.recv();
-
- // Tell everyone to stop.
- for worker in self.workers.iter() {
- worker.chan.send(StopMsg);
- }
+ let fun = match kind {
+ BubbleWidthsTraversalKind => bubble_widths,
+ AssignHeightsAndStoreOverflowTraversalKind => assign_heights_and_store_overflow,
+ };
- // Get our deques back.
- //
- // TODO(pcwalton): Might be able to get a little parallelism over multiple traversals by
- // doing this lazily.
- for _ in range(0, self.workers.len()) {
- match self.port.recv() {
- ReturnDequeMsg(returned_deque_index, returned_deque) => {
- self.workers[returned_deque_index].deque = Some(returned_deque)
- }
- _ => fail!("unexpected message received during return queue phase"),
+ profile(time::LayoutParallelWarmupCategory, profiler_chan, || {
+ leaf_set.access(|leaf_set| {
+ for &flow in leaf_set.iter() {
+ queue.push(WorkUnit {
+ fun: fun,
+ data: flow,
+ })
}
- }
- }
+ })
+ });
- /// Shuts down all the worker threads.
- pub fn shutdown(&mut self) {
- for worker in self.workers.iter() {
- worker.chan.send(ExitMsg)
- }
- }
+ queue.run()
}
diff --git a/src/components/main/layout/text.rs b/src/components/main/layout/text.rs
index e4ad1f8bc57..b8913c6499e 100644
--- a/src/components/main/layout/text.rs
+++ b/src/components/main/layout/text.rs
@@ -5,10 +5,10 @@
//! Text layout.
use layout::box_::{Box, ScannedTextBox, ScannedTextBoxInfo, UnscannedTextBox};
-use layout::context::LayoutContext;
use layout::flow::Flow;
use extra::arc::Arc;
+use gfx::font_context::FontContext;
use gfx::text::text_run::TextRun;
use gfx::text::util::{CompressWhitespaceNewline, transform_text, CompressNone};
use servo_util::range::Range;
@@ -27,7 +27,7 @@ impl TextRunScanner {
}
}
- pub fn scan_for_runs(&mut self, ctx: &mut LayoutContext, flow: &mut Flow) {
+ pub fn scan_for_runs(&mut self, font_context: &mut FontContext, flow: &mut Flow) {
{
let inline = flow.as_immutable_inline();
debug!("TextRunScanner: scanning {:u} boxes for text runs...", inline.boxes.len());
@@ -40,13 +40,16 @@ impl TextRunScanner {
if box_i > 0 && !can_coalesce_text_nodes(flow.as_immutable_inline().boxes,
box_i - 1,
box_i) {
- last_whitespace = self.flush_clump_to_list(ctx, flow, last_whitespace, &mut out_boxes);
+ last_whitespace = self.flush_clump_to_list(font_context,
+ flow,
+ last_whitespace,
+ &mut out_boxes);
}
self.clump.extend_by(1);
}
// handle remaining clumps
if self.clump.length() > 0 {
- self.flush_clump_to_list(ctx, flow, last_whitespace, &mut out_boxes);
+ self.flush_clump_to_list(font_context, flow, last_whitespace, &mut out_boxes);
}
debug!("TextRunScanner: swapping out boxes.");
@@ -73,7 +76,7 @@ impl TextRunScanner {
/// FIXME(pcwalton): Stop cloning boxes. Instead we will need to consume the `in_box`es as we
/// iterate over them.
pub fn flush_clump_to_list(&mut self,
- ctx: &mut LayoutContext,
+ font_context: &mut FontContext,
flow: &mut Flow,
last_whitespace: bool,
out_boxes: &mut ~[Box])
@@ -130,7 +133,7 @@ impl TextRunScanner {
// TODO(#177): Text run creation must account for the renderability of text by
// font group fonts. This is probably achieved by creating the font group above
// and then letting `FontGroup` decide which `Font` to stick into the text run.
- let fontgroup = ctx.font_ctx.get_resolved_font_for_style(&font_style);
+ let fontgroup = font_context.get_resolved_font_for_style(&font_style);
let run = ~fontgroup.borrow().with(|fg| fg.create_textrun(transformed_text.clone(), decoration));
debug!("TextRunScanner: pushing single text box in range: {} ({})",
@@ -151,7 +154,7 @@ impl TextRunScanner {
// and then letting `FontGroup` decide which `Font` to stick into the text run.
let in_box = &in_boxes[self.clump.begin()];
let font_style = in_box.font_style();
- let fontgroup = ctx.font_ctx.get_resolved_font_for_style(&font_style);
+ let fontgroup = font_context.get_resolved_font_for_style(&font_style);
let decoration = in_box.text_decoration();
// TODO(#115): Use the actual CSS `white-space` property of the relevant style.
diff --git a/src/components/main/servo.rc b/src/components/main/servo.rc
index 60f6902a591..342aacae57f 100755
--- a/src/components/main/servo.rc
+++ b/src/components/main/servo.rc
@@ -6,7 +6,7 @@
#[comment = "The Servo Parallel Browser Project"];
#[license = "MPL"];
-#[feature(globs, macro_rules, managed_boxes)];
+#[feature(globs, macro_rules, managed_boxes, thread_local)];
extern mod alert;
extern mod azure;
diff --git a/src/components/util/time.rs b/src/components/util/time.rs
index b4ac13ef095..426cfad705a 100644
--- a/src/components/util/time.rs
+++ b/src/components/util/time.rs
@@ -37,10 +37,12 @@ impl ProfilerChan {
}
pub enum ProfilerMsg {
- // Normal message used for reporting time
+ /// Normal message used for reporting time
TimeMsg(ProfilerCategory, f64),
- // Message used to force print the profiling metrics
+ /// Message used to force print the profiling metrics
PrintMsg,
+ /// Tells the profiler to shut down.
+ ExitMsg,
}
#[deriving(Eq, Clone, TotalEq, TotalOrd)]
@@ -136,7 +138,12 @@ impl Profiler {
None => {
// no-op to handle profiler messages when the profiler is inactive
spawn_named("Profiler", proc() {
- while port.recv_opt().is_some() {}
+ loop {
+ match port.recv_opt() {
+ None | Some(ExitMsg) => break,
+ _ => {}
+ }
+ }
});
}
}
@@ -156,13 +163,17 @@ impl Profiler {
loop {
let msg = self.port.recv_opt();
match msg {
- Some (msg) => self.handle_msg(msg),
+ Some(msg) => {
+ if !self.handle_msg(msg) {
+ break
+ }
+ }
None => break
}
}
}
- fn handle_msg(&mut self, msg: ProfilerMsg) {
+ fn handle_msg(&mut self, msg: ProfilerMsg) -> bool {
match msg {
TimeMsg(category, t) => self.buckets.find_mut(&category).unwrap().push(t),
PrintMsg => match self.last_msg {
@@ -170,8 +181,10 @@ impl Profiler {
Some(TimeMsg(..)) => self.print_buckets(),
_ => ()
},
+ ExitMsg => return false,
};
self.last_msg = Some(msg);
+ true
}
fn print_buckets(&mut self) {
diff --git a/src/components/util/util.rc b/src/components/util/util.rc
index 50af03c67a7..1fb520c0c7b 100644
--- a/src/components/util/util.rc
+++ b/src/components/util/util.rc
@@ -9,6 +9,7 @@
extern mod extra;
extern mod geom;
+extern mod native;
pub mod cache;
pub mod geometry;
@@ -19,4 +20,5 @@ pub mod vec;
pub mod debug;
pub mod io;
pub mod task;
+pub mod workqueue;
diff --git a/src/components/util/workqueue.rs b/src/components/util/workqueue.rs
new file mode 100644
index 00000000000..0254b0c573f
--- /dev/null
+++ b/src/components/util/workqueue.rs
@@ -0,0 +1,295 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! A work queue for scheduling units of work across threads in a fork-join fashion.
+//!
+//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
+//! higher-level API on top of this could allow safe fork-join parallelism.
+
+use native;
+use std::cast;
+use std::rand::{Rng, XorShiftRng};
+use std::rand;
+use std::sync::atomics::{AtomicUint, SeqCst};
+use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
+use std::unstable::intrinsics;
+
+/// A unit of work.
+///
+/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the
+/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents
+/// custom data specific to each unit of work.
+pub struct WorkUnit<QUD,WUD> {
+ /// The function to execute.
+ fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>),
+ /// Arbitrary data.
+ data: WUD,
+}
+
+/// Messages from the supervisor to the worker.
+enum WorkerMsg<QUD,WUD> {
+ /// Tells the worker to start work.
+ StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD),
+
+ /// Tells the worker to stop. It can be restarted again with a `StartMsg`.
+ StopMsg,
+
+ /// Tells the worker thread to terminate.
+ ExitMsg,
+}
+
+/// Messages to the supervisor.
+enum SupervisorMsg<QUD,WUD> {
+ FinishedMsg,
+ ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>),
+}
+
+/// Information that the supervisor thread keeps about the worker threads.
+struct WorkerInfo<QUD,WUD> {
+ /// The communication channel to the workers.
+ chan: Chan<WorkerMsg<QUD,WUD>>,
+ /// The buffer pool for this deque.
+ pool: BufferPool<WorkUnit<QUD,WUD>>,
+ /// The worker end of the deque, if we have it.
+ deque: Option<Worker<WorkUnit<QUD,WUD>>>,
+ /// The thief end of the work-stealing deque.
+ thief: Stealer<WorkUnit<QUD,WUD>>,
+}
+
+/// Information specific to each worker thread that the thread keeps.
+struct WorkerThread<QUD,WUD> {
+ /// The index of this worker.
+ index: uint,
+ /// The communication port from the supervisor.
+ port: Port<WorkerMsg<QUD,WUD>>,
+ /// The communication channel on which messages are sent to the supervisor.
+ chan: SharedChan<SupervisorMsg<QUD,WUD>>,
+ /// The thief end of the work-stealing deque for all other workers.
+ other_deques: ~[Stealer<WorkUnit<QUD,WUD>>],
+ /// The random number generator for this worker.
+ rng: XorShiftRng,
+}
+
+static SPIN_COUNT: uint = 1000;
+
+impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> {
+ /// The main logic. This function starts up the worker and listens for
+ /// messages.
+ pub fn start(&mut self) {
+ loop {
+ // Wait for a start message.
+ let (mut deque, ref_count, queue_data) = match self.port.recv() {
+ StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
+ StopMsg => fail!("unexpected stop message"),
+ ExitMsg => return,
+ };
+
+ // We're off!
+ //
+ // FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug.
+ loop {
+ // FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue
+ // cross-crate.
+ let mut work_unit = unsafe {
+ intrinsics::uninit()
+ };
+ match deque.pop() {
+ Some(work) => work_unit = work,
+ None => {
+ // Become a thief.
+ let mut i = 0;
+ let mut should_continue = true;
+ loop {
+ let victim = (self.rng.next_u32() as uint) % self.other_deques.len();
+ match self.other_deques[victim].steal() {
+ Empty | Abort => {
+ // Continue.
+ }
+ Data(work) => {
+ work_unit = work;
+ break
+ }
+ }
+
+ if i == SPIN_COUNT {
+ match self.port.try_recv() {
+ Some(StopMsg) => {
+ should_continue = false;
+ break
+ }
+ Some(ExitMsg) => return,
+ Some(_) => fail!("unexpected message"),
+ None => {}
+ }
+
+ i = 0
+ } else {
+ i += 1
+ }
+ }
+
+ if !should_continue {
+ break
+ }
+ }
+ }
+
+ // At this point, we have some work. Perform it.
+ let mut proxy = WorkerProxy {
+ worker: &mut deque,
+ ref_count: ref_count,
+ queue_data: queue_data,
+ };
+ (work_unit.fun)(work_unit.data, &mut proxy);
+
+ // The work is done. Now decrement the count of outstanding work items. If this was
+ // the last work unit in the queue, then send a message on the channel.
+ unsafe {
+ if (*ref_count).fetch_sub(1, SeqCst) == 1 {
+ self.chan.send(FinishedMsg)
+ }
+ }
+ }
+
+ // Give the deque back to the supervisor.
+ self.chan.send(ReturnDequeMsg(self.index, deque))
+ }
+ }
+}
+
+/// A handle to the work queue that individual work units have.
+pub struct WorkerProxy<'a,QUD,WUD> {
+ priv worker: &'a mut Worker<WorkUnit<QUD,WUD>>,
+ priv ref_count: *mut AtomicUint,
+ priv queue_data: *QUD,
+}
+
+impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
+ /// Enqueues a block into the work queue.
+ #[inline]
+ pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
+ unsafe {
+ drop((*self.ref_count).fetch_add(1, SeqCst));
+ }
+ self.worker.push(work_unit);
+ }
+
+ /// Retrieves the queue user data.
+ #[inline]
+ pub fn user_data<'a>(&'a self) -> &'a QUD {
+ unsafe {
+ cast::transmute(self.queue_data)
+ }
+ }
+}
+
+/// A work queue on which units of work can be submitted.
+pub struct WorkQueue<QUD,WUD> {
+ /// Information about each of the workers.
+ priv workers: ~[WorkerInfo<QUD,WUD>],
+ /// A port on which deques can be received from the workers.
+ priv port: Port<SupervisorMsg<QUD,WUD>>,
+ /// The amount of work that has been enqueued.
+ priv work_count: uint,
+ /// Arbitrary user data.
+ data: QUD,
+}
+
+impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> {
+ /// Creates a new work queue and spawns all the threads associated with
+ /// it.
+ pub fn new(thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> {
+ // Set up data structures.
+ let (supervisor_port, supervisor_chan) = SharedChan::new();
+ let (mut infos, mut threads) = (~[], ~[]);
+ for i in range(0, thread_count) {
+ let (worker_port, worker_chan) = Chan::new();
+ let mut pool = BufferPool::new();
+ let (worker, thief) = pool.deque();
+ infos.push(WorkerInfo {
+ chan: worker_chan,
+ pool: pool,
+ deque: Some(worker),
+ thief: thief,
+ });
+ threads.push(WorkerThread {
+ index: i,
+ port: worker_port,
+ chan: supervisor_chan.clone(),
+ other_deques: ~[],
+ rng: rand::weak_rng(),
+ });
+ }
+
+ // Connect workers to one another.
+ for i in range(0, thread_count) {
+ for j in range(0, thread_count) {
+ if i != j {
+ threads[i].other_deques.push(infos[j].thief.clone())
+ }
+ }
+ assert!(threads[i].other_deques.len() == thread_count - 1)
+ }
+
+ // Spawn threads.
+ for thread in threads.move_iter() {
+ native::task::spawn(proc() {
+ let mut thread = thread;
+ thread.start()
+ })
+ }
+
+ WorkQueue {
+ workers: infos,
+ port: supervisor_port,
+ work_count: 0,
+ data: user_data,
+ }
+ }
+
+ /// Enqueues a block into the work queue.
+ #[inline]
+ pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
+ match self.workers[0].deque {
+ None => {
+ fail!("tried to push a block but we don't have the deque?!")
+ }
+ Some(ref mut deque) => deque.push(work_unit),
+ }
+ self.work_count += 1
+ }
+
+ /// Synchronously runs all the enqueued tasks and waits for them to complete.
+ pub fn run(&mut self) {
+ // Tell the workers to start.
+ let mut work_count = AtomicUint::new(self.work_count);
+ for worker in self.workers.mut_iter() {
+ worker.chan.send(StartMsg(worker.deque.take_unwrap(), &mut work_count, &self.data))
+ }
+
+ // Wait for the work to finish.
+ drop(self.port.recv());
+ self.work_count = 0;
+
+ // Tell everyone to stop.
+ for worker in self.workers.iter() {
+ worker.chan.send(StopMsg)
+ }
+
+ // Get our deques back.
+ for _ in range(0, self.workers.len()) {
+ match self.port.recv() {
+ ReturnDequeMsg(index, deque) => self.workers[index].deque = Some(deque),
+ FinishedMsg => fail!("unexpected finished message!"),
+ }
+ }
+ }
+
+ pub fn shutdown(&mut self) {
+ for worker in self.workers.iter() {
+ worker.chan.send(ExitMsg)
+ }
+ }
+}
+