Diffstat (limited to 'components/net/filemanager_thread.rs')
-rw-r--r--   components/net/filemanager_thread.rs   414
1 file changed, 223 insertions(+), 191 deletions(-)
diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs
index 0672f0cfc65..d765c37ce32 100644
--- a/components/net/filemanager_thread.rs
+++ b/components/net/filemanager_thread.rs
@@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds};
+use crate::resource_thread::CoreResourceThreadPool;
use crossbeam_channel::Sender;
use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern};
use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt};
@@ -24,8 +25,7 @@ use std::mem;
use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
-use std::thread;
+use std::sync::{Arc, Mutex, RwLock, Weak};
use url::Url;
use uuid::Uuid;
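
The import change above is the heart of the patch: std::thread goes away and a Weak handle to the shared CoreResourceThreadPool comes in, so file work can only be scheduled while the owning CoreResourceManager still holds the strong Arc. A minimal standard-library sketch of that Weak/Arc behaviour (the string payload is a stand-in, not the real pool type):

    use std::sync::{Arc, Weak};

    fn main() {
        // Stand-in for the Arc<CoreResourceThreadPool> owned by CoreResourceManager.
        let pool = Arc::new("core resource thread pool");
        let handle: Weak<&str> = Arc::downgrade(&pool);

        // While the owner is alive, upgrade() yields a usable Arc and work can be spawned.
        assert!(handle.upgrade().is_some());

        // Once the owner drops its Arc (CoreResourceManager exits), upgrade() returns None
        // and the FileManager can only log a warning instead of spawning.
        drop(pool);
        assert!(handle.upgrade().is_none());
    }
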
@@ -72,13 +72,18 @@ enum FileImpl {
pub struct FileManager {
embedder_proxy: EmbedderProxy,
store: Arc<FileManagerStore>,
+ thread_pool: Weak<CoreResourceThreadPool>,
}
impl FileManager {
- pub fn new(embedder_proxy: EmbedderProxy) -> FileManager {
+ pub fn new(
+ embedder_proxy: EmbedderProxy,
+ pool_handle: Weak<CoreResourceThreadPool>,
+ ) -> FileManager {
FileManager {
embedder_proxy: embedder_proxy,
store: Arc::new(FileManagerStore::new()),
+ thread_pool: pool_handle,
}
}
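
With the extra constructor argument, the caller keeps the strong Arc to the pool and hands FileManager only a downgraded handle. A hedged sketch of the call site; CoreResourceThreadPool::new(num_threads) is an assumed constructor shown for illustration only:

    // Hypothetical wiring inside the resource thread; the pool constructor is assumed.
    let thread_pool = Arc::new(CoreResourceThreadPool::new(16));
    let file_manager = FileManager::new(embedder_proxy.clone(), Arc::downgrade(&thread_pool));
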
@@ -90,14 +95,19 @@ impl FileManager {
origin: FileOrigin,
) {
let store = self.store.clone();
- thread::Builder::new()
- .name("read file".to_owned())
- .spawn(move || {
- if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) {
- let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
- }
+ self.thread_pool
+ .upgrade()
+ .and_then(|pool| {
+ pool.spawn(move || {
+ if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) {
+ let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
+ }
+ });
+ Some(())
})
- .expect("Thread spawning failed");
+ .unwrap_or_else(|| {
+ warn!("FileManager tried to read a file after CoreResourceManager has exited.");
+ });
}
// Read a file for the Fetch implementation.
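
Each message handler now follows the same upgrade-or-warn pattern: try to upgrade the Weak pool handle, spawn the closure on the pool if that succeeds, and otherwise log that the CoreResourceManager has already exited. The and_then(.. Some(())) / unwrap_or_else chain is equivalent to a plain match; a sketch of the same control flow for the read-file case, not part of the patch:

    // Equivalent control flow to the upgrade().and_then(..).unwrap_or_else(..) chain above.
    match self.thread_pool.upgrade() {
        Some(pool) => {
            pool.spawn(move || {
                if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) {
                    let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
                }
            });
        },
        None => warn!("FileManager tried to read a file after CoreResourceManager has exited."),
    }
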
@@ -113,7 +123,7 @@ impl FileManager {
response: &mut Response,
range: RangeRequestBounds,
) -> Result<(), BlobURLStoreError> {
- self.store.fetch_blob_buf(
+ self.fetch_blob_buf(
done_sender,
cancellation_listener,
&id,
@@ -134,22 +144,36 @@ impl FileManager {
FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => {
let store = self.store.clone();
let embedder = self.embedder_proxy.clone();
- thread::Builder::new()
- .name("select file".to_owned())
- .spawn(move || {
- store.select_file(filter, sender, origin, opt_test_path, embedder);
+ self.thread_pool
+ .upgrade()
+ .and_then(|pool| {
+ pool.spawn(move || {
+ store.select_file(filter, sender, origin, opt_test_path, embedder);
+ });
+ Some(())
})
- .expect("Thread spawning failed");
+ .unwrap_or_else(|| {
+ warn!(
+ "FileManager tried to select a file after CoreResourceManager has exited."
+ );
+ });
},
FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => {
let store = self.store.clone();
let embedder = self.embedder_proxy.clone();
- thread::Builder::new()
- .name("select files".to_owned())
- .spawn(move || {
- store.select_files(filter, sender, origin, opt_test_paths, embedder);
+ self.thread_pool
+ .upgrade()
+ .and_then(|pool| {
+ pool.spawn(move || {
+ store.select_files(filter, sender, origin, opt_test_paths, embedder);
+ });
+ Some(())
})
- .expect("Thread spawning failed");
+ .unwrap_or_else(|| {
+ warn!(
+ "FileManager tried to select multiple files after CoreResourceManager has exited."
+ );
+ });
},
FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => {
self.read_file(sender, id, check_url_validity, origin);
@@ -171,6 +195,183 @@ impl FileManager {
},
}
}
+
+ pub fn fetch_file_in_chunks(
+ &self,
+ done_sender: Sender<Data>,
+ mut reader: BufReader<File>,
+ res_body: ServoArc<Mutex<ResponseBody>>,
+ cancellation_listener: Arc<Mutex<CancellationListener>>,
+ range: RelativePos,
+ ) {
+ self.thread_pool
+ .upgrade()
+ .and_then(|pool| {
+ pool.spawn(move || {
+ loop {
+ if cancellation_listener.lock().unwrap().cancelled() {
+ *res_body.lock().unwrap() = ResponseBody::Done(vec![]);
+ let _ = done_sender.send(Data::Cancelled);
+ return;
+ }
+ let length = {
+ let buffer = reader.fill_buf().unwrap().to_vec();
+ let mut buffer_len = buffer.len();
+ if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap()
+ {
+ let offset = usize::min(
+ {
+ if let Some(end) = range.end {
+ // HTTP Range requests are specified with closed ranges,
+ // while Rust uses half-open ranges. We add +1 here so
+ // we don't skip the last requested byte.
+ let remaining_bytes =
+ end as usize - range.start as usize - body.len() +
+ 1;
+ if remaining_bytes <= FILE_CHUNK_SIZE {
+ // This is the last chunk so we set buffer
+ // len to 0 to break the reading loop.
+ buffer_len = 0;
+ remaining_bytes
+ } else {
+ FILE_CHUNK_SIZE
+ }
+ } else {
+ FILE_CHUNK_SIZE
+ }
+ },
+ buffer.len(),
+ );
+ let chunk = &buffer[0..offset];
+ body.extend_from_slice(chunk);
+ let _ = done_sender.send(Data::Payload(chunk.to_vec()));
+ }
+ buffer_len
+ };
+ if length == 0 {
+ let mut body = res_body.lock().unwrap();
+ let completed_body = match *body {
+ ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
+ _ => vec![],
+ };
+ *body = ResponseBody::Done(completed_body);
+ let _ = done_sender.send(Data::Done);
+ break;
+ }
+ reader.consume(length);
+ }
+ });
+ Some(())
+ })
+ .unwrap_or_else(|| {
+ warn!("FileManager tried to fetch a file in chunks after CoreResourceManager has exited.");
+ });
+ }
+
+ fn fetch_blob_buf(
+ &self,
+ done_sender: &Sender<Data>,
+ cancellation_listener: Arc<Mutex<CancellationListener>>,
+ id: &Uuid,
+ origin_in: &FileOrigin,
+ range: RangeRequestBounds,
+ check_url_validity: bool,
+ response: &mut Response,
+ ) -> Result<(), BlobURLStoreError> {
+ let file_impl = self.store.get_impl(id, origin_in, check_url_validity)?;
+ match file_impl {
+ FileImpl::Memory(buf) => {
+ let range = match range.get_final(Some(buf.size)) {
+ Ok(range) => range,
+ Err(_) => {
+ return Err(BlobURLStoreError::InvalidRange);
+ },
+ };
+
+ let range = range.to_abs_range(buf.size as usize);
+ let len = range.len() as u64;
+
+ set_headers(
+ &mut response.headers,
+ len,
+ buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN),
+ /* filename */ None,
+ );
+
+ let mut bytes = vec![];
+ bytes.extend_from_slice(buf.bytes.index(range));
+
+ let _ = done_sender.send(Data::Payload(bytes));
+ let _ = done_sender.send(Data::Done);
+
+ Ok(())
+ },
+ FileImpl::MetaDataOnly(metadata) => {
+                /* XXX: Snapshot state check (optional), https://w3c.github.io/FileAPI/#snapshot-state.
+                   Concretely, we open another file here, and it might not have the same
+                   underlying file state (meta-info plus content) that it had when
+                   create_entry was called.
+                */
+
+ let file = File::open(&metadata.path)
+ .map_err(|e| BlobURLStoreError::External(e.to_string()))?;
+
+ let range = match range.get_final(Some(metadata.size)) {
+ Ok(range) => range,
+ Err(_) => {
+ return Err(BlobURLStoreError::InvalidRange);
+ },
+ };
+
+ let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file);
+ if reader.seek(SeekFrom::Start(range.start as u64)).is_err() {
+ return Err(BlobURLStoreError::External(
+ "Unexpected method for blob".into(),
+ ));
+ }
+
+ let filename = metadata
+ .path
+ .file_name()
+ .and_then(|osstr| osstr.to_str())
+ .map(|s| s.to_string());
+
+ set_headers(
+ &mut response.headers,
+ metadata.size,
+ mime_guess::from_path(metadata.path)
+ .first()
+ .unwrap_or(mime::TEXT_PLAIN),
+ filename,
+ );
+
+ self.fetch_file_in_chunks(
+ done_sender.clone(),
+ reader,
+ response.body.clone(),
+ cancellation_listener,
+ range,
+ );
+
+ Ok(())
+ },
+ FileImpl::Sliced(parent_id, inner_rel_pos) => {
+ // Next time we don't need to check validity since
+ // we have already done that for requesting URL if necessary.
+ return self.fetch_blob_buf(
+ done_sender,
+ cancellation_listener,
+ &parent_id,
+ origin_in,
+ RangeRequestBounds::Final(
+ RelativePos::full_range().slice_inner(&inner_rel_pos),
+ ),
+ false,
+ response,
+ );
+ },
+ }
+ }
}
/// File manager's data store. It maintains a thread-safe mapping
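
The chunk loop in fetch_file_in_chunks above sizes each read from the inclusive HTTP Range: remaining_bytes = end - start - body.len() + 1, and once that drops to FILE_CHUNK_SIZE or below the loop forces buffer_len to 0 so the current chunk is the last one. A self-contained check of that arithmetic; FILE_CHUNK_SIZE = 8 here is an assumed value for illustration, not the constant defined in this file:

    // Worked example of the inclusive-range arithmetic in fetch_file_in_chunks.
    const FILE_CHUNK_SIZE: usize = 8; // assumed value for illustration only

    /// Returns (bytes to take from the buffer, whether this is the final chunk).
    fn next_chunk_len(start: usize, end: Option<usize>, already_sent: usize) -> (usize, bool) {
        match end {
            Some(end) => {
                // +1 because HTTP Range requests use closed (inclusive) ranges.
                let remaining = end - start - already_sent + 1;
                if remaining <= FILE_CHUNK_SIZE {
                    (remaining, true) // last chunk: the loop stops after this read
                } else {
                    (FILE_CHUNK_SIZE, false)
                }
            },
            None => (FILE_CHUNK_SIZE, false),
        }
    }

    fn main() {
        // Range 0..=9 (ten bytes requested), four bytes already streamed: 9 - 0 - 4 + 1 = 6.
        assert_eq!(next_chunk_len(0, Some(9), 4), (6, true));
        // An open-ended range always reads a full chunk.
        assert_eq!(next_chunk_len(5, None, 0), (FILE_CHUNK_SIZE, false));
    }
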
@@ -188,7 +389,7 @@ impl FileManagerStore {
}
/// Copy out the file backend implementation content
- fn get_impl(
+ pub fn get_impl(
&self,
id: &Uuid,
origin_in: &FileOrigin,
@@ -510,111 +711,6 @@ impl FileManagerStore {
)
}
- fn fetch_blob_buf(
- &self,
- done_sender: &Sender<Data>,
- cancellation_listener: Arc<Mutex<CancellationListener>>,
- id: &Uuid,
- origin_in: &FileOrigin,
- range: RangeRequestBounds,
- check_url_validity: bool,
- response: &mut Response,
- ) -> Result<(), BlobURLStoreError> {
- let file_impl = self.get_impl(id, origin_in, check_url_validity)?;
- match file_impl {
- FileImpl::Memory(buf) => {
- let range = match range.get_final(Some(buf.size)) {
- Ok(range) => range,
- Err(_) => {
- return Err(BlobURLStoreError::InvalidRange);
- },
- };
-
- let range = range.to_abs_range(buf.size as usize);
- let len = range.len() as u64;
-
- set_headers(
- &mut response.headers,
- len,
- buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN),
- /* filename */ None,
- );
-
- let mut bytes = vec![];
- bytes.extend_from_slice(buf.bytes.index(range));
-
- let _ = done_sender.send(Data::Payload(bytes));
- let _ = done_sender.send(Data::Done);
-
- Ok(())
- },
- FileImpl::MetaDataOnly(metadata) => {
- /* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state.
- Concretely, here we create another file, and this file might not
- has the same underlying file state (meta-info plus content) as the time
- create_entry is called.
- */
-
- let file = File::open(&metadata.path)
- .map_err(|e| BlobURLStoreError::External(e.to_string()))?;
-
- let range = match range.get_final(Some(metadata.size)) {
- Ok(range) => range,
- Err(_) => {
- return Err(BlobURLStoreError::InvalidRange);
- },
- };
-
- let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file);
- if reader.seek(SeekFrom::Start(range.start as u64)).is_err() {
- return Err(BlobURLStoreError::External(
- "Unexpected method for blob".into(),
- ));
- }
-
- let filename = metadata
- .path
- .file_name()
- .and_then(|osstr| osstr.to_str())
- .map(|s| s.to_string());
-
- set_headers(
- &mut response.headers,
- metadata.size,
- mime_guess::from_path(metadata.path)
- .first()
- .unwrap_or(mime::TEXT_PLAIN),
- filename,
- );
-
- fetch_file_in_chunks(
- done_sender.clone(),
- reader,
- response.body.clone(),
- cancellation_listener,
- range,
- );
-
- Ok(())
- },
- FileImpl::Sliced(parent_id, inner_rel_pos) => {
- // Next time we don't need to check validity since
- // we have already done that for requesting URL if necessary.
- return self.fetch_blob_buf(
- done_sender,
- cancellation_listener,
- &parent_id,
- origin_in,
- RangeRequestBounds::Final(
- RelativePos::full_range().slice_inner(&inner_rel_pos),
- ),
- false,
- response,
- );
- },
- }
- }
-
fn dec_ref(&self, id: &Uuid, origin_in: &FileOrigin) -> Result<(), BlobURLStoreError> {
let (do_remove, opt_parent_id) = match self.entries.read().unwrap().get(id) {
Some(entry) => {
@@ -763,70 +859,6 @@ fn read_file_in_chunks(
}
}
-pub fn fetch_file_in_chunks(
- done_sender: Sender<Data>,
- mut reader: BufReader<File>,
- res_body: ServoArc<Mutex<ResponseBody>>,
- cancellation_listener: Arc<Mutex<CancellationListener>>,
- range: RelativePos,
-) {
- thread::Builder::new()
- .name("fetch file worker thread".to_string())
- .spawn(move || {
- loop {
- if cancellation_listener.lock().unwrap().cancelled() {
- *res_body.lock().unwrap() = ResponseBody::Done(vec![]);
- let _ = done_sender.send(Data::Cancelled);
- return;
- }
- let length = {
- let buffer = reader.fill_buf().unwrap().to_vec();
- let mut buffer_len = buffer.len();
- if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
- let offset = usize::min(
- {
- if let Some(end) = range.end {
- // HTTP Range requests are specified with closed ranges,
- // while Rust uses half-open ranges. We add +1 here so
- // we don't skip the last requested byte.
- let remaining_bytes =
- end as usize - range.start as usize - body.len() + 1;
- if remaining_bytes <= FILE_CHUNK_SIZE {
- // This is the last chunk so we set buffer
- // len to 0 to break the reading loop.
- buffer_len = 0;
- remaining_bytes
- } else {
- FILE_CHUNK_SIZE
- }
- } else {
- FILE_CHUNK_SIZE
- }
- },
- buffer.len(),
- );
- let chunk = &buffer[0..offset];
- body.extend_from_slice(chunk);
- let _ = done_sender.send(Data::Payload(chunk.to_vec()));
- }
- buffer_len
- };
- if length == 0 {
- let mut body = res_body.lock().unwrap();
- let completed_body = match *body {
- ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
- _ => vec![],
- };
- *body = ResponseBody::Done(completed_body);
- let _ = done_sender.send(Data::Done);
- break;
- }
- reader.consume(length);
- }
- })
- .expect("Failed to create fetch file worker thread");
-}
-
fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option<String>) {
headers.typed_insert(ContentLength(content_length));
headers.typed_insert(ContentType::from(mime.clone()));