diff options
author | Patrick Walton <pcwalton@mimiga.net> | 2014-01-23 13:43:03 -0800 |
---|---|---|
committer | Patrick Walton <pcwalton@mimiga.net> | 2014-01-24 20:50:30 -0800 |
commit | 18a2050a64cd6f320cc59cb490a69b0e895f11d3 (patch) | |
tree | 0a27b979054bded5489300728f0ebcb6f5e4ed8d /src/components/main/layout/parallel.rs | |
parent | 86c29d253a6ffada3488cb08d0154d8901ec252e (diff) | |
download | servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.tar.gz servo-18a2050a64cd6f320cc59cb490a69b0e895f11d3.zip |
layout: Port parallel layout over to a generic "work queue"
infrastructure.
The work queue accepts abstract generic "work units", which in this case
are layout operations. The same speedups have been observed.
Diffstat (limited to 'src/components/main/layout/parallel.rs')
-rw-r--r-- | src/components/main/layout/parallel.rs | 352 |
1 files changed, 80 insertions, 272 deletions
diff --git a/src/components/main/layout/parallel.rs b/src/components/main/layout/parallel.rs index 480ec886d8f..b38e6abc9b5 100644 --- a/src/components/main/layout/parallel.rs +++ b/src/components/main/layout/parallel.rs @@ -4,44 +4,29 @@ //! Implements parallel traversals over the flow tree. -use layout::context::{LayoutContext, SharedLayoutInfo}; -use layout::flow::{Flow, PostorderFlowTraversal}; +use layout::context::LayoutContext; +use layout::flow::{Flow, LeafSet, PostorderFlowTraversal}; use layout::flow; use layout::layout_task::{AssignHeightsAndStoreOverflowTraversal, BubbleWidthsTraversal}; -use gfx::font_context::{FontContext, FontContextInfo}; -use native; +use extra::arc::MutexArc; use servo_util::time::{ProfilerChan, profile}; use servo_util::time; +use servo_util::workqueue::{WorkQueue, WorkUnit, WorkerProxy}; use std::cast; -use std::comm::SharedChan; -use std::libc::c_void; -use std::ptr; use std::sync::atomics::{AtomicInt, Relaxed, SeqCst}; -use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; -use std::util; -enum WorkerMsg { - /// Tells the worker to start a traversal. - StartMsg(Worker<UnsafeFlow>, TraversalKind, SharedLayoutInfo), - - /// Tells the worker to stop. It can be restarted again with a `StartMsg`. - StopMsg, - - /// Tells the worker thread to terminate. - ExitMsg, +pub enum TraversalKind { + BubbleWidthsTraversalKind, + AssignHeightsAndStoreOverflowTraversalKind, } -enum SupervisorMsg { - /// Sent once the last flow is processed. - FinishedMsg, +pub type UnsafeFlow = (uint, uint); - /// Returns the deque to the supervisor. - ReturnDequeMsg(uint, Worker<UnsafeFlow>), +fn null_unsafe_flow() -> UnsafeFlow { + (0, 0) } -pub type UnsafeFlow = (*c_void, *c_void); - pub fn owned_flow_to_unsafe_flow(flow: *~Flow) -> UnsafeFlow { unsafe { cast::transmute_copy(&*flow) @@ -54,10 +39,6 @@ pub fn mut_owned_flow_to_unsafe_flow(flow: *mut ~Flow) -> UnsafeFlow { } } -fn null_unsafe_flow() -> UnsafeFlow { - (ptr::null(), ptr::null()) -} - /// Information that we need stored in each flow. pub struct FlowParallelInfo { /// The number of children that still need work done. @@ -75,270 +56,97 @@ impl FlowParallelInfo { } } -/// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo { - /// The communication channel to the workers. - chan: Chan<WorkerMsg>, - /// The buffer pool for this deque. - pool: BufferPool<UnsafeFlow>, - /// The worker end of the deque, if we have it. - deque: Option<Worker<UnsafeFlow>>, - /// The thief end of the work-stealing deque. - thief: Stealer<UnsafeFlow>, -} - -/// Information that each worker needs to do its job. -struct PostorderWorker { - /// The font context. - font_context: Option<~FontContext>, - /// Communications for the worker. - comm: PostorderWorkerComm, -} - -/// Communication channels for postorder workers. -struct PostorderWorkerComm { - /// The index of this worker. - index: uint, - /// The communication port from the supervisor. - port: Port<WorkerMsg>, - /// The communication channel to the supervisor. - chan: SharedChan<SupervisorMsg>, - /// The thief end of the work-stealing deque for all other workers. - other_deques: ~[Stealer<UnsafeFlow>], -} - -/// The type of traversal we're performing. -pub enum TraversalKind { - BubbleWidthsTraversalKind, - AssignHeightsAndStoreOverflowTraversalKind, -} - -impl PostorderWorker { - /// Starts up the worker and listens for messages. - pub fn start(&mut self) { +/// A parallel bottom-up flow traversal. +trait ParallelPostorderFlowTraversal : PostorderFlowTraversal { + fn run_parallel(&mut self, mut unsafe_flow: UnsafeFlow) { loop { - // Wait for a start message. - let (mut deque, kind, shared_layout_info) = match self.comm.port.recv() { - StopMsg => fail!("unexpected stop message"), - StartMsg(deque, kind, shared_layout_info) => (deque, kind, shared_layout_info), - ExitMsg => return, - }; + unsafe { + // Get a real flow. + let flow: &mut ~Flow = cast::transmute(&unsafe_flow); - // Set up our traversal context. - let mut traversal = LayoutContext { - shared: shared_layout_info, - font_ctx: self.font_context.take_unwrap(), - }; - - // And we're off! - 'outer: loop { - let unsafe_flow; - match deque.pop() { - Some(the_flow) => unsafe_flow = the_flow, - None => { - // Become a thief. - let mut i = 0; - loop { - if self.comm.other_deques.len() != 0 { - match self.comm.other_deques[i].steal() { - Empty => { - // Try the next one. - i += 1; - if i >= self.comm.other_deques.len() { - i = 0 - } - } - Abort => { - // Continue. - } - Data(the_flow) => { - unsafe_flow = the_flow; - break - } - } - } - - if i == 0 { - match self.comm.port.try_recv() { - Some(StopMsg) => break 'outer, - Some(ExitMsg) => return, - Some(_) => fail!("unexpected message!"), - None => {} - } - } - } - } + // Perform the appropriate traversal. + if self.should_process(*flow) { + self.process(*flow); } - // OK, we've got some data. The rest of this is unsafe code. - unsafe { - // Get a real flow. - let flow: &mut ~Flow = cast::transmute(&unsafe_flow); - - // Perform the appropriate traversal. - match kind { - BubbleWidthsTraversalKind => { - let mut traversal = BubbleWidthsTraversal(&mut traversal); - if traversal.should_process(*flow) { - traversal.process(*flow); - } - } - AssignHeightsAndStoreOverflowTraversalKind => { - let mut traversal = - AssignHeightsAndStoreOverflowTraversal(&mut traversal); - if traversal.should_process(*flow) { - traversal.process(*flow); - } - } - } + let base = flow::mut_base(*flow); - let base = flow::mut_base(*flow); + // Reset the count of children for the next layout traversal. + base.parallel.children_count.store(base.children.len() as int, Relaxed); - // Reset the count of children for the next layout traversal. - base.parallel.children_count.store(base.children.len() as int, Relaxed); + // Possibly enqueue the parent. + let unsafe_parent = base.parallel.parent; + if unsafe_parent == null_unsafe_flow() { + // We're done! + break + } - // Possibly enqueue the parent. - let unsafe_parent = base.parallel.parent; - if unsafe_parent == null_unsafe_flow() { - // We're done! - self.comm.chan.send(FinishedMsg); - } else { - // No, we're not at the root yet. Then are we the last sibling of our - // parent? If so, we can enqueue our parent; otherwise, we've gotta wait. - let parent: &mut ~Flow = cast::transmute(&unsafe_parent); - let parent_base = flow::mut_base(*parent); - if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 { - // We were the last child of our parent. Enqueue the parent. - deque.push(unsafe_parent) - } - } + // No, we're not at the root yet. Then are we the last sibling of our parent? If + // so, we can continue on with our parent; otherwise, we've gotta wait. + let parent: &mut ~Flow = cast::transmute(&unsafe_parent); + let parent_base = flow::mut_base(*parent); + if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 { + // We were the last child of our parent. Reflow our parent. + unsafe_flow = unsafe_parent + } else { + // Stop. + break } } - - // Destroy the traversal and save the font context. - let LayoutContext { - font_ctx: font_context, - .. - } = traversal; - self.font_context = Some(font_context); - - // Give the deque back to the supervisor. - self.comm.chan.send(ReturnDequeMsg(self.comm.index, deque)) } } } -/// A parallel bottom-up traversal. -pub struct ParallelPostorderFlowTraversal { - /// Information about each of the workers. - workers: ~[WorkerInfo], - /// A port on which information can be received from the workers. - port: Port<SupervisorMsg>, -} +impl<'a> ParallelPostorderFlowTraversal for BubbleWidthsTraversal<'a> {} -impl ParallelPostorderFlowTraversal { - pub fn new(font_context_info: FontContextInfo, thread_count: uint) - -> ParallelPostorderFlowTraversal { - let (supervisor_port, supervisor_chan) = SharedChan::new(); - let (mut infos, mut comms) = (~[], ~[]); - for i in range(0, thread_count) { - let (worker_port, worker_chan) = Chan::new(); - let mut pool = BufferPool::new(); - let (worker, thief) = pool.deque(); - infos.push(WorkerInfo { - chan: worker_chan, - pool: pool, - deque: Some(worker), - thief: thief, - }); - comms.push(PostorderWorkerComm { - index: i, - port: worker_port, - chan: supervisor_chan.clone(), - other_deques: ~[], - }); - } +impl<'a> ParallelPostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> {} - for i in range(0, thread_count) { - for j in range(0, thread_count) { - if i != j { - comms[i].other_deques.push(infos[j].thief.clone()) - } - } - assert!(comms[i].other_deques.len() == thread_count - 1) - } - - for comm in comms.move_iter() { - let font_context_info = font_context_info.clone(); - native::task::spawn(proc() { - let mut worker = PostorderWorker { - font_context: Some(~FontContext::new(font_context_info)), - comm: comm, - }; - worker.start() - }) - } +fn bubble_widths(unsafe_flow: UnsafeFlow, proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) { + let layout_context: &mut LayoutContext = unsafe { + cast::transmute(*proxy.user_data()) + }; + let mut bubble_widths_traversal = BubbleWidthsTraversal { + layout_context: layout_context, + }; + bubble_widths_traversal.run_parallel(unsafe_flow) +} - ParallelPostorderFlowTraversal { - workers: infos, - port: supervisor_port, - } - } +fn assign_heights_and_store_overflow(unsafe_flow: UnsafeFlow, + proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) { + let layout_context: &mut LayoutContext = unsafe { + cast::transmute(*proxy.user_data()) + }; + let mut assign_heights_traversal = AssignHeightsAndStoreOverflowTraversal { + layout_context: layout_context, + }; + assign_heights_traversal.run_parallel(unsafe_flow) +} - /// TODO(pcwalton): This could be parallelized. - fn warmup(&mut self, layout_context: &mut LayoutContext) { - layout_context.shared.leaf_set.access(|leaf_set| { - for &flow in leaf_set.iter() { - match self.workers[0].deque { - None => fail!("no deque!"), - Some(ref mut deque) => { - deque.push(flow); - } - } - } - }); +pub fn traverse_flow_tree(kind: TraversalKind, + leaf_set: &MutexArc<LeafSet>, + profiler_chan: ProfilerChan, + layout_context: &mut LayoutContext, + queue: &mut WorkQueue<*mut LayoutContext,UnsafeFlow>) { + unsafe { + queue.data = cast::transmute(layout_context) } - /// Traverses the given flow tree in parallel. - pub fn start(&mut self, - kind: TraversalKind, - layout_context: &mut LayoutContext, - profiler_chan: ProfilerChan) { - profile(time::LayoutParallelWarmupCategory, profiler_chan, || self.warmup(layout_context)); - - for worker in self.workers.mut_iter() { - worker.chan.send(StartMsg(util::replace(&mut worker.deque, None).unwrap(), - kind, - layout_context.shared.clone())) - } - - // Wait for them to finish. - let _ = self.port.recv(); - - // Tell everyone to stop. - for worker in self.workers.iter() { - worker.chan.send(StopMsg); - } + let fun = match kind { + BubbleWidthsTraversalKind => bubble_widths, + AssignHeightsAndStoreOverflowTraversalKind => assign_heights_and_store_overflow, + }; - // Get our deques back. - // - // TODO(pcwalton): Might be able to get a little parallelism over multiple traversals by - // doing this lazily. - for _ in range(0, self.workers.len()) { - match self.port.recv() { - ReturnDequeMsg(returned_deque_index, returned_deque) => { - self.workers[returned_deque_index].deque = Some(returned_deque) - } - _ => fail!("unexpected message received during return queue phase"), + profile(time::LayoutParallelWarmupCategory, profiler_chan, || { + leaf_set.access(|leaf_set| { + for &flow in leaf_set.iter() { + queue.push(WorkUnit { + fun: fun, + data: flow, + }) } - } - } + }) + }); - /// Shuts down all the worker threads. - pub fn shutdown(&mut self) { - for worker in self.workers.iter() { - worker.chan.send(ExitMsg) - } - } + queue.run() } |