aboutsummaryrefslogtreecommitdiffstats
path: root/src/components/main/layout/parallel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/main/layout/parallel.rs')
-rw-r--r--src/components/main/layout/parallel.rs352
1 files changed, 80 insertions, 272 deletions
diff --git a/src/components/main/layout/parallel.rs b/src/components/main/layout/parallel.rs
index 480ec886d8f..b38e6abc9b5 100644
--- a/src/components/main/layout/parallel.rs
+++ b/src/components/main/layout/parallel.rs
@@ -4,44 +4,29 @@
//! Implements parallel traversals over the flow tree.
-use layout::context::{LayoutContext, SharedLayoutInfo};
-use layout::flow::{Flow, PostorderFlowTraversal};
+use layout::context::LayoutContext;
+use layout::flow::{Flow, LeafSet, PostorderFlowTraversal};
use layout::flow;
use layout::layout_task::{AssignHeightsAndStoreOverflowTraversal, BubbleWidthsTraversal};
-use gfx::font_context::{FontContext, FontContextInfo};
-use native;
+use extra::arc::MutexArc;
use servo_util::time::{ProfilerChan, profile};
use servo_util::time;
+use servo_util::workqueue::{WorkQueue, WorkUnit, WorkerProxy};
use std::cast;
-use std::comm::SharedChan;
-use std::libc::c_void;
-use std::ptr;
use std::sync::atomics::{AtomicInt, Relaxed, SeqCst};
-use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
-use std::util;
-enum WorkerMsg {
- /// Tells the worker to start a traversal.
- StartMsg(Worker<UnsafeFlow>, TraversalKind, SharedLayoutInfo),
-
- /// Tells the worker to stop. It can be restarted again with a `StartMsg`.
- StopMsg,
-
- /// Tells the worker thread to terminate.
- ExitMsg,
+pub enum TraversalKind {
+ BubbleWidthsTraversalKind,
+ AssignHeightsAndStoreOverflowTraversalKind,
}
-enum SupervisorMsg {
- /// Sent once the last flow is processed.
- FinishedMsg,
+pub type UnsafeFlow = (uint, uint);
- /// Returns the deque to the supervisor.
- ReturnDequeMsg(uint, Worker<UnsafeFlow>),
+fn null_unsafe_flow() -> UnsafeFlow {
+ (0, 0)
}
-pub type UnsafeFlow = (*c_void, *c_void);
-
pub fn owned_flow_to_unsafe_flow(flow: *~Flow) -> UnsafeFlow {
unsafe {
cast::transmute_copy(&*flow)
@@ -54,10 +39,6 @@ pub fn mut_owned_flow_to_unsafe_flow(flow: *mut ~Flow) -> UnsafeFlow {
}
}
-fn null_unsafe_flow() -> UnsafeFlow {
- (ptr::null(), ptr::null())
-}
-
/// Information that we need stored in each flow.
pub struct FlowParallelInfo {
/// The number of children that still need work done.
@@ -75,270 +56,97 @@ impl FlowParallelInfo {
}
}
-/// Information that the supervisor thread keeps about the worker threads.
-struct WorkerInfo {
- /// The communication channel to the workers.
- chan: Chan<WorkerMsg>,
- /// The buffer pool for this deque.
- pool: BufferPool<UnsafeFlow>,
- /// The worker end of the deque, if we have it.
- deque: Option<Worker<UnsafeFlow>>,
- /// The thief end of the work-stealing deque.
- thief: Stealer<UnsafeFlow>,
-}
-
-/// Information that each worker needs to do its job.
-struct PostorderWorker {
- /// The font context.
- font_context: Option<~FontContext>,
- /// Communications for the worker.
- comm: PostorderWorkerComm,
-}
-
-/// Communication channels for postorder workers.
-struct PostorderWorkerComm {
- /// The index of this worker.
- index: uint,
- /// The communication port from the supervisor.
- port: Port<WorkerMsg>,
- /// The communication channel to the supervisor.
- chan: SharedChan<SupervisorMsg>,
- /// The thief end of the work-stealing deque for all other workers.
- other_deques: ~[Stealer<UnsafeFlow>],
-}
-
-/// The type of traversal we're performing.
-pub enum TraversalKind {
- BubbleWidthsTraversalKind,
- AssignHeightsAndStoreOverflowTraversalKind,
-}
-
-impl PostorderWorker {
- /// Starts up the worker and listens for messages.
- pub fn start(&mut self) {
+/// A parallel bottom-up flow traversal.
+trait ParallelPostorderFlowTraversal : PostorderFlowTraversal {
+ fn run_parallel(&mut self, mut unsafe_flow: UnsafeFlow) {
loop {
- // Wait for a start message.
- let (mut deque, kind, shared_layout_info) = match self.comm.port.recv() {
- StopMsg => fail!("unexpected stop message"),
- StartMsg(deque, kind, shared_layout_info) => (deque, kind, shared_layout_info),
- ExitMsg => return,
- };
+ unsafe {
+ // Get a real flow.
+ let flow: &mut ~Flow = cast::transmute(&unsafe_flow);
- // Set up our traversal context.
- let mut traversal = LayoutContext {
- shared: shared_layout_info,
- font_ctx: self.font_context.take_unwrap(),
- };
-
- // And we're off!
- 'outer: loop {
- let unsafe_flow;
- match deque.pop() {
- Some(the_flow) => unsafe_flow = the_flow,
- None => {
- // Become a thief.
- let mut i = 0;
- loop {
- if self.comm.other_deques.len() != 0 {
- match self.comm.other_deques[i].steal() {
- Empty => {
- // Try the next one.
- i += 1;
- if i >= self.comm.other_deques.len() {
- i = 0
- }
- }
- Abort => {
- // Continue.
- }
- Data(the_flow) => {
- unsafe_flow = the_flow;
- break
- }
- }
- }
-
- if i == 0 {
- match self.comm.port.try_recv() {
- Some(StopMsg) => break 'outer,
- Some(ExitMsg) => return,
- Some(_) => fail!("unexpected message!"),
- None => {}
- }
- }
- }
- }
+ // Perform the appropriate traversal.
+ if self.should_process(*flow) {
+ self.process(*flow);
}
- // OK, we've got some data. The rest of this is unsafe code.
- unsafe {
- // Get a real flow.
- let flow: &mut ~Flow = cast::transmute(&unsafe_flow);
-
- // Perform the appropriate traversal.
- match kind {
- BubbleWidthsTraversalKind => {
- let mut traversal = BubbleWidthsTraversal(&mut traversal);
- if traversal.should_process(*flow) {
- traversal.process(*flow);
- }
- }
- AssignHeightsAndStoreOverflowTraversalKind => {
- let mut traversal =
- AssignHeightsAndStoreOverflowTraversal(&mut traversal);
- if traversal.should_process(*flow) {
- traversal.process(*flow);
- }
- }
- }
+ let base = flow::mut_base(*flow);
- let base = flow::mut_base(*flow);
+ // Reset the count of children for the next layout traversal.
+ base.parallel.children_count.store(base.children.len() as int, Relaxed);
- // Reset the count of children for the next layout traversal.
- base.parallel.children_count.store(base.children.len() as int, Relaxed);
+ // Possibly enqueue the parent.
+ let unsafe_parent = base.parallel.parent;
+ if unsafe_parent == null_unsafe_flow() {
+ // We're done!
+ break
+ }
- // Possibly enqueue the parent.
- let unsafe_parent = base.parallel.parent;
- if unsafe_parent == null_unsafe_flow() {
- // We're done!
- self.comm.chan.send(FinishedMsg);
- } else {
- // No, we're not at the root yet. Then are we the last sibling of our
- // parent? If so, we can enqueue our parent; otherwise, we've gotta wait.
- let parent: &mut ~Flow = cast::transmute(&unsafe_parent);
- let parent_base = flow::mut_base(*parent);
- if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 {
- // We were the last child of our parent. Enqueue the parent.
- deque.push(unsafe_parent)
- }
- }
+ // No, we're not at the root yet. Then are we the last sibling of our parent? If
+ // so, we can continue on with our parent; otherwise, we've gotta wait.
+ let parent: &mut ~Flow = cast::transmute(&unsafe_parent);
+ let parent_base = flow::mut_base(*parent);
+ if parent_base.parallel.children_count.fetch_sub(1, SeqCst) == 1 {
+ // We were the last child of our parent. Reflow our parent.
+ unsafe_flow = unsafe_parent
+ } else {
+ // Stop.
+ break
}
}
-
- // Destroy the traversal and save the font context.
- let LayoutContext {
- font_ctx: font_context,
- ..
- } = traversal;
- self.font_context = Some(font_context);
-
- // Give the deque back to the supervisor.
- self.comm.chan.send(ReturnDequeMsg(self.comm.index, deque))
}
}
}
-/// A parallel bottom-up traversal.
-pub struct ParallelPostorderFlowTraversal {
- /// Information about each of the workers.
- workers: ~[WorkerInfo],
- /// A port on which information can be received from the workers.
- port: Port<SupervisorMsg>,
-}
+impl<'a> ParallelPostorderFlowTraversal for BubbleWidthsTraversal<'a> {}
-impl ParallelPostorderFlowTraversal {
- pub fn new(font_context_info: FontContextInfo, thread_count: uint)
- -> ParallelPostorderFlowTraversal {
- let (supervisor_port, supervisor_chan) = SharedChan::new();
- let (mut infos, mut comms) = (~[], ~[]);
- for i in range(0, thread_count) {
- let (worker_port, worker_chan) = Chan::new();
- let mut pool = BufferPool::new();
- let (worker, thief) = pool.deque();
- infos.push(WorkerInfo {
- chan: worker_chan,
- pool: pool,
- deque: Some(worker),
- thief: thief,
- });
- comms.push(PostorderWorkerComm {
- index: i,
- port: worker_port,
- chan: supervisor_chan.clone(),
- other_deques: ~[],
- });
- }
+impl<'a> ParallelPostorderFlowTraversal for AssignHeightsAndStoreOverflowTraversal<'a> {}
- for i in range(0, thread_count) {
- for j in range(0, thread_count) {
- if i != j {
- comms[i].other_deques.push(infos[j].thief.clone())
- }
- }
- assert!(comms[i].other_deques.len() == thread_count - 1)
- }
-
- for comm in comms.move_iter() {
- let font_context_info = font_context_info.clone();
- native::task::spawn(proc() {
- let mut worker = PostorderWorker {
- font_context: Some(~FontContext::new(font_context_info)),
- comm: comm,
- };
- worker.start()
- })
- }
+fn bubble_widths(unsafe_flow: UnsafeFlow, proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) {
+ let layout_context: &mut LayoutContext = unsafe {
+ cast::transmute(*proxy.user_data())
+ };
+ let mut bubble_widths_traversal = BubbleWidthsTraversal {
+ layout_context: layout_context,
+ };
+ bubble_widths_traversal.run_parallel(unsafe_flow)
+}
- ParallelPostorderFlowTraversal {
- workers: infos,
- port: supervisor_port,
- }
- }
+fn assign_heights_and_store_overflow(unsafe_flow: UnsafeFlow,
+ proxy: &mut WorkerProxy<*mut LayoutContext,UnsafeFlow>) {
+ let layout_context: &mut LayoutContext = unsafe {
+ cast::transmute(*proxy.user_data())
+ };
+ let mut assign_heights_traversal = AssignHeightsAndStoreOverflowTraversal {
+ layout_context: layout_context,
+ };
+ assign_heights_traversal.run_parallel(unsafe_flow)
+}
- /// TODO(pcwalton): This could be parallelized.
- fn warmup(&mut self, layout_context: &mut LayoutContext) {
- layout_context.shared.leaf_set.access(|leaf_set| {
- for &flow in leaf_set.iter() {
- match self.workers[0].deque {
- None => fail!("no deque!"),
- Some(ref mut deque) => {
- deque.push(flow);
- }
- }
- }
- });
+pub fn traverse_flow_tree(kind: TraversalKind,
+ leaf_set: &MutexArc<LeafSet>,
+ profiler_chan: ProfilerChan,
+ layout_context: &mut LayoutContext,
+ queue: &mut WorkQueue<*mut LayoutContext,UnsafeFlow>) {
+ unsafe {
+ queue.data = cast::transmute(layout_context)
}
- /// Traverses the given flow tree in parallel.
- pub fn start(&mut self,
- kind: TraversalKind,
- layout_context: &mut LayoutContext,
- profiler_chan: ProfilerChan) {
- profile(time::LayoutParallelWarmupCategory, profiler_chan, || self.warmup(layout_context));
-
- for worker in self.workers.mut_iter() {
- worker.chan.send(StartMsg(util::replace(&mut worker.deque, None).unwrap(),
- kind,
- layout_context.shared.clone()))
- }
-
- // Wait for them to finish.
- let _ = self.port.recv();
-
- // Tell everyone to stop.
- for worker in self.workers.iter() {
- worker.chan.send(StopMsg);
- }
+ let fun = match kind {
+ BubbleWidthsTraversalKind => bubble_widths,
+ AssignHeightsAndStoreOverflowTraversalKind => assign_heights_and_store_overflow,
+ };
- // Get our deques back.
- //
- // TODO(pcwalton): Might be able to get a little parallelism over multiple traversals by
- // doing this lazily.
- for _ in range(0, self.workers.len()) {
- match self.port.recv() {
- ReturnDequeMsg(returned_deque_index, returned_deque) => {
- self.workers[returned_deque_index].deque = Some(returned_deque)
- }
- _ => fail!("unexpected message received during return queue phase"),
+ profile(time::LayoutParallelWarmupCategory, profiler_chan, || {
+ leaf_set.access(|leaf_set| {
+ for &flow in leaf_set.iter() {
+ queue.push(WorkUnit {
+ fun: fun,
+ data: flow,
+ })
}
- }
- }
+ })
+ });
- /// Shuts down all the worker threads.
- pub fn shutdown(&mut self) {
- for worker in self.workers.iter() {
- worker.chan.send(ExitMsg)
- }
- }
+ queue.run()
}