aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--components/net/image_cache_task.rs46
-rw-r--r--components/util/lib.rs1
-rw-r--r--components/util/taskpool.rs53
-rw-r--r--src/lib.rs7
4 files changed, 84 insertions, 23 deletions
diff --git a/components/net/image_cache_task.rs b/components/net/image_cache_task.rs
index e5fbf856fe7..878d6420c24 100644
--- a/components/net/image_cache_task.rs
+++ b/components/net/image_cache_task.rs
@@ -6,6 +6,7 @@ use image::base::{Image, load_from_memory};
use resource_task;
use resource_task::{LoadData, ResourceTask};
+use servo_util::taskpool::TaskPool;
use std::comm::{channel, Receiver, Sender};
use std::collections::hashmap::HashMap;
use std::mem::replace;
@@ -79,7 +80,7 @@ impl<E, S: Encoder<E>> Encodable<S, E> for ImageCacheTask {
type DecoderFactory = fn() -> (proc(&[u8]) : 'static -> Option<Image>);
impl ImageCacheTask {
- pub fn new(resource_task: ResourceTask) -> ImageCacheTask {
+ pub fn new(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();
let chan_clone = chan.clone();
@@ -90,7 +91,8 @@ impl ImageCacheTask {
chan: chan_clone,
state_map: HashMap::new(),
wait_map: HashMap::new(),
- need_exit: None
+ need_exit: None,
+ task_pool: task_pool,
};
cache.run();
});
@@ -100,11 +102,11 @@ impl ImageCacheTask {
}
}
- pub fn new_sync(resource_task: ResourceTask) -> ImageCacheTask {
+ pub fn new_sync(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();
spawn(proc() {
- let inner_cache = ImageCacheTask::new(resource_task);
+ let inner_cache = ImageCacheTask::new(resource_task, task_pool);
loop {
let msg: Msg = port.recv();
@@ -140,6 +142,7 @@ struct ImageCache {
/// List of clients waiting on a WaitForImage response
wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>,
need_exit: Option<Sender<()>>,
+ task_pool: TaskPool,
}
#[deriving(Clone)]
@@ -314,7 +317,7 @@ impl ImageCache {
let to_cache = self.chan.clone();
let url_clone = url.clone();
- spawn(proc() {
+ self.task_pool.execute(proc() {
let url = url_clone;
debug!("image_cache_task: started image decode for {:s}", url.serialize());
let image = load_from_memory(data.as_slice());
@@ -493,6 +496,7 @@ mod tests {
use resource_task;
use resource_task::{ResourceTask, Metadata, start_sending};
use image::base::test_image_bin;
+ use servo_util::taskpool::TaskPool;
use std::comm;
use url::Url;
@@ -581,7 +585,7 @@ mod tests {
fn should_exit_on_request() {
let mock_resource_task = mock_resource_task(box DoesNothing);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
@@ -592,7 +596,7 @@ mod tests {
fn should_fail_if_unprefetched_image_is_requested() {
let mock_resource_task = mock_resource_task(box DoesNothing);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let (chan, port) = channel();
@@ -606,7 +610,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url));
@@ -621,7 +625,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -641,7 +645,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImage{wait_port: wait_port});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -658,7 +662,7 @@ mod tests {
fn should_return_decoded_image_data_if_data_has_arrived() {
let mock_resource_task = mock_resource_task(box SendTestImage);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@@ -684,7 +688,7 @@ mod tests {
fn should_return_decoded_image_data_for_multiple_requests() {
let mock_resource_task = mock_resource_task(box SendTestImage);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@@ -732,7 +736,7 @@ mod tests {
}
});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -779,7 +783,7 @@ mod tests {
}
});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -808,7 +812,7 @@ mod tests {
fn should_return_failed_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store_prefetched();
@@ -834,7 +838,7 @@ mod tests {
fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store_prefetched();
@@ -868,7 +872,7 @@ mod tests {
fn should_return_failed_if_image_decode_fails() {
let mock_resource_task = mock_resource_task(box SendBogusImage);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@@ -896,7 +900,7 @@ mod tests {
fn should_return_image_on_wait_if_image_is_already_loaded() {
let mock_resource_task = mock_resource_task(box SendTestImage);
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@@ -924,7 +928,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImage {wait_port: wait_port});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -950,7 +954,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImageErr{wait_port: wait_port});
- let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@@ -974,7 +978,7 @@ mod tests {
fn sync_cache_should_wait_for_images() {
let mock_resource_task = mock_resource_task(box SendTestImage);
- let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone());
+ let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
diff --git a/components/util/lib.rs b/components/util/lib.rs
index f5fd08dc24e..57421954679 100644
--- a/components/util/lib.rs
+++ b/components/util/lib.rs
@@ -50,6 +50,7 @@ pub mod str;
pub mod task;
pub mod tid;
pub mod time;
+pub mod taskpool;
pub mod vec;
pub mod workqueue;
diff --git a/components/util/taskpool.rs b/components/util/taskpool.rs
new file mode 100644
index 00000000000..0a2fafb2ab3
--- /dev/null
+++ b/components/util/taskpool.rs
@@ -0,0 +1,53 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! A load-balancing task pool.
+//!
+//! This differs in implementation from std::sync::TaskPool in that each job is
+//! up for grabs by any of the child tasks in the pool.
+//!
+
+//
+// This is based on the cargo task pool.
+// https://github.com/rust-lang/cargo/blob/master/src/cargo/util/pool.rs
+//
+// The only difference is that a normal channel is used instead of a sync_channel.
+//
+
+use std::sync::{Arc, Mutex};
+
+pub struct TaskPool {
+ tx: Sender<proc():Send>,
+}
+
+impl TaskPool {
+ pub fn new(tasks: uint) -> TaskPool {
+ assert!(tasks > 0);
+ let (tx, rx) = channel();
+
+ let state = Arc::new(Mutex::new(rx));
+
+ for _ in range(0, tasks) {
+ let state = state.clone();
+ spawn(proc() worker(&*state));
+ }
+
+ return TaskPool { tx: tx };
+
+ fn worker(rx: &Mutex<Receiver<proc():Send>>) {
+ loop {
+ let job = rx.lock().recv_opt();
+ match job {
+ Ok(job) => job(),
+ Err(..) => break,
+ }
+ }
+ }
+ }
+
+ pub fn execute(&self, job: proc():Send) {
+ self.tx.send(job);
+ }
+}
+
diff --git a/src/lib.rs b/src/lib.rs
index 132d0314558..b745621517b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -51,6 +51,8 @@ use servo_util::time::TimeProfiler;
use servo_util::memory::MemoryProfiler;
#[cfg(not(test))]
use servo_util::opts;
+#[cfg(not(test))]
+use servo_util::taskpool::TaskPool;
#[cfg(not(test))]
use green::GreenTaskBuilder;
@@ -79,6 +81,7 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)
let opts_clone = opts.clone();
let time_profiler_chan_clone = time_profiler_chan.clone();
+ let shared_task_pool = TaskPool::new(8);
let (result_chan, result_port) = channel();
TaskBuilder::new()
@@ -91,9 +94,9 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)
// image load or we risk emitting an output file missing the
// image.
let image_cache_task = if opts.output_file.is_some() {
- ImageCacheTask::new_sync(resource_task.clone())
+ ImageCacheTask::new_sync(resource_task.clone(), shared_task_pool)
} else {
- ImageCacheTask::new(resource_task.clone())
+ ImageCacheTask::new(resource_task.clone(), shared_task_pool)
};
let font_cache_task = FontCacheTask::new(resource_task.clone());
let constellation_chan = Constellation::<layout::layout_task::LayoutTask,