diff options
Diffstat (limited to 'components/net/resource_thread.rs')
-rw-r--r-- | components/net/resource_thread.rs | 151 |
1 files changed, 143 insertions, 8 deletions
diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index bdbc815baa4..0bf17e7570f 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -46,6 +46,7 @@ use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; +use std::time::Duration; /// Returns a tuple of (public, private) senders to the new threads. pub fn new_resource_threads( @@ -345,6 +346,7 @@ impl ResourceChannelManager { Err(_) => warn!("Error writing hsts list to disk"), } } + self.resource_manager.exit(); let _ = sender.send(()); return false; }, @@ -429,10 +431,135 @@ pub struct CoreResourceManager { devtools_chan: Option<Sender<DevtoolsControlMsg>>, swmanager_chan: Option<IpcSender<CustomResponseMediator>>, filemanager: FileManager, - fetch_pool: rayon::ThreadPool, + thread_pool: Arc<CoreResourceThreadPool>, certificate_path: Option<String>, } +/// The state of the thread-pool used by CoreResource. +struct ThreadPoolState { + /// The number of active workers. + active_workers: u32, + /// Whether the pool can spawn additional work. + active: bool, +} + +impl ThreadPoolState { + pub fn new() -> ThreadPoolState { + ThreadPoolState { + active_workers: 0, + active: true, + } + } + + /// Is the pool still able to spawn new work? + pub fn is_active(&self) -> bool { + self.active + } + + /// How many workers are currently active? + pub fn active_workers(&self) -> u32 { + self.active_workers + } + + /// Prevent additional work from being spawned. + pub fn switch_to_inactive(&mut self) { + self.active = false; + } + + /// Add to the count of active workers. + pub fn increment_active(&mut self) { + self.active_workers += 1; + } + + /// Substract from the count of active workers. + pub fn decrement_active(&mut self) { + self.active_workers -= 1; + } +} + +/// Threadpool used by Fetch and file operations. +pub struct CoreResourceThreadPool { + pool: rayon::ThreadPool, + state: Arc<Mutex<ThreadPoolState>>, +} + +impl CoreResourceThreadPool { + pub fn new(num_threads: usize) -> CoreResourceThreadPool { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + let state = Arc::new(Mutex::new(ThreadPoolState::new())); + CoreResourceThreadPool { pool: pool, state } + } + + /// Spawn work on the thread-pool, if still active. + /// + /// There is no need to give feedback to the caller, + /// because if we do not perform work, + /// it is because the system as a whole is exiting. + pub fn spawn<OP>(&self, work: OP) + where + OP: FnOnce() + Send + 'static, + { + { + let mut state = self.state.lock().unwrap(); + if state.is_active() { + state.increment_active(); + } else { + // Don't spawn any work. + return; + } + } + + let state = self.state.clone(); + + self.pool.spawn(move || { + { + let mut state = state.lock().unwrap(); + if !state.is_active() { + // Decrement number of active workers and return, + // without doing any work. + return state.decrement_active(); + } + } + // Perform work. + work(); + { + // Decrement number of active workers. + let mut state = state.lock().unwrap(); + state.decrement_active(); + } + }); + } + + /// Prevent further work from being spawned, + /// and wait until all workers are done, + /// or a timeout of roughly one second has been reached. + pub fn exit(&self) { + { + let mut state = self.state.lock().unwrap(); + state.switch_to_inactive(); + } + let mut rounds = 0; + loop { + rounds += 1; + { + let state = self.state.lock().unwrap(); + let still_active = state.active_workers(); + + if still_active == 0 || rounds == 10 { + if still_active > 0 { + debug!("Exiting CoreResourceThreadPool with {:?} still working(should be zero)", still_active); + } + break; + } + } + thread::sleep(Duration::from_millis(100)); + } + } +} + impl CoreResourceManager { pub fn new( user_agent: Cow<'static, str>, @@ -441,20 +568,28 @@ impl CoreResourceManager { embedder_proxy: EmbedderProxy, certificate_path: Option<String>, ) -> CoreResourceManager { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) - .build() - .unwrap(); + let pool = CoreResourceThreadPool::new(16); + let pool_handle = Arc::new(pool); CoreResourceManager { user_agent: user_agent, devtools_chan: devtools_channel, swmanager_chan: None, - filemanager: FileManager::new(embedder_proxy), - fetch_pool: pool, + filemanager: FileManager::new(embedder_proxy, Arc::downgrade(&pool_handle)), + thread_pool: pool_handle, certificate_path, } } + /// Exit the core resource manager. + pub fn exit(&mut self) { + // Prevents further work from being spawned on the pool, + // blocks until all workers in the pool are done, + // or a short timeout has been reached. + self.thread_pool.exit(); + + debug!("Exited CoreResourceManager"); + } + fn set_cookie_for_url( &mut self, request: &ServoUrl, @@ -486,7 +621,7 @@ impl CoreResourceManager { _ => ResourceTimingType::Resource, }; - self.fetch_pool.spawn(move || { + self.thread_pool.spawn(move || { let mut request = request_builder.build(); // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) // todo load context / mimesniff in fetch |