aboutsummaryrefslogtreecommitdiffstats
path: root/components/net/resource_thread.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/net/resource_thread.rs')
-rw-r--r--components/net/resource_thread.rs151
1 files changed, 143 insertions, 8 deletions
diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs
index bdbc815baa4..0bf17e7570f 100644
--- a/components/net/resource_thread.rs
+++ b/components/net/resource_thread.rs
@@ -46,6 +46,7 @@ use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
+use std::time::Duration;
/// Returns a tuple of (public, private) senders to the new threads.
pub fn new_resource_threads(
@@ -345,6 +346,7 @@ impl ResourceChannelManager {
Err(_) => warn!("Error writing hsts list to disk"),
}
}
+ self.resource_manager.exit();
let _ = sender.send(());
return false;
},
@@ -429,10 +431,135 @@ pub struct CoreResourceManager {
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
swmanager_chan: Option<IpcSender<CustomResponseMediator>>,
filemanager: FileManager,
- fetch_pool: rayon::ThreadPool,
+ thread_pool: Arc<CoreResourceThreadPool>,
certificate_path: Option<String>,
}
+/// The state of the thread-pool used by CoreResource.
+struct ThreadPoolState {
+ /// The number of active workers.
+ active_workers: u32,
+ /// Whether the pool can spawn additional work.
+ active: bool,
+}
+
+impl ThreadPoolState {
+ pub fn new() -> ThreadPoolState {
+ ThreadPoolState {
+ active_workers: 0,
+ active: true,
+ }
+ }
+
+ /// Is the pool still able to spawn new work?
+ pub fn is_active(&self) -> bool {
+ self.active
+ }
+
+ /// How many workers are currently active?
+ pub fn active_workers(&self) -> u32 {
+ self.active_workers
+ }
+
+ /// Prevent additional work from being spawned.
+ pub fn switch_to_inactive(&mut self) {
+ self.active = false;
+ }
+
+ /// Add to the count of active workers.
+ pub fn increment_active(&mut self) {
+ self.active_workers += 1;
+ }
+
+ /// Substract from the count of active workers.
+ pub fn decrement_active(&mut self) {
+ self.active_workers -= 1;
+ }
+}
+
+/// Threadpool used by Fetch and file operations.
+pub struct CoreResourceThreadPool {
+ pool: rayon::ThreadPool,
+ state: Arc<Mutex<ThreadPoolState>>,
+}
+
+impl CoreResourceThreadPool {
+ pub fn new(num_threads: usize) -> CoreResourceThreadPool {
+ let pool = rayon::ThreadPoolBuilder::new()
+ .num_threads(num_threads)
+ .build()
+ .unwrap();
+ let state = Arc::new(Mutex::new(ThreadPoolState::new()));
+ CoreResourceThreadPool { pool: pool, state }
+ }
+
+ /// Spawn work on the thread-pool, if still active.
+ ///
+ /// There is no need to give feedback to the caller,
+ /// because if we do not perform work,
+ /// it is because the system as a whole is exiting.
+ pub fn spawn<OP>(&self, work: OP)
+ where
+ OP: FnOnce() + Send + 'static,
+ {
+ {
+ let mut state = self.state.lock().unwrap();
+ if state.is_active() {
+ state.increment_active();
+ } else {
+ // Don't spawn any work.
+ return;
+ }
+ }
+
+ let state = self.state.clone();
+
+ self.pool.spawn(move || {
+ {
+ let mut state = state.lock().unwrap();
+ if !state.is_active() {
+ // Decrement number of active workers and return,
+ // without doing any work.
+ return state.decrement_active();
+ }
+ }
+ // Perform work.
+ work();
+ {
+ // Decrement number of active workers.
+ let mut state = state.lock().unwrap();
+ state.decrement_active();
+ }
+ });
+ }
+
+ /// Prevent further work from being spawned,
+ /// and wait until all workers are done,
+ /// or a timeout of roughly one second has been reached.
+ pub fn exit(&self) {
+ {
+ let mut state = self.state.lock().unwrap();
+ state.switch_to_inactive();
+ }
+ let mut rounds = 0;
+ loop {
+ rounds += 1;
+ {
+ let state = self.state.lock().unwrap();
+ let still_active = state.active_workers();
+
+ if still_active == 0 || rounds == 10 {
+ if still_active > 0 {
+ debug!("Exiting CoreResourceThreadPool with {:?} still working(should be zero)", still_active);
+ }
+ break;
+ }
+ }
+ thread::sleep(Duration::from_millis(100));
+ }
+ }
+}
+
impl CoreResourceManager {
pub fn new(
user_agent: Cow<'static, str>,
@@ -441,20 +568,28 @@ impl CoreResourceManager {
embedder_proxy: EmbedderProxy,
certificate_path: Option<String>,
) -> CoreResourceManager {
- let pool = rayon::ThreadPoolBuilder::new()
- .num_threads(16)
- .build()
- .unwrap();
+ let pool = CoreResourceThreadPool::new(16);
+ let pool_handle = Arc::new(pool);
CoreResourceManager {
user_agent: user_agent,
devtools_chan: devtools_channel,
swmanager_chan: None,
- filemanager: FileManager::new(embedder_proxy),
- fetch_pool: pool,
+ filemanager: FileManager::new(embedder_proxy, Arc::downgrade(&pool_handle)),
+ thread_pool: pool_handle,
certificate_path,
}
}
+ /// Exit the core resource manager.
+ pub fn exit(&mut self) {
+ // Prevents further work from being spawned on the pool,
+ // blocks until all workers in the pool are done,
+ // or a short timeout has been reached.
+ self.thread_pool.exit();
+
+ debug!("Exited CoreResourceManager");
+ }
+
fn set_cookie_for_url(
&mut self,
request: &ServoUrl,
@@ -486,7 +621,7 @@ impl CoreResourceManager {
_ => ResourceTimingType::Resource,
};
- self.fetch_pool.spawn(move || {
+ self.thread_pool.spawn(move || {
let mut request = request_builder.build();
// XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed)
// todo load context / mimesniff in fetch