diff options
Diffstat (limited to 'components/net/filemanager_thread.rs')
-rw-r--r-- | components/net/filemanager_thread.rs | 414 |
1 files changed, 223 insertions, 191 deletions
diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index 0672f0cfc65..d765c37ce32 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -3,6 +3,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds}; +use crate::resource_thread::CoreResourceThreadPool; use crossbeam_channel::Sender; use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern}; use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt}; @@ -24,8 +25,7 @@ use std::mem; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; +use std::sync::{Arc, Mutex, RwLock, Weak}; use url::Url; use uuid::Uuid; @@ -72,13 +72,18 @@ enum FileImpl { pub struct FileManager { embedder_proxy: EmbedderProxy, store: Arc<FileManagerStore>, + thread_pool: Weak<CoreResourceThreadPool>, } impl FileManager { - pub fn new(embedder_proxy: EmbedderProxy) -> FileManager { + pub fn new( + embedder_proxy: EmbedderProxy, + pool_handle: Weak<CoreResourceThreadPool>, + ) -> FileManager { FileManager { embedder_proxy: embedder_proxy, store: Arc::new(FileManagerStore::new()), + thread_pool: pool_handle, } } @@ -90,14 +95,19 @@ impl FileManager { origin: FileOrigin, ) { let store = self.store.clone(); - thread::Builder::new() - .name("read file".to_owned()) - .spawn(move || { - if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) { - let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e))); - } + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) { + let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e))); + } + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!("FileManager tried to read a file after CoreResourceManager has exited."); + }); } // Read a file for the Fetch implementation. @@ -113,7 +123,7 @@ impl FileManager { response: &mut Response, range: RangeRequestBounds, ) -> Result<(), BlobURLStoreError> { - self.store.fetch_blob_buf( + self.fetch_blob_buf( done_sender, cancellation_listener, &id, @@ -134,22 +144,36 @@ impl FileManager { FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => { let store = self.store.clone(); let embedder = self.embedder_proxy.clone(); - thread::Builder::new() - .name("select file".to_owned()) - .spawn(move || { - store.select_file(filter, sender, origin, opt_test_path, embedder); + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + store.select_file(filter, sender, origin, opt_test_path, embedder); + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!( + "FileManager tried to select a file after CoreResourceManager has exited." + ); + }); }, FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => { let store = self.store.clone(); let embedder = self.embedder_proxy.clone(); - thread::Builder::new() - .name("select files".to_owned()) - .spawn(move || { - store.select_files(filter, sender, origin, opt_test_paths, embedder); + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + store.select_files(filter, sender, origin, opt_test_paths, embedder); + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!( + "FileManager tried to select multiple files after CoreResourceManager has exited." + ); + }); }, FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => { self.read_file(sender, id, check_url_validity, origin); @@ -171,6 +195,183 @@ impl FileManager { }, } } + + pub fn fetch_file_in_chunks( + &self, + done_sender: Sender<Data>, + mut reader: BufReader<File>, + res_body: ServoArc<Mutex<ResponseBody>>, + cancellation_listener: Arc<Mutex<CancellationListener>>, + range: RelativePos, + ) { + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + loop { + if cancellation_listener.lock().unwrap().cancelled() { + *res_body.lock().unwrap() = ResponseBody::Done(vec![]); + let _ = done_sender.send(Data::Cancelled); + return; + } + let length = { + let buffer = reader.fill_buf().unwrap().to_vec(); + let mut buffer_len = buffer.len(); + if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() + { + let offset = usize::min( + { + if let Some(end) = range.end { + // HTTP Range requests are specified with closed ranges, + // while Rust uses half-open ranges. We add +1 here so + // we don't skip the last requested byte. + let remaining_bytes = + end as usize - range.start as usize - body.len() + + 1; + if remaining_bytes <= FILE_CHUNK_SIZE { + // This is the last chunk so we set buffer + // len to 0 to break the reading loop. + buffer_len = 0; + remaining_bytes + } else { + FILE_CHUNK_SIZE + } + } else { + FILE_CHUNK_SIZE + } + }, + buffer.len(), + ); + let chunk = &buffer[0..offset]; + body.extend_from_slice(chunk); + let _ = done_sender.send(Data::Payload(chunk.to_vec())); + } + buffer_len + }; + if length == 0 { + let mut body = res_body.lock().unwrap(); + let completed_body = match *body { + ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), + _ => vec![], + }; + *body = ResponseBody::Done(completed_body); + let _ = done_sender.send(Data::Done); + break; + } + reader.consume(length); + } + }); + Some(()) + }) + .unwrap_or_else(|| { + warn!("FileManager tried to fetch a file in chunks after CoreResourceManager has exited."); + }); + } + + fn fetch_blob_buf( + &self, + done_sender: &Sender<Data>, + cancellation_listener: Arc<Mutex<CancellationListener>>, + id: &Uuid, + origin_in: &FileOrigin, + range: RangeRequestBounds, + check_url_validity: bool, + response: &mut Response, + ) -> Result<(), BlobURLStoreError> { + let file_impl = self.store.get_impl(id, origin_in, check_url_validity)?; + match file_impl { + FileImpl::Memory(buf) => { + let range = match range.get_final(Some(buf.size)) { + Ok(range) => range, + Err(_) => { + return Err(BlobURLStoreError::InvalidRange); + }, + }; + + let range = range.to_abs_range(buf.size as usize); + let len = range.len() as u64; + + set_headers( + &mut response.headers, + len, + buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN), + /* filename */ None, + ); + + let mut bytes = vec![]; + bytes.extend_from_slice(buf.bytes.index(range)); + + let _ = done_sender.send(Data::Payload(bytes)); + let _ = done_sender.send(Data::Done); + + Ok(()) + }, + FileImpl::MetaDataOnly(metadata) => { + /* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state. + Concretely, here we create another file, and this file might not + has the same underlying file state (meta-info plus content) as the time + create_entry is called. + */ + + let file = File::open(&metadata.path) + .map_err(|e| BlobURLStoreError::External(e.to_string()))?; + + let range = match range.get_final(Some(metadata.size)) { + Ok(range) => range, + Err(_) => { + return Err(BlobURLStoreError::InvalidRange); + }, + }; + + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { + return Err(BlobURLStoreError::External( + "Unexpected method for blob".into(), + )); + } + + let filename = metadata + .path + .file_name() + .and_then(|osstr| osstr.to_str()) + .map(|s| s.to_string()); + + set_headers( + &mut response.headers, + metadata.size, + mime_guess::from_path(metadata.path) + .first() + .unwrap_or(mime::TEXT_PLAIN), + filename, + ); + + self.fetch_file_in_chunks( + done_sender.clone(), + reader, + response.body.clone(), + cancellation_listener, + range, + ); + + Ok(()) + }, + FileImpl::Sliced(parent_id, inner_rel_pos) => { + // Next time we don't need to check validity since + // we have already done that for requesting URL if necessary. + return self.fetch_blob_buf( + done_sender, + cancellation_listener, + &parent_id, + origin_in, + RangeRequestBounds::Final( + RelativePos::full_range().slice_inner(&inner_rel_pos), + ), + false, + response, + ); + }, + } + } } /// File manager's data store. It maintains a thread-safe mapping @@ -188,7 +389,7 @@ impl FileManagerStore { } /// Copy out the file backend implementation content - fn get_impl( + pub fn get_impl( &self, id: &Uuid, origin_in: &FileOrigin, @@ -510,111 +711,6 @@ impl FileManagerStore { ) } - fn fetch_blob_buf( - &self, - done_sender: &Sender<Data>, - cancellation_listener: Arc<Mutex<CancellationListener>>, - id: &Uuid, - origin_in: &FileOrigin, - range: RangeRequestBounds, - check_url_validity: bool, - response: &mut Response, - ) -> Result<(), BlobURLStoreError> { - let file_impl = self.get_impl(id, origin_in, check_url_validity)?; - match file_impl { - FileImpl::Memory(buf) => { - let range = match range.get_final(Some(buf.size)) { - Ok(range) => range, - Err(_) => { - return Err(BlobURLStoreError::InvalidRange); - }, - }; - - let range = range.to_abs_range(buf.size as usize); - let len = range.len() as u64; - - set_headers( - &mut response.headers, - len, - buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN), - /* filename */ None, - ); - - let mut bytes = vec![]; - bytes.extend_from_slice(buf.bytes.index(range)); - - let _ = done_sender.send(Data::Payload(bytes)); - let _ = done_sender.send(Data::Done); - - Ok(()) - }, - FileImpl::MetaDataOnly(metadata) => { - /* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state. - Concretely, here we create another file, and this file might not - has the same underlying file state (meta-info plus content) as the time - create_entry is called. - */ - - let file = File::open(&metadata.path) - .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - - let range = match range.get_final(Some(metadata.size)) { - Ok(range) => range, - Err(_) => { - return Err(BlobURLStoreError::InvalidRange); - }, - }; - - let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); - if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { - return Err(BlobURLStoreError::External( - "Unexpected method for blob".into(), - )); - } - - let filename = metadata - .path - .file_name() - .and_then(|osstr| osstr.to_str()) - .map(|s| s.to_string()); - - set_headers( - &mut response.headers, - metadata.size, - mime_guess::from_path(metadata.path) - .first() - .unwrap_or(mime::TEXT_PLAIN), - filename, - ); - - fetch_file_in_chunks( - done_sender.clone(), - reader, - response.body.clone(), - cancellation_listener, - range, - ); - - Ok(()) - }, - FileImpl::Sliced(parent_id, inner_rel_pos) => { - // Next time we don't need to check validity since - // we have already done that for requesting URL if necessary. - return self.fetch_blob_buf( - done_sender, - cancellation_listener, - &parent_id, - origin_in, - RangeRequestBounds::Final( - RelativePos::full_range().slice_inner(&inner_rel_pos), - ), - false, - response, - ); - }, - } - } - fn dec_ref(&self, id: &Uuid, origin_in: &FileOrigin) -> Result<(), BlobURLStoreError> { let (do_remove, opt_parent_id) = match self.entries.read().unwrap().get(id) { Some(entry) => { @@ -763,70 +859,6 @@ fn read_file_in_chunks( } } -pub fn fetch_file_in_chunks( - done_sender: Sender<Data>, - mut reader: BufReader<File>, - res_body: ServoArc<Mutex<ResponseBody>>, - cancellation_listener: Arc<Mutex<CancellationListener>>, - range: RelativePos, -) { - thread::Builder::new() - .name("fetch file worker thread".to_string()) - .spawn(move || { - loop { - if cancellation_listener.lock().unwrap().cancelled() { - *res_body.lock().unwrap() = ResponseBody::Done(vec![]); - let _ = done_sender.send(Data::Cancelled); - return; - } - let length = { - let buffer = reader.fill_buf().unwrap().to_vec(); - let mut buffer_len = buffer.len(); - if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { - let offset = usize::min( - { - if let Some(end) = range.end { - // HTTP Range requests are specified with closed ranges, - // while Rust uses half-open ranges. We add +1 here so - // we don't skip the last requested byte. - let remaining_bytes = - end as usize - range.start as usize - body.len() + 1; - if remaining_bytes <= FILE_CHUNK_SIZE { - // This is the last chunk so we set buffer - // len to 0 to break the reading loop. - buffer_len = 0; - remaining_bytes - } else { - FILE_CHUNK_SIZE - } - } else { - FILE_CHUNK_SIZE - } - }, - buffer.len(), - ); - let chunk = &buffer[0..offset]; - body.extend_from_slice(chunk); - let _ = done_sender.send(Data::Payload(chunk.to_vec())); - } - buffer_len - }; - if length == 0 { - let mut body = res_body.lock().unwrap(); - let completed_body = match *body { - ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), - _ => vec![], - }; - *body = ResponseBody::Done(completed_body); - let _ = done_sender.send(Data::Done); - break; - } - reader.consume(length); - } - }) - .expect("Failed to create fetch file worker thread"); -} - fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option<String>) { headers.typed_insert(ContentLength(content_length)); headers.typed_insert(ContentType::from(mime.clone())); |