diff options
author | Fernando Jiménez Moreno <ferjmoreno@gmail.com> | 2018-11-10 11:57:47 +0100 |
---|---|---|
committer | Fernando Jiménez Moreno <ferjmoreno@gmail.com> | 2018-11-26 09:33:35 +0100 |
commit | a84442864d64242903a2b55c170b7f889ab4ab32 (patch) | |
tree | 1bf28997d93190f923bcaceccdb794e28d64840c /components/net/filemanager_thread.rs | |
parent | 8538634210988adabd5d75c8ff28cabd59a06baa (diff) | |
download | servo-a84442864d64242903a2b55c170b7f889ab4ab32.tar.gz servo-a84442864d64242903a2b55c170b7f889ab4ab32.zip |
Add support fo byte range requests for blob URLs
Diffstat (limited to 'components/net/filemanager_thread.rs')
-rw-r--r-- | components/net/filemanager_thread.rs | 147 |
1 files changed, 95 insertions, 52 deletions
diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index 660fe1d8875..032199d3f43 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crate::fetch::methods::Data; +use crate::fetch::methods::{CancellationListener, Data}; use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern}; use headers_ext::{ContentLength, ContentType, HeaderMap, HeaderMapExt}; use http::header::{self, HeaderValue}; @@ -21,7 +21,8 @@ use servo_channel; use servo_config::prefs::PREFS; use std::collections::HashMap; use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::mem; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; @@ -30,6 +31,8 @@ use std::thread; use url::Url; use uuid::Uuid; +pub const FILE_CHUNK_SIZE: usize = 32768; //32 KB + /// FileManagerStore's entry struct FileStoreEntry { /// Origin of the entry's "creator" @@ -104,18 +107,21 @@ impl FileManager { // in a separate thread. pub fn fetch_file( &self, - sender: &servo_channel::Sender<Data>, + done_sender: &servo_channel::Sender<Data>, + cancellation_listener: Arc<Mutex<CancellationListener>>, id: Uuid, check_url_validity: bool, origin: FileOrigin, response: &mut Response, + range: RelativePos ) -> Result<(), String> { self.store .fetch_blob_buf( - sender, + done_sender, + cancellation_listener, &id, &origin, - RelativePos::full_range(), + range, check_url_validity, response, ) @@ -483,7 +489,7 @@ impl FileManagerStore { None => "".to_string(), }; - chunked_read(sender, &mut file, range.len(), opt_filename, type_string); + read_file_in_chunks(sender, &mut file, range.len(), opt_filename, type_string); Ok(()) } else { Err(BlobURLStoreError::InvalidEntry) @@ -522,17 +528,18 @@ impl FileManagerStore { fn fetch_blob_buf( &self, - sender: &servo_channel::Sender<Data>, + done_sender: &servo_channel::Sender<Data>, + cancellation_listener: Arc<Mutex<CancellationListener>>, id: &Uuid, origin_in: &FileOrigin, - rel_pos: RelativePos, + range: RelativePos, check_url_validity: bool, response: &mut Response, ) -> Result<(), BlobURLStoreError> { let file_impl = self.get_impl(id, origin_in, check_url_validity)?; match file_impl { FileImpl::Memory(buf) => { - let range = rel_pos.to_abs_range(buf.size as usize); + let range = range.to_abs_range(buf.size as usize); let len = range.len() as u64; set_headers( @@ -545,8 +552,8 @@ impl FileManagerStore { let mut bytes = vec![]; bytes.extend_from_slice(buf.bytes.index(range)); - let _ = sender.send(Data::Payload(bytes)); - let _ = sender.send(Data::Done); + let _ = done_sender.send(Data::Payload(bytes)); + let _ = done_sender.send(Data::Done); Ok(()) }, @@ -557,16 +564,12 @@ impl FileManagerStore { create_entry is called. */ - let mut file = File::open(&metadata.path) + let file = File::open(&metadata.path) .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - let range = rel_pos.to_abs_range(metadata.size as usize); - let range_start = range.start as u64; - let seeked_start = file - .seek(SeekFrom::Start(range_start)) - .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - if seeked_start != range_start { - return Err(BlobURLStoreError::InvalidEntry); + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { + return Err(BlobURLStoreError::External("Unexpected method for blob".into())); } let filename = metadata @@ -582,22 +585,23 @@ impl FileManagerStore { filename, ); - let body = response.body.clone(); - let sender = sender.clone(); - thread::Builder::new() - .name("fetch file".to_owned()) - .spawn(move || chunked_fetch(sender, &mut file, body)) - .expect("Thread spawn failed"); + fetch_file_in_chunks(done_sender.clone(), + reader, + response.body.clone(), + cancellation_listener, + range); + Ok(()) }, FileImpl::Sliced(parent_id, inner_rel_pos) => { // Next time we don't need to check validity since // we have already done that for requesting URL if necessary. return self.fetch_blob_buf( - sender, + done_sender, + cancellation_listener, &parent_id, origin_in, - rel_pos.slice_inner(&inner_rel_pos), + range.slice_inner(&inner_rel_pos), false, response, ); @@ -725,9 +729,7 @@ fn select_files_pref_enabled() -> bool { .unwrap_or(false) } -const CHUNK_SIZE: usize = 8192; - -fn chunked_read( +fn read_file_in_chunks( sender: &IpcSender<FileManagerResult<ReadFileProgress>>, file: &mut File, size: usize, @@ -735,7 +737,7 @@ fn chunked_read( type_string: String, ) { // First chunk - let mut buf = vec![0; CHUNK_SIZE]; + let mut buf = vec![0; FILE_CHUNK_SIZE]; match file.read(&mut buf) { Ok(n) => { buf.truncate(n); @@ -755,7 +757,7 @@ fn chunked_read( // Send the remaining chunks loop { - let mut buf = vec![0; CHUNK_SIZE]; + let mut buf = vec![0; FILE_CHUNK_SIZE]; match file.read(&mut buf) { Ok(0) => { let _ = sender.send(Ok(ReadFileProgress::EOF)); @@ -773,27 +775,68 @@ fn chunked_read( } } -fn chunked_fetch( - sender: servo_channel::Sender<Data>, - file: &mut File, - response_body: ServoArc<Mutex<ResponseBody>>, +pub fn fetch_file_in_chunks( + done_sender: servo_channel::Sender<Data>, + mut reader: BufReader<File>, + res_body: ServoArc<Mutex<ResponseBody>>, + cancellation_listener: Arc<Mutex<CancellationListener>>, + range: RelativePos, ) { - loop { - let mut buf = vec![0; CHUNK_SIZE]; - match file.read(&mut buf) { - Ok(0) | Err(_) => { - *response_body.lock().unwrap() = ResponseBody::Done(vec![]); - let _ = sender.send(Data::Done); - return; - }, - Ok(n) => { - buf.truncate(n); - let mut bytes = vec![]; - bytes.extend_from_slice(&buf); - let _ = sender.send(Data::Payload(buf)); - }, - } - } + thread::Builder::new() + .name("fetch file worker thread".to_string()) + .spawn(move || { + loop { + if cancellation_listener.lock().unwrap().cancelled() { + *res_body.lock().unwrap() = ResponseBody::Done(vec![]); + let _ = done_sender.send(Data::Cancelled); + return; + } + let length = { + let buffer = reader.fill_buf().unwrap().to_vec(); + let mut buffer_len = buffer.len(); + if let ResponseBody::Receiving(ref mut body) = + *res_body.lock().unwrap() + { + let offset = usize::min( + { + if let Some(end) = range.end { + let remaining_bytes = + end as usize - range.start as usize - body.len(); + if remaining_bytes <= FILE_CHUNK_SIZE { + // This is the last chunk so we set buffer + // len to 0 to break the reading loop. + buffer_len = 0; + remaining_bytes + } else { + FILE_CHUNK_SIZE + } + } else { + FILE_CHUNK_SIZE + } + }, + buffer.len(), + ); + body.extend_from_slice(&buffer[0..offset]); + let _ = done_sender.send(Data::Payload(buffer)); + } + buffer_len + }; + if length == 0 { + let mut body = res_body.lock().unwrap(); + let completed_body = match *body { + ResponseBody::Receiving(ref mut body) => { + mem::replace(body, vec![]) + }, + _ => vec![], + }; + *body = ResponseBody::Done(completed_body); + let _ = done_sender.send(Data::Done); + break; + } + reader.consume(length); + } + }) + .expect("Failed to create fetch file worker thread"); } fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option<String>) { |