diff options
-rw-r--r-- | components/net/image_cache_task.rs | 46 | ||||
-rw-r--r-- | components/util/lib.rs | 1 | ||||
-rw-r--r-- | components/util/taskpool.rs | 53 | ||||
-rw-r--r-- | src/lib.rs | 7 |
4 files changed, 84 insertions, 23 deletions
diff --git a/components/net/image_cache_task.rs b/components/net/image_cache_task.rs index e5fbf856fe7..878d6420c24 100644 --- a/components/net/image_cache_task.rs +++ b/components/net/image_cache_task.rs @@ -6,6 +6,7 @@ use image::base::{Image, load_from_memory}; use resource_task; use resource_task::{LoadData, ResourceTask}; +use servo_util::taskpool::TaskPool; use std::comm::{channel, Receiver, Sender}; use std::collections::hashmap::HashMap; use std::mem::replace; @@ -79,7 +80,7 @@ impl<E, S: Encoder<E>> Encodable<S, E> for ImageCacheTask { type DecoderFactory = fn() -> (proc(&[u8]) : 'static -> Option<Image>); impl ImageCacheTask { - pub fn new(resource_task: ResourceTask) -> ImageCacheTask { + pub fn new(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask { let (chan, port) = channel(); let chan_clone = chan.clone(); @@ -90,7 +91,8 @@ impl ImageCacheTask { chan: chan_clone, state_map: HashMap::new(), wait_map: HashMap::new(), - need_exit: None + need_exit: None, + task_pool: task_pool, }; cache.run(); }); @@ -100,11 +102,11 @@ impl ImageCacheTask { } } - pub fn new_sync(resource_task: ResourceTask) -> ImageCacheTask { + pub fn new_sync(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask { let (chan, port) = channel(); spawn(proc() { - let inner_cache = ImageCacheTask::new(resource_task); + let inner_cache = ImageCacheTask::new(resource_task, task_pool); loop { let msg: Msg = port.recv(); @@ -140,6 +142,7 @@ struct ImageCache { /// List of clients waiting on a WaitForImage response wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>, need_exit: Option<Sender<()>>, + task_pool: TaskPool, } #[deriving(Clone)] @@ -314,7 +317,7 @@ impl ImageCache { let to_cache = self.chan.clone(); let url_clone = url.clone(); - spawn(proc() { + self.task_pool.execute(proc() { let url = url_clone; debug!("image_cache_task: started image decode for {:s}", url.serialize()); let image = load_from_memory(data.as_slice()); @@ -493,6 +496,7 @@ mod tests { use resource_task; use resource_task::{ResourceTask, Metadata, start_sending}; use image::base::test_image_bin; + use servo_util::taskpool::TaskPool; use std::comm; use url::Url; @@ -581,7 +585,7 @@ mod tests { fn should_exit_on_request() { let mock_resource_task = mock_resource_task(box DoesNothing); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); @@ -592,7 +596,7 @@ mod tests { fn should_fail_if_unprefetched_image_is_requested() { let mock_resource_task = mock_resource_task(box DoesNothing); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let (chan, port) = channel(); @@ -606,7 +610,7 @@ mod tests { let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan}); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url)); @@ -621,7 +625,7 @@ mod tests { let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan}); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -641,7 +645,7 @@ mod tests { let mock_resource_task = mock_resource_task(box WaitSendTestImage{wait_port: wait_port}); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -658,7 +662,7 @@ mod tests { fn should_return_decoded_image_data_if_data_has_arrived() { let mock_resource_task = mock_resource_task(box SendTestImage); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store(); @@ -684,7 +688,7 @@ mod tests { fn should_return_decoded_image_data_for_multiple_requests() { let mock_resource_task = mock_resource_task(box SendTestImage); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store(); @@ -732,7 +736,7 @@ mod tests { } }); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -779,7 +783,7 @@ mod tests { } }); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -808,7 +812,7 @@ mod tests { fn should_return_failed_if_image_bin_cannot_be_fetched() { let mock_resource_task = mock_resource_task(box SendTestImageErr); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store_prefetched(); @@ -834,7 +838,7 @@ mod tests { fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() { let mock_resource_task = mock_resource_task(box SendTestImageErr); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store_prefetched(); @@ -868,7 +872,7 @@ mod tests { fn should_return_failed_if_image_decode_fails() { let mock_resource_task = mock_resource_task(box SendBogusImage); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store(); @@ -896,7 +900,7 @@ mod tests { fn should_return_image_on_wait_if_image_is_already_loaded() { let mock_resource_task = mock_resource_task(box SendTestImage); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); let join_port = image_cache_task.wait_for_store(); @@ -924,7 +928,7 @@ mod tests { let mock_resource_task = mock_resource_task(box WaitSendTestImage {wait_port: wait_port}); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -950,7 +954,7 @@ mod tests { let mock_resource_task = mock_resource_task(box WaitSendTestImageErr{wait_port: wait_port}); - let image_cache_task = ImageCacheTask::new(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); @@ -974,7 +978,7 @@ mod tests { fn sync_cache_should_wait_for_images() { let mock_resource_task = mock_resource_task(box SendTestImage); - let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone()); + let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone(), TaskPool::new(4)); let url = Url::parse("file:///").unwrap(); image_cache_task.send(Prefetch(url.clone())); diff --git a/components/util/lib.rs b/components/util/lib.rs index f5fd08dc24e..57421954679 100644 --- a/components/util/lib.rs +++ b/components/util/lib.rs @@ -50,6 +50,7 @@ pub mod str; pub mod task; pub mod tid; pub mod time; +pub mod taskpool; pub mod vec; pub mod workqueue; diff --git a/components/util/taskpool.rs b/components/util/taskpool.rs new file mode 100644 index 00000000000..0a2fafb2ab3 --- /dev/null +++ b/components/util/taskpool.rs @@ -0,0 +1,53 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! A load-balancing task pool. +//! +//! This differs in implementation from std::sync::TaskPool in that each job is +//! up for grabs by any of the child tasks in the pool. +//! + +// +// This is based on the cargo task pool. +// https://github.com/rust-lang/cargo/blob/master/src/cargo/util/pool.rs +// +// The only difference is that a normal channel is used instead of a sync_channel. +// + +use std::sync::{Arc, Mutex}; + +pub struct TaskPool { + tx: Sender<proc():Send>, +} + +impl TaskPool { + pub fn new(tasks: uint) -> TaskPool { + assert!(tasks > 0); + let (tx, rx) = channel(); + + let state = Arc::new(Mutex::new(rx)); + + for _ in range(0, tasks) { + let state = state.clone(); + spawn(proc() worker(&*state)); + } + + return TaskPool { tx: tx }; + + fn worker(rx: &Mutex<Receiver<proc():Send>>) { + loop { + let job = rx.lock().recv_opt(); + match job { + Ok(job) => job(), + Err(..) => break, + } + } + } + } + + pub fn execute(&self, job: proc():Send) { + self.tx.send(job); + } +} + diff --git a/src/lib.rs b/src/lib.rs index 132d0314558..b745621517b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,8 @@ use servo_util::time::TimeProfiler; use servo_util::memory::MemoryProfiler; #[cfg(not(test))] use servo_util::opts; +#[cfg(not(test))] +use servo_util::taskpool::TaskPool; #[cfg(not(test))] use green::GreenTaskBuilder; @@ -79,6 +81,7 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>) let opts_clone = opts.clone(); let time_profiler_chan_clone = time_profiler_chan.clone(); + let shared_task_pool = TaskPool::new(8); let (result_chan, result_port) = channel(); TaskBuilder::new() @@ -91,9 +94,9 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>) // image load or we risk emitting an output file missing the // image. let image_cache_task = if opts.output_file.is_some() { - ImageCacheTask::new_sync(resource_task.clone()) + ImageCacheTask::new_sync(resource_task.clone(), shared_task_pool) } else { - ImageCacheTask::new(resource_task.clone()) + ImageCacheTask::new(resource_task.clone(), shared_task_pool) }; let font_cache_task = FontCacheTask::new(resource_task.clone()); let constellation_chan = Constellation::<layout::layout_task::LayoutTask, |