diff options
-rw-r--r-- | src/components/main/compositing/compositor.rs | 3 | ||||
-rw-r--r-- | src/components/main/layout/block.rs | 6 | ||||
-rw-r--r-- | src/components/main/layout/box_.rs | 2 | ||||
-rw-r--r-- | src/components/main/layout/construct.rs | 43 | ||||
-rw-r--r-- | src/components/main/layout/context.rs | 41 | ||||
-rw-r--r-- | src/components/main/layout/layout_task.rs | 79 | ||||
-rw-r--r-- | src/components/main/layout/parallel.rs | 352 | ||||
-rw-r--r-- | src/components/main/layout/text.rs | 17 | ||||
-rwxr-xr-x | src/components/main/servo.rc | 2 | ||||
-rw-r--r-- | src/components/util/time.rs | 23 | ||||
-rw-r--r-- | src/components/util/util.rc | 2 | ||||
-rw-r--r-- | src/components/util/workqueue.rs | 295 |
12 files changed, 516 insertions, 349 deletions
diff --git a/src/components/main/compositing/compositor.rs b/src/components/main/compositing/compositor.rs index 5d309f1b913..d6abea53368 100644 --- a/src/components/main/compositing/compositor.rs +++ b/src/components/main/compositing/compositor.rs @@ -198,6 +198,9 @@ impl IOCompositor { // Drain compositor port, sometimes messages contain channels that are blocking // another task from finishing (i.e. SetIds) while self.port.try_recv().is_some() {} + + // Tell the profiler to shut down. + self.profiler_chan.send(time::ExitMsg); } fn handle_message(&mut self) { diff --git a/src/components/main/layout/block.rs b/src/components/main/layout/block.rs index b7c945c0d93..5219fe9cd75 100644 --- a/src/components/main/layout/block.rs +++ b/src/components/main/layout/block.rs @@ -329,7 +329,7 @@ impl BlockFlow { // top or bottom borders nor top or bottom padding, and it has a 'height' of either 0 or 'auto', // and it does not contain a line box, and all of its in-flow children's margins (if any) collapse. - let screen_height = ctx.shared.screen_size.height; + let screen_height = ctx.screen_size.height; let mut height = if self.is_root { // FIXME(pcwalton): The max is taken here so that you can scroll the page, but this is @@ -627,7 +627,7 @@ impl Flow for BlockFlow { if self.is_root { debug!("Setting root position"); self.base.position.origin = Au::zero_point(); - self.base.position.size.width = ctx.shared.screen_size.width; + self.base.position.size.width = ctx.screen_size.width; self.base.floats_in = FloatContext::new(self.base.num_floats); self.base.flags_info.flags.set_inorder(false); } @@ -672,7 +672,7 @@ impl Flow for BlockFlow { margin_bottom, margin_left)); - let screen_size = ctx.shared.screen_size; + let screen_size = ctx.screen_size; let (x, w) = box_.get_x_coord_and_new_width_if_fixed(screen_size.width, screen_size.height, width, diff --git a/src/components/main/layout/box_.rs b/src/components/main/layout/box_.rs index 297f70e5adf..66125587be9 100644 --- a/src/components/main/layout/box_.rs +++ b/src/components/main/layout/box_.rs @@ -1338,7 +1338,7 @@ impl Box { iframe_box.pipeline_id, iframe_box.subpage_id); let msg = FrameRectMsg(iframe_box.pipeline_id, iframe_box.subpage_id, rect); - layout_context.shared.constellation_chan.send(msg) + layout_context.constellation_chan.send(msg) } } diff --git a/src/components/main/layout/construct.rs b/src/components/main/layout/construct.rs index 80809899a89..bfc151fb012 100644 --- a/src/components/main/layout/construct.rs +++ b/src/components/main/layout/construct.rs @@ -32,6 +32,7 @@ use layout::text::TextRunScanner; use layout::util::LayoutDataAccess; use layout::wrapper::{LayoutNode, PostorderNodeMutTraversal}; +use gfx::font_context::FontContext; use script::dom::element::{HTMLIframeElementTypeId, HTMLImageElementTypeId}; use script::dom::node::{CommentNodeTypeId, DoctypeNodeTypeId, DocumentFragmentNodeTypeId}; use script::dom::node::{DocumentNodeTypeId, ElementNodeTypeId, TextNodeTypeId}; @@ -209,14 +210,19 @@ pub struct FlowConstructor<'a> { /// /// FIXME(pcwalton): This is going to have to be atomic; can't we do something better? next_flow_id: RefCell<int>, + + /// The font context. + font_context: ~FontContext, } impl<'fc> FlowConstructor<'fc> { /// Creates a new flow constructor. pub fn init<'a>(layout_context: &'a mut LayoutContext) -> FlowConstructor<'a> { + let font_context = ~FontContext::new(layout_context.font_context_info.clone()); FlowConstructor { layout_context: layout_context, next_flow_id: RefCell::new(0), + font_context: font_context, } } @@ -235,7 +241,7 @@ impl<'fc> FlowConstructor<'fc> { Some(url) => { // FIXME(pcwalton): The fact that image boxes store the cache within them makes // little sense to me. - Some(ImageBoxInfo::new(&node, url, self.layout_context.shared.image_cache.clone())) + Some(ImageBoxInfo::new(&node, url, self.layout_context.image_cache.clone())) } } } @@ -262,20 +268,19 @@ impl<'fc> FlowConstructor<'fc> { /// otherwise. #[inline(always)] fn flush_inline_boxes_to_flow(&mut self, boxes: ~[Box], flow: &mut ~Flow, node: LayoutNode) { - if boxes.len() > 0 { - let inline_base = BaseFlow::new(self.next_flow_id(), node); - - let mut inline_flow = ~InlineFlow::from_boxes(inline_base, boxes) as ~Flow; - - self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&inline_flow)); + if boxes.len() == 0 { + return + } - TextRunScanner::new().scan_for_runs(self.layout_context, inline_flow); - let mut inline_flow = Some(inline_flow); + let inline_base = BaseFlow::new(self.next_flow_id(), node); + let mut inline_flow = ~InlineFlow::from_boxes(inline_base, boxes) as ~Flow; + self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&inline_flow)); + TextRunScanner::new().scan_for_runs(self.font_context, inline_flow); - self.layout_context.shared.leaf_set.access(|leaf_set| { - flow.add_new_child(inline_flow.take_unwrap(), leaf_set) - }) - } + let mut inline_flow = Some(inline_flow); + self.layout_context.leaf_set.access(|leaf_set| { + flow.add_new_child(inline_flow.take_unwrap(), leaf_set) + }) } /// Creates an inline flow from a set of inline boxes, if present, and adds it as a child of @@ -319,7 +324,7 @@ impl<'fc> FlowConstructor<'fc> { flow, node); let mut kid_flow = Some(kid_flow); - self.layout_context.shared.leaf_set.access(|leaf_set| { + self.layout_context.leaf_set.access(|leaf_set| { flow.add_new_child(kid_flow.take_unwrap(), leaf_set) }) } @@ -362,7 +367,7 @@ impl<'fc> FlowConstructor<'fc> { // Push the flow generated by the {ib} split onto our list of // flows. let mut kid_flow = Some(kid_flow); - self.layout_context.shared.leaf_set.access(|leaf_set| { + self.layout_context.leaf_set.access(|leaf_set| { flow.add_new_child(kid_flow.take_unwrap(), leaf_set) }) } @@ -391,7 +396,7 @@ impl<'fc> FlowConstructor<'fc> { let box_ = self.build_box_for_node(node); let mut flow = ~BlockFlow::from_box(base, box_, is_fixed) as ~Flow; - self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&flow)); + self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&flow)); self.build_children_of_block_flow(&mut flow, node); flow @@ -406,7 +411,7 @@ impl<'fc> FlowConstructor<'fc> { let mut flow = ~BlockFlow::float_from_box(base, float_type, box_) as ~Flow; - self.layout_context.shared.leaf_set.access(|leaf_set| leaf_set.insert(&flow)); + self.layout_context.leaf_set.access(|leaf_set| leaf_set.insert(&flow)); self.build_children_of_block_flow(&mut flow, node); flow @@ -484,7 +489,7 @@ impl<'fc> FlowConstructor<'fc> { fn set_inline_info_for_inline_child(&mut self, boxes: &mut ~[Box], parent_node: LayoutNode) { let parent_box = self.build_box_for_node(parent_node); let font_style = parent_box.font_style(); - let font_group = self.layout_context.font_ctx.get_resolved_font_for_style(&font_style); + let font_group = self.font_context.get_resolved_font_for_style(&font_style); let (font_ascent,font_descent) = font_group.borrow().with_mut( |fg| { fg.fonts[0].borrow().with_mut( |font| { (font.metrics.ascent,font.metrics.descent) @@ -569,7 +574,7 @@ impl<'a> PostorderNodeMutTraversal for FlowConstructor<'a> { // `display: none` contributes no flow construction result. Nuke the flow construction // results of children. (display::none, _, _) => { - self.layout_context.shared.leaf_set.access(|leaf_set| { + self.layout_context.leaf_set.access(|leaf_set| { for child in node.children() { let mut old_result = child.swap_out_construction_result(); old_result.destroy(leaf_set) diff --git a/src/components/main/layout/context.rs b/src/components/main/layout/context.rs index 4b62d4236d1..fb797196e71 100644 --- a/src/components/main/layout/context.rs +++ b/src/components/main/layout/context.rs @@ -5,17 +5,26 @@ //! Data needed by the layout task. use extra::arc::MutexArc; +use green::task::GreenTask; use layout::flow::LeafSet; +use std::cast; +use std::ptr; +use std::rt::Runtime; +use std::rt::local::Local; +use std::rt::task::Task; use geom::size::Size2D; -use gfx::font_context::FontContext; +use gfx::font_context::{FontContext, FontContextInfo}; use servo_msg::constellation_msg::ConstellationChan; use servo_net::local_image_cache::LocalImageCache; use servo_util::geometry::Au; +#[thread_local] +static mut FONT_CONTEXT: *mut FontContext = 0 as *mut FontContext; + /// Data shared by all layout workers. #[deriving(Clone)] -pub struct SharedLayoutInfo { +pub struct LayoutContext { /// The local image cache. image_cache: MutexArc<LocalImageCache>, @@ -27,14 +36,30 @@ pub struct SharedLayoutInfo { /// The set of leaf flows. leaf_set: MutexArc<LeafSet>, + + /// Information needed to construct a font context. + font_context_info: FontContextInfo, } -/// Data specific to a layout worker. -pub struct LayoutContext { - /// Shared layout info. - shared: SharedLayoutInfo, +impl LayoutContext { + pub fn font_context<'a>(&'a mut self) -> &'a mut FontContext { + // Sanity check. + let mut task = Local::borrow(None::<Task>); + match task.get().maybe_take_runtime::<GreenTask>() { + Some(green) => { + task.get().put_runtime(green as ~Runtime); + fail!("can't call this on a green task!") + } + None => {} + } - /// The current font context. - font_ctx: ~FontContext, + unsafe { + if FONT_CONTEXT == ptr::mut_null() { + let context = ~FontContext::new(self.font_context_info.clone()); + FONT_CONTEXT = cast::transmute(context) + } + cast::transmute(FONT_CONTEXT) + } + } } diff --git a/src/components/main/layout/layout_task.rs b/src/components/main/layout/layout_task.rs index 4a10dac03c3..6ad34addd76 100644 --- a/src/components/main/layout/layout_task.rs +++ b/src/components/main/layout/layout_task.rs @@ -9,7 +9,7 @@ use css::matching::MatchMethods; use css::select::new_stylist; use css::node_style::StyledNode; use layout::construct::{FlowConstructionResult, FlowConstructor, NoConstructionResult}; -use layout::context::{LayoutContext, SharedLayoutInfo}; +use layout::context::LayoutContext; use layout::display_list_builder::{DisplayListBuilder, ToGfxColor}; use layout::extra::LayoutAuxMethods; use layout::flow::{Flow, ImmutableFlowUtils, LeafSet, MutableFlowUtils, MutableOwnedFlowUtils}; @@ -17,7 +17,8 @@ use layout::flow::{PreorderFlowTraversal, PostorderFlowTraversal}; use layout::flow; use layout::incremental::RestyleDamage; use layout::parallel::{AssignHeightsAndStoreOverflowTraversalKind, BubbleWidthsTraversalKind}; -use layout::parallel::{ParallelPostorderFlowTraversal}; +use layout::parallel::{UnsafeFlow}; +use layout::parallel; use layout::util::{LayoutDataAccess, OpaqueNode, LayoutDataWrapper}; use layout::wrapper::LayoutNode; @@ -25,7 +26,7 @@ use extra::arc::{Arc, MutexArc, RWArc}; use geom::rect::Rect; use geom::size::Size2D; use gfx::display_list::{ClipDisplayItemClass, DisplayItem, DisplayItemIterator, DisplayList}; -use gfx::font_context::{FontContext, FontContextInfo}; +use gfx::font_context::FontContextInfo; use gfx::opts::Opts; use gfx::render_task::{RenderMsg, RenderChan, RenderLayer}; use gfx::{render_task, color}; @@ -46,10 +47,12 @@ use servo_util::geometry::Au; use servo_util::time::{ProfilerChan, profile}; use servo_util::time; use servo_util::task::spawn_named; +use servo_util::workqueue::WorkQueue; use std::cast::transmute; use std::cast; use std::cell::RefCell; use std::comm::Port; +use std::ptr; use std::util; use style::{AuthorOrigin, Stylesheet, Stylist}; @@ -91,7 +94,7 @@ pub struct LayoutTask { stylist: RWArc<Stylist>, /// The workers that we use for parallel operation. - parallel_traversal: Option<ParallelPostorderFlowTraversal>, + parallel_traversal: Option<WorkQueue<*mut LayoutContext,UnsafeFlow>>, /// The channel on which messages can be sent to the profiler. profiler_chan: ProfilerChan, @@ -142,12 +145,14 @@ impl PreorderFlowTraversal for PropagateDamageTraversal { /// The bubble-widths traversal, the first part of layout computation. This computes preferred /// and intrinsic widths and bubbles them up the tree. -pub struct BubbleWidthsTraversal<'a>(&'a mut LayoutContext); +pub struct BubbleWidthsTraversal<'a> { + layout_context: &'a mut LayoutContext, +} impl<'a> PostorderFlowTraversal for BubbleWidthsTraversal<'a> { #[inline] fn process(&mut self, flow: &mut Flow) -> bool { - flow.bubble_widths(**self); + flow.bubble_widths(self.layout_context); true } @@ -174,13 +179,15 @@ impl<'a> PreorderFlowTraversal for AssignWidthsTraversal<'a> { /// The assign-heights-and-store-overflow traversal, the last (and most expensive) part of layout /// computation. Determines the final heights for all layout objects, computes positions, and /// computes overflow regions. In Gecko this corresponds to `FinishAndStoreOverflow`. -pub struct AssignHeightsAndStoreOverflowTraversal<'a>(&'a mut LayoutContext); +pub struct AssignHeightsAndStoreOverflowTraversal<'a> { + layout_context: &'a mut LayoutContext, +} impl<'a> PostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> { #[inline] fn process(&mut self, flow: &mut Flow) -> bool { - flow.assign_height(**self); - flow.store_overflow(**self); + flow.assign_height(self.layout_context); + flow.store_overflow(self.layout_context); true } @@ -248,13 +255,8 @@ impl LayoutTask { -> LayoutTask { let local_image_cache = MutexArc::new(LocalImageCache(image_cache_task.clone())); let screen_size = Size2D(Au(0), Au(0)); - let font_context_info = FontContextInfo { - backend: opts.render_backend, - needs_font_list: true, - profiler_chan: profiler_chan.clone(), - }; let parallel_traversal = if opts.layout_threads != 1 { - Some(ParallelPostorderFlowTraversal::new(font_context_info, opts.layout_threads)) + Some(WorkQueue::new(opts.layout_threads, ptr::mut_null())) } else { None }; @@ -288,20 +290,18 @@ impl LayoutTask { // Create a layout context for use in building display lists, hit testing, &c. fn build_layout_context(&self) -> LayoutContext { - let font_ctx = ~FontContext::new(FontContextInfo { + let font_context_info = FontContextInfo { backend: self.opts.render_backend, needs_font_list: true, profiler_chan: self.profiler_chan.clone(), - }); + }; LayoutContext { - shared: SharedLayoutInfo { - image_cache: self.local_image_cache.clone(), - screen_size: self.screen_size.clone(), - constellation_chan: self.constellation_chan.clone(), - leaf_set: self.leaf_set.clone(), - }, - font_ctx: font_ctx, + image_cache: self.local_image_cache.clone(), + screen_size: self.screen_size.clone(), + constellation_chan: self.constellation_chan.clone(), + leaf_set: self.leaf_set.clone(), + font_context_info: font_context_info, } } @@ -418,7 +418,12 @@ impl LayoutTask { fn solve_constraints(&mut self, layout_root: &mut Flow, layout_context: &mut LayoutContext) { - layout_root.traverse_postorder(&mut BubbleWidthsTraversal(layout_context)); + { + let mut traversal = BubbleWidthsTraversal { + layout_context: layout_context, + }; + layout_root.traverse_postorder(&mut traversal); + } // FIXME(kmc): We want to prune nodes without the Reflow restyle damage // bit, but FloatContext values can't be reused, so we need to @@ -428,8 +433,12 @@ impl LayoutTask { layout_root.traverse_preorder(&mut AssignWidthsTraversal(layout_context)); // FIXME(pcwalton): Prune this pass as well. - layout_root.traverse_postorder(&mut AssignHeightsAndStoreOverflowTraversal( - layout_context)); + { + let mut traversal = AssignHeightsAndStoreOverflowTraversal { + layout_context: layout_context, + }; + layout_root.traverse_postorder(&mut traversal); + } } /// Performs layout constraint solving in parallel. @@ -443,9 +452,11 @@ impl LayoutTask { match self.parallel_traversal { None => fail!("solve_contraints_parallel() called with no parallel traversal ready"), Some(ref mut traversal) => { - traversal.start(BubbleWidthsTraversalKind, - layout_context, - self.profiler_chan.clone()); + parallel::traverse_flow_tree(BubbleWidthsTraversalKind, + &self.leaf_set, + self.profiler_chan.clone(), + layout_context, + traversal); // NOTE: this currently computes borders, so any pruning should separate that // operation out. @@ -453,9 +464,11 @@ impl LayoutTask { // because this is a top-down traversal, unlike the others. layout_root.traverse_preorder(&mut AssignWidthsTraversal(layout_context)); - traversal.start(AssignHeightsAndStoreOverflowTraversalKind, - layout_context, - self.profiler_chan.clone()); + parallel::traverse_flow_tree(AssignHeightsAndStoreOverflowTraversalKind, + &self.leaf_set, + self.profiler_chan.clone(), + layout_context, + traversal); } } } diff --git a/src/components/main/layout/parallel.rs b/src/components/main/layout/parallel.rs index 480ec886d8f..b38e6abc9b5 100644 --- a/src/components/main/layout/parallel.rs +++ b/src/components/main/layout/parallel.rs @@ -4,44 +4,29 @@ //! Implements parallel traversals over the flow tree. -use layout::context::{LayoutContext, SharedLayoutInfo}; -use layout::flow::{Flow, PostorderFlowTraversal}; +use layout::context::LayoutContext; +use layout::flow::{Flow, LeafSet, PostorderFlowTraversal}; use layout::flow; use layout::layout_task::{AssignHeightsAndStoreOverflowTraversal, BubbleWidthsTraversal}; -use gfx::font_context::{FontContext, FontContextInfo}; -use native; +use extra::arc::MutexArc; use servo_util::time::{ProfilerChan, profile}; use servo_util::time; +use servo_util::workqueue::{WorkQueue, WorkUnit, WorkerProxy}; use std::cast; -use std::comm::SharedChan; -use std::libc::c_void; -use std::ptr; use std::sync::atomics::{AtomicInt, Relaxed, SeqCst}; -use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; -use std::util; -enum WorkerMsg { - /// Tells the worker to start a traversal. - StartMsg(Worker<UnsafeFlow>, TraversalKind, SharedLayoutInfo), - - /// Tells the worker to stop. It can be restarted again with a `StartMsg`. - StopMsg, - - /// Tells the worker thread to terminate. - ExitMsg, +pub enum TraversalKind { + BubbleWidthsTraversalKind, + AssignHeightsAndStoreOverflowTraversalKind, } -enum SupervisorMsg { - /// Sent once the last flow is processed. - FinishedMsg, +pub type UnsafeFlow = (uint, uint); - /// Returns the deque to the supervisor. - ReturnDequeMsg(uint, Worker<UnsafeFlow>), +fn null_unsafe_flow() -> UnsafeFlow { + (0, 0) } -pub type UnsafeFlow = (*c_void, *c_void); - pub fn owned_flow_to_unsafe_flow(flow: *~Flow) -> UnsafeFlow { unsafe { cast::transmute_copy(&*flow) @@ -54,10 +39,6 @@ pub fn mut_owned_flow_to_unsafe_flow(flow: *mut ~Flow) -> UnsafeFlow { } } -fn null_unsafe_flow() -> UnsafeFlow { - (ptr::null(), ptr::null()) -} - /// Information that we need stored in each flow. pub struct FlowParallelInfo { /// The number of children that still need work done. @@ -75,270 +56,97 @@ impl FlowParallelInfo { } } -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo { - /// The communication channel to the workers. - chan: Chan<WorkerMsg>, - /// The buffer pool for this deque. - pool: BufferPool<UnsafeFlow>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<UnsafeFlow>>, - /// The thief end of the work-stealing deque. - thief: Stealer<UnsafeFlow>, -} - -/// Information that each worker needs to do its job. -struct PostorderWorker { - /// The font context. - font_context: Option<~FontContext>, - /// Communications for the worker. - comm: PostorderWorkerComm, -} - -/// Communication channels for postorder workers. -struct PostorderWorkerComm { - /// The index of this worker. - index: uint, - /// The communication port from the supervisor. - port: Port<WorkerMsg>, - /// The communication channel to the supervisor. - chan: SharedChan<SupervisorMsg>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: ~[Stealer<UnsafeFlow>], -} - -/// The type of traversal we're performing. -pub enum TraversalKind { - BubbleWidthsTraversalKind, - AssignHeightsAndStoreOverflowTraversalKind, -} - -impl PostorderWorker { - /// Starts up the worker and listens for messages. - pub fn start(&mut self) { +/// A parallel bottom-up flow traversal. +trait ParallelPostorderFlowTraversal : PostorderFlowTraversal { + fn run_parallel(&mut self, mut unsafe_flow: UnsafeFlow) { loop { - // Wait for a start message. - let (mut deque, kind, shared_layout_info) = match self.comm.port.recv() { - StopMsg => fail!("unexpected stop message"), - StartMsg(deque, kind, shared_layout_info) => (deque, kind, shared_layout_info), - ExitMsg => return, - }; + unsafe { + // Get a real flow. + let flow: &mut ~Flow = cast::transmute(&unsafe_flow); - // Set up our traversal context. - let mut traversal = LayoutContext { - shared: shared_layout_info, - font_ctx: self.font_context.take_unwrap(), - }; - - // And we're off! - 'outer: loop { - let unsafe_flow; - match deque.pop() { - Some(the_flow) => unsafe_flow = the_flow, - None => { - // Become a thief. - let mut i = 0; - loop { - if self.comm.other_deques.len() != 0 { - match self.comm.other_deques[i].steal() { - Empty => { - // Try the next one. - i += 1; - if i >= self.comm.other_deques.len() { - i = 0 - } - } - Abort => { - // Continue. - } - Data(the_flow) => { - unsafe_flow = the_flow; - break - } - } - } - - if i == 0 { - match self.comm.port.try_recv() { - Some(StopMsg) => break 'outer, - Some(ExitMsg) => return, - Some(_) => fail!("unexpected message!"), - None => {} - } - } - } - } + // Perform the appropriate traversal. + if self.should_process(*flow) { + self.process(*flow); } - // OK, we've got some data. The rest of this is unsafe code. - unsafe { - // Get a real flow. - let flow: &mut ~Flow = cast::transmute(&unsafe_flow); - - // Perform the appropriate traversal. - match kind { - BubbleWidthsTraversalKind => { - let mut traversal = BubbleWidthsTraversal(&mut traversal); - if traversal.should_process(*flow) { - traversal.process(*flow); - } - } - AssignHeightsAndStoreOverflowTraversalKind => { - let mut traversal = - AssignHeightsAndStoreOverflowTraversal(&mut traversal); - if traversal.should_process(*flow) { - traversal.process(*flow); - } - } - } + let base = flow::mut_base(*flow); - let base = flow::mut_base(*flow); + // Reset the count of children for the next layout traversal. + base.parallel.children_count.store(base.children.len() as int, Relaxed); - // Reset the count of children for the next layout traversal. - base.parallel.children_count.store(base.children.len() as int, Relaxed); + // Possibly enqueue the parent. + let unsafe_parent = base.parallel.parent; + if unsafe_parent == null_unsafe_flow() { + // We're done! + break + } - // Possibly enqueue the parent. - let unsafe_parent = base.parallel.parent; - if unsafe_parent == null_unsafe_flow() { - // We're done! - self.comm.chan.send(FinishedMsg); - } else { - // No, we're not at the root yet. Then are we the last sibling of our - // parent? If so, we can enqueue our parent; otherwise, we've gotta wait. - let parent: &mut ~Flow = cast::transmute(&unsafe_parent); - let parent_base = flow::mut_base(*parent); - if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 { - // We were the last child of our parent. Enqueue the parent. - deque.push(unsafe_parent) - } - } + // No, we're not at the root yet. Then are we the last sibling of our parent? If + // so, we can continue on with our parent; otherwise, we've gotta wait. + let parent: &mut ~Flow = cast::transmute(&unsafe_parent); + let parent_base = flow::mut_base(*parent); + if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 { + // We were the last child of our parent. Reflow our parent. + unsafe_flow = unsafe_parent + } else { + // Stop. + break } } - - // Destroy the traversal and save the font context. - let LayoutContext { - font_ctx: font_context, - .. - } = traversal; - self.font_context = Some(font_context); - - // Give the deque back to the supervisor. - self.comm.chan.send(ReturnDequeMsg(self.comm.index, deque)) } } } -/// A parallel bottom-up traversal. -pub struct ParallelPostorderFlowTraversal { - /// Information about each of the workers. - workers: ~[WorkerInfo], - /// A port on which information can be received from the workers. - port: Port<SupervisorMsg>, -} +impl<'a> ParallelPostorderFlowTraversal for BubbleWidthsTraversal<'a> {} -impl ParallelPostorderFlowTraversal { - pub fn new(font_context_info: FontContextInfo, thread_count: uint) - -> ParallelPostorderFlowTraversal { - let (supervisor_port, supervisor_chan) = SharedChan::new(); - let (mut infos, mut comms) = (~[], ~[]); - for i in range(0, thread_count) { - let (worker_port, worker_chan) = Chan::new(); - let mut pool = BufferPool::new(); - let (worker, thief) = pool.deque(); - infos.push(WorkerInfo { - chan: worker_chan, - pool: pool, - deque: Some(worker), - thief: thief, - }); - comms.push(PostorderWorkerComm { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: ~[], - }); - } +impl<'a> ParallelPostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> {} - for i in range(0, thread_count) { - for j in range(0, thread_count) { - if i != j { - comms[i].other_deques.push(infos[j].thief.clone()) - } - } - assert!(comms[i].other_deques.len() == thread_count - 1) - } - - for comm in comms.move_iter() { - let font_context_info = font_context_info.clone(); - native::task::spawn(proc() { - let mut worker = PostorderWorker { - font_context: Some(~FontContext::new(font_context_info)), - comm: comm, - }; - worker.start() - }) - } +fn bubble_widths(unsafe_flow: UnsafeFlow, proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) { + let layout_context: &mut LayoutContext = unsafe { + cast::transmute(*proxy.user_data()) + }; + let mut bubble_widths_traversal = BubbleWidthsTraversal { + layout_context: layout_context, + }; + bubble_widths_traversal.run_parallel(unsafe_flow) +} - ParallelPostorderFlowTraversal { - workers: infos, - port: supervisor_port, - } - } +fn assign_heights_and_store_overflow(unsafe_flow: UnsafeFlow, + proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) { + let layout_context: &mut LayoutContext = unsafe { + cast::transmute(*proxy.user_data()) + }; + let mut assign_heights_traversal = AssignHeightsAndStoreOverflowTraversal { + layout_context: layout_context, + }; + assign_heights_traversal.run_parallel(unsafe_flow) +} - /// TODO(pcwalton): This could be parallelized. - fn warmup(&mut self, layout_context: &mut LayoutContext) { - layout_context.shared.leaf_set.access(|leaf_set| { - for &flow in leaf_set.iter() { - match self.workers[0].deque { - None => fail!("no deque!"), - Some(ref mut deque) => { - deque.push(flow); - } - } - } - }); +pub fn traverse_flow_tree(kind: TraversalKind, + leaf_set: &MutexArc<LeafSet>, + profiler_chan: ProfilerChan, + layout_context: &mut LayoutContext, + queue: &mut WorkQueue<*mut LayoutContext,UnsafeFlow>) { + unsafe { + queue.data = cast::transmute(layout_context) } - /// Traverses the given flow tree in parallel. - pub fn start(&mut self, - kind: TraversalKind, - layout_context: &mut LayoutContext, - profiler_chan: ProfilerChan) { - profile(time::LayoutParallelWarmupCategory, profiler_chan, || self.warmup(layout_context)); - - for worker in self.workers.mut_iter() { - worker.chan.send(StartMsg(util::replace(&mut worker.deque, None).unwrap(), - kind, - layout_context.shared.clone())) - } - - // Wait for them to finish. - let _ = self.port.recv(); - - // Tell everyone to stop. - for worker in self.workers.iter() { - worker.chan.send(StopMsg); - } + let fun = match kind { + BubbleWidthsTraversalKind => bubble_widths, + AssignHeightsAndStoreOverflowTraversalKind => assign_heights_and_store_overflow, + }; - // Get our deques back. - // - // TODO(pcwalton): Might be able to get a little parallelism over multiple traversals by - // doing this lazily. - for _ in range(0, self.workers.len()) { - match self.port.recv() { - ReturnDequeMsg(returned_deque_index, returned_deque) => { - self.workers[returned_deque_index].deque = Some(returned_deque) - } - _ => fail!("unexpected message received during return queue phase"), + profile(time::LayoutParallelWarmupCategory, profiler_chan, || { + leaf_set.access(|leaf_set| { + for &flow in leaf_set.iter() { + queue.push(WorkUnit { + fun: fun, + data: flow, + }) } - } - } + }) + }); - /// Shuts down all the worker threads. - pub fn shutdown(&mut self) { - for worker in self.workers.iter() { - worker.chan.send(ExitMsg) - } - } + queue.run() } diff --git a/src/components/main/layout/text.rs b/src/components/main/layout/text.rs index e4ad1f8bc57..b8913c6499e 100644 --- a/src/components/main/layout/text.rs +++ b/src/components/main/layout/text.rs @@ -5,10 +5,10 @@ //! Text layout. use layout::box_::{Box, ScannedTextBox, ScannedTextBoxInfo, UnscannedTextBox}; -use layout::context::LayoutContext; use layout::flow::Flow; use extra::arc::Arc; +use gfx::font_context::FontContext; use gfx::text::text_run::TextRun; use gfx::text::util::{CompressWhitespaceNewline, transform_text, CompressNone}; use servo_util::range::Range; @@ -27,7 +27,7 @@ impl TextRunScanner { } } - pub fn scan_for_runs(&mut self, ctx: &mut LayoutContext, flow: &mut Flow) { + pub fn scan_for_runs(&mut self, font_context: &mut FontContext, flow: &mut Flow) { { let inline = flow.as_immutable_inline(); debug!("TextRunScanner: scanning {:u} boxes for text runs...", inline.boxes.len()); @@ -40,13 +40,16 @@ impl TextRunScanner { if box_i > 0 && !can_coalesce_text_nodes(flow.as_immutable_inline().boxes, box_i - 1, box_i) { - last_whitespace = self.flush_clump_to_list(ctx, flow, last_whitespace, &mut out_boxes); + last_whitespace = self.flush_clump_to_list(font_context, + flow, + last_whitespace, + &mut out_boxes); } self.clump.extend_by(1); } // handle remaining clumps if self.clump.length() > 0 { - self.flush_clump_to_list(ctx, flow, last_whitespace, &mut out_boxes); + self.flush_clump_to_list(font_context, flow, last_whitespace, &mut out_boxes); } debug!("TextRunScanner: swapping out boxes."); @@ -73,7 +76,7 @@ impl TextRunScanner { /// FIXME(pcwalton): Stop cloning boxes. Instead we will need to consume the `in_box`es as we /// iterate over them. pub fn flush_clump_to_list(&mut self, - ctx: &mut LayoutContext, + font_context: &mut FontContext, flow: &mut Flow, last_whitespace: bool, out_boxes: &mut ~[Box]) @@ -130,7 +133,7 @@ impl TextRunScanner { // TODO(#177): Text run creation must account for the renderability of text by // font group fonts. This is probably achieved by creating the font group above // and then letting `FontGroup` decide which `Font` to stick into the text run. - let fontgroup = ctx.font_ctx.get_resolved_font_for_style(&font_style); + let fontgroup = font_context.get_resolved_font_for_style(&font_style); let run = ~fontgroup.borrow().with(|fg| fg.create_textrun(transformed_text.clone(), decoration)); debug!("TextRunScanner: pushing single text box in range: {} ({})", @@ -151,7 +154,7 @@ impl TextRunScanner { // and then letting `FontGroup` decide which `Font` to stick into the text run. let in_box = &in_boxes[self.clump.begin()]; let font_style = in_box.font_style(); - let fontgroup = ctx.font_ctx.get_resolved_font_for_style(&font_style); + let fontgroup = font_context.get_resolved_font_for_style(&font_style); let decoration = in_box.text_decoration(); // TODO(#115): Use the actual CSS `white-space` property of the relevant style. diff --git a/src/components/main/servo.rc b/src/components/main/servo.rc index 60f6902a591..342aacae57f 100755 --- a/src/components/main/servo.rc +++ b/src/components/main/servo.rc @@ -6,7 +6,7 @@ #[comment = "The Servo Parallel Browser Project"]; #[license = "MPL"]; -#[feature(globs, macro_rules, managed_boxes)]; +#[feature(globs, macro_rules, managed_boxes, thread_local)]; extern mod alert; extern mod azure; diff --git a/src/components/util/time.rs b/src/components/util/time.rs index b4ac13ef095..426cfad705a 100644 --- a/src/components/util/time.rs +++ b/src/components/util/time.rs @@ -37,10 +37,12 @@ impl ProfilerChan { } pub enum ProfilerMsg { - // Normal message used for reporting time + /// Normal message used for reporting time TimeMsg(ProfilerCategory, f64), - // Message used to force print the profiling metrics + /// Message used to force print the profiling metrics PrintMsg, + /// Tells the profiler to shut down. + ExitMsg, } #[deriving(Eq, Clone, TotalEq, TotalOrd)] @@ -136,7 +138,12 @@ impl Profiler { None => { // no-op to handle profiler messages when the profiler is inactive spawn_named("Profiler", proc() { - while port.recv_opt().is_some() {} + loop { + match port.recv_opt() { + None | Some(ExitMsg) => break, + _ => {} + } + } }); } } @@ -156,13 +163,17 @@ impl Profiler { loop { let msg = self.port.recv_opt(); match msg { - Some (msg) => self.handle_msg(msg), + Some(msg) => { + if !self.handle_msg(msg) { + break + } + } None => break } } } - fn handle_msg(&mut self, msg: ProfilerMsg) { + fn handle_msg(&mut self, msg: ProfilerMsg) -> bool { match msg { TimeMsg(category, t) => self.buckets.find_mut(&category).unwrap().push(t), PrintMsg => match self.last_msg { @@ -170,8 +181,10 @@ impl Profiler { Some(TimeMsg(..)) => self.print_buckets(), _ => () }, + ExitMsg => return false, }; self.last_msg = Some(msg); + true } fn print_buckets(&mut self) { diff --git a/src/components/util/util.rc b/src/components/util/util.rc index 50af03c67a7..1fb520c0c7b 100644 --- a/src/components/util/util.rc +++ b/src/components/util/util.rc @@ -9,6 +9,7 @@ extern mod extra; extern mod geom; +extern mod native; pub mod cache; pub mod geometry; @@ -19,4 +20,5 @@ pub mod vec; pub mod debug; pub mod io; pub mod task; +pub mod workqueue; diff --git a/src/components/util/workqueue.rs b/src/components/util/workqueue.rs new file mode 100644 index 00000000000..0254b0c573f --- /dev/null +++ b/src/components/util/workqueue.rs @@ -0,0 +1,295 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! A work queue for scheduling units of work across threads in a fork-join fashion. +//! +//! Data associated with queues is simply a pair of unsigned integers. It is expected that a +//! higher-level API on top of this could allow safe fork-join parallelism. + +use native; +use std::cast; +use std::rand::{Rng, XorShiftRng}; +use std::rand; +use std::sync::atomics::{AtomicUint, SeqCst}; +use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; +use std::unstable::intrinsics; + +/// A unit of work. +/// +/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the +/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents +/// custom data specific to each unit of work. +pub struct WorkUnit<QUD,WUD> { + /// The function to execute. + fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>), + /// Arbitrary data. + data: WUD, +} + +/// Messages from the supervisor to the worker. +enum WorkerMsg<QUD,WUD> { + /// Tells the worker to start work. + StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD), + + /// Tells the worker to stop. It can be restarted again with a `StartMsg`. + StopMsg, + + /// Tells the worker thread to terminate. + ExitMsg, +} + +/// Messages to the supervisor. +enum SupervisorMsg<QUD,WUD> { + FinishedMsg, + ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>), +} + +/// Information that the supervisor thread keeps about the worker threads. +struct WorkerInfo<QUD,WUD> { + /// The communication channel to the workers. + chan: Chan<WorkerMsg<QUD,WUD>>, + /// The buffer pool for this deque. + pool: BufferPool<WorkUnit<QUD,WUD>>, + /// The worker end of the deque, if we have it. + deque: Option<Worker<WorkUnit<QUD,WUD>>>, + /// The thief end of the work-stealing deque. + thief: Stealer<WorkUnit<QUD,WUD>>, +} + +/// Information specific to each worker thread that the thread keeps. +struct WorkerThread<QUD,WUD> { + /// The index of this worker. + index: uint, + /// The communication port from the supervisor. + port: Port<WorkerMsg<QUD,WUD>>, + /// The communication channel on which messages are sent to the supervisor. + chan: SharedChan<SupervisorMsg<QUD,WUD>>, + /// The thief end of the work-stealing deque for all other workers. + other_deques: ~[Stealer<WorkUnit<QUD,WUD>>], + /// The random number generator for this worker. + rng: XorShiftRng, +} + +static SPIN_COUNT: uint = 1000; + +impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> { + /// The main logic. This function starts up the worker and listens for + /// messages. + pub fn start(&mut self) { + loop { + // Wait for a start message. + let (mut deque, ref_count, queue_data) = match self.port.recv() { + StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data), + StopMsg => fail!("unexpected stop message"), + ExitMsg => return, + }; + + // We're off! + // + // FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug. + loop { + // FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue + // cross-crate. + let mut work_unit = unsafe { + intrinsics::uninit() + }; + match deque.pop() { + Some(work) => work_unit = work, + None => { + // Become a thief. + let mut i = 0; + let mut should_continue = true; + loop { + let victim = (self.rng.next_u32() as uint) % self.other_deques.len(); + match self.other_deques[victim].steal() { + Empty | Abort => { + // Continue. + } + Data(work) => { + work_unit = work; + break + } + } + + if i == SPIN_COUNT { + match self.port.try_recv() { + Some(StopMsg) => { + should_continue = false; + break + } + Some(ExitMsg) => return, + Some(_) => fail!("unexpected message"), + None => {} + } + + i = 0 + } else { + i += 1 + } + } + + if !should_continue { + break + } + } + } + + // At this point, we have some work. Perform it. + let mut proxy = WorkerProxy { + worker: &mut deque, + ref_count: ref_count, + queue_data: queue_data, + }; + (work_unit.fun)(work_unit.data, &mut proxy); + + // The work is done. Now decrement the count of outstanding work items. If this was + // the last work unit in the queue, then send a message on the channel. + unsafe { + if (*ref_count).fetch_sub(1, SeqCst) == 1 { + self.chan.send(FinishedMsg) + } + } + } + + // Give the deque back to the supervisor. + self.chan.send(ReturnDequeMsg(self.index, deque)) + } + } +} + +/// A handle to the work queue that individual work units have. +pub struct WorkerProxy<'a,QUD,WUD> { + priv worker: &'a mut Worker<WorkUnit<QUD,WUD>>, + priv ref_count: *mut AtomicUint, + priv queue_data: *QUD, +} + +impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> { + /// Enqueues a block into the work queue. + #[inline] + pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { + unsafe { + drop((*self.ref_count).fetch_add(1, SeqCst)); + } + self.worker.push(work_unit); + } + + /// Retrieves the queue user data. + #[inline] + pub fn user_data<'a>(&'a self) -> &'a QUD { + unsafe { + cast::transmute(self.queue_data) + } + } +} + +/// A work queue on which units of work can be submitted. +pub struct WorkQueue<QUD,WUD> { + /// Information about each of the workers. + priv workers: ~[WorkerInfo<QUD,WUD>], + /// A port on which deques can be received from the workers. + priv port: Port<SupervisorMsg<QUD,WUD>>, + /// The amount of work that has been enqueued. + priv work_count: uint, + /// Arbitrary user data. + data: QUD, +} + +impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> { + /// Creates a new work queue and spawns all the threads associated with + /// it. + pub fn new(thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> { + // Set up data structures. + let (supervisor_port, supervisor_chan) = SharedChan::new(); + let (mut infos, mut threads) = (~[], ~[]); + for i in range(0, thread_count) { + let (worker_port, worker_chan) = Chan::new(); + let mut pool = BufferPool::new(); + let (worker, thief) = pool.deque(); + infos.push(WorkerInfo { + chan: worker_chan, + pool: pool, + deque: Some(worker), + thief: thief, + }); + threads.push(WorkerThread { + index: i, + port: worker_port, + chan: supervisor_chan.clone(), + other_deques: ~[], + rng: rand::weak_rng(), + }); + } + + // Connect workers to one another. + for i in range(0, thread_count) { + for j in range(0, thread_count) { + if i != j { + threads[i].other_deques.push(infos[j].thief.clone()) + } + } + assert!(threads[i].other_deques.len() == thread_count - 1) + } + + // Spawn threads. + for thread in threads.move_iter() { + native::task::spawn(proc() { + let mut thread = thread; + thread.start() + }) + } + + WorkQueue { + workers: infos, + port: supervisor_port, + work_count: 0, + data: user_data, + } + } + + /// Enqueues a block into the work queue. + #[inline] + pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { + match self.workers[0].deque { + None => { + fail!("tried to push a block but we don't have the deque?!") + } + Some(ref mut deque) => deque.push(work_unit), + } + self.work_count += 1 + } + + /// Synchronously runs all the enqueued tasks and waits for them to complete. + pub fn run(&mut self) { + // Tell the workers to start. + let mut work_count = AtomicUint::new(self.work_count); + for worker in self.workers.mut_iter() { + worker.chan.send(StartMsg(worker.deque.take_unwrap(), &mut work_count, &self.data)) + } + + // Wait for the work to finish. + drop(self.port.recv()); + self.work_count = 0; + + // Tell everyone to stop. + for worker in self.workers.iter() { + worker.chan.send(StopMsg) + } + + // Get our deques back. + for _ in range(0, self.workers.len()) { + match self.port.recv() { + ReturnDequeMsg(index, deque) => self.workers[index].deque = Some(deque), + FinishedMsg => fail!("unexpected finished message!"), + } + } + } + + pub fn shutdown(&mut self) { + for worker in self.workers.iter() { + worker.chan.send(ExitMsg) + } + } +} + |