diff options
Diffstat (limited to 'components/net/image_cache_thread.rs')
-rw-r--r-- | components/net/image_cache_thread.rs | 399 |
1 files changed, 179 insertions, 220 deletions
diff --git a/components/net/image_cache_thread.rs b/components/net/image_cache_thread.rs index 51d8324bd48..5beacb8a1a4 100644 --- a/components/net/image_cache_thread.rs +++ b/components/net/image_cache_thread.rs @@ -5,12 +5,11 @@ use immeta::load_from_buf; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use ipc_channel::router::ROUTER; -use net_traits::{CoreResourceThread, NetworkError, fetch_async, FetchResponseMsg}; +use net_traits::{NetworkError, FetchResponseMsg}; use net_traits::image::base::{Image, ImageMetadata, PixelFormat, load_from_memory}; -use net_traits::image_cache_thread::{ImageCacheChan, ImageCacheCommand, ImageCacheThread, ImageState}; -use net_traits::image_cache_thread::{ImageCacheResult, ImageOrMetadataAvailable, ImageResponse, UsePlaceholder}; -use net_traits::image_cache_thread::ImageResponder; -use net_traits::request::{Destination, RequestInit, Type as RequestType}; +use net_traits::image_cache_thread::{ImageCacheCommand, ImageCacheThread, ImageState}; +use net_traits::image_cache_thread::{ImageOrMetadataAvailable, ImageResponse, UsePlaceholder}; +use net_traits::image_cache_thread::{ImageResponder, PendingImageId, CanRequestImages}; use servo_config::resource_files::resources_dir_path; use servo_url::ServoUrl; use std::borrow::ToOwned; @@ -58,20 +57,54 @@ fn is_image_opaque(format: webrender_traits::ImageFormat, bytes: &[u8]) -> bool struct PendingLoad { // The bytes loaded so far. Reset to an empty vector once loading // is complete and the buffer has been transmitted to the decoder. - bytes: Vec<u8>, + bytes: ImageBytes, // Image metadata, if available. metadata: Option<ImageMetadata>, // Once loading is complete, the result of the operation. result: Option<Result<(), NetworkError>>, - listeners: Vec<ImageListener>, + listeners: Vec<ImageResponder>, // The url being loaded. Do not forget that this may be several Mb // if we are loading a data: url. url: ServoUrl, } +enum ImageBytes { + InProgress(Vec<u8>), + Complete(Arc<Vec<u8>>), +} + +impl ImageBytes { + fn extend_from_slice(&mut self, data: &[u8]) { + match *self { + ImageBytes::InProgress(ref mut bytes) => bytes.extend_from_slice(data), + ImageBytes::Complete(_) => panic!("attempted modification of complete image bytes"), + } + } + + fn mark_complete(&mut self) -> Arc<Vec<u8>> { + let bytes = { + let own_bytes = match *self { + ImageBytes::InProgress(ref mut bytes) => bytes, + ImageBytes::Complete(_) => panic!("attempted modification of complete image bytes"), + }; + mem::replace(own_bytes, vec![]) + }; + let bytes = Arc::new(bytes); + *self = ImageBytes::Complete(bytes.clone()); + bytes + } + + fn as_slice(&self) -> &[u8] { + match *self { + ImageBytes::InProgress(ref bytes) => &bytes, + ImageBytes::Complete(ref bytes) => &*bytes, + } + } +} + enum LoadResult { Loaded(Image), PlaceholderLoaded(Arc<Image>), @@ -81,7 +114,7 @@ enum LoadResult { impl PendingLoad { fn new(url: ServoUrl) -> PendingLoad { PendingLoad { - bytes: vec!(), + bytes: ImageBytes::InProgress(vec!()), metadata: None, result: None, listeners: vec!(), @@ -89,7 +122,7 @@ impl PendingLoad { } } - fn add_listener(&mut self, listener: ImageListener) { + fn add_listener(&mut self, listener: ImageResponder) { self.listeners.push(listener); } } @@ -109,11 +142,12 @@ struct AllPendingLoads { keygen: LoadKeyGenerator, } -// Result of accessing a cache. -#[derive(Eq, PartialEq)] -enum CacheResult { - Hit, // The value was in the cache. - Miss, // The value was not in the cache and needed to be regenerated. +/// Result of accessing a cache. +enum CacheResult<'a> { + /// The value was in the cache. + Hit(LoadKey, &'a mut PendingLoad), + /// The value was not in the cache and needed to be regenerated. + Miss(Option<(LoadKey, &'a mut PendingLoad)>), } impl AllPendingLoads { @@ -131,20 +165,11 @@ impl AllPendingLoads { self.loads.is_empty() } - // get a PendingLoad from its LoadKey. Prefer this to `get_by_url`, - // for performance reasons. + // get a PendingLoad from its LoadKey. fn get_by_key_mut(&mut self, key: &LoadKey) -> Option<&mut PendingLoad> { self.loads.get_mut(key) } - // get a PendingLoad from its url. When possible, prefer `get_by_key_mut`. - fn get_by_url(&self, url: &ServoUrl) -> Option<&PendingLoad> { - self.url_to_load_key.get(url). - and_then(|load_key| - self.loads.get(load_key) - ) - } - fn remove(&mut self, key: &LoadKey) -> Option<PendingLoad> { self.loads.remove(key). and_then(|pending_load| { @@ -153,13 +178,18 @@ impl AllPendingLoads { }) } - fn get_cached(&mut self, url: ServoUrl) -> (CacheResult, LoadKey, &mut PendingLoad) { + fn get_cached<'a>(&'a mut self, url: ServoUrl, can_request: CanRequestImages) + -> CacheResult<'a> { match self.url_to_load_key.entry(url.clone()) { Occupied(url_entry) => { let load_key = url_entry.get(); - (CacheResult::Hit, *load_key, self.loads.get_mut(load_key).unwrap()) + CacheResult::Hit(*load_key, self.loads.get_mut(load_key).unwrap()) } Vacant(url_entry) => { + if can_request == CanRequestImages::No { + return CacheResult::Miss(None); + } + let load_key = self.keygen.next(); url_entry.insert(load_key); @@ -168,7 +198,7 @@ impl AllPendingLoads { Occupied(_) => unreachable!(), Vacant(load_entry) => { let mut_load = load_entry.insert(pending_load); - (CacheResult::Miss, load_key, mut_load) + CacheResult::Miss(Some((load_key, mut_load))) } } } @@ -182,27 +212,20 @@ impl AllPendingLoads { /// fetched again. struct CompletedLoad { image_response: ImageResponse, + id: PendingImageId, } impl CompletedLoad { - fn new(image_response: ImageResponse) -> CompletedLoad { + fn new(image_response: ImageResponse, id: PendingImageId) -> CompletedLoad { CompletedLoad { image_response: image_response, + id: id, } } } -/// Stores information to notify a client when the state -/// of an image changes. -struct ImageListener { - sender: ImageCacheChan, - responder: Option<ImageResponder>, - send_metadata_msg: bool, -} - // A key used to communicate during loading. -#[derive(Eq, Hash, PartialEq, Clone, Copy)] -struct LoadKey(u64); +type LoadKey = PendingImageId; struct LoadKeyGenerator { counter: u64 @@ -214,34 +237,9 @@ impl LoadKeyGenerator { counter: 0 } } - fn next(&mut self) -> LoadKey { + fn next(&mut self) -> PendingImageId { self.counter += 1; - LoadKey(self.counter) - } -} - -impl ImageListener { - fn new(sender: ImageCacheChan, responder: Option<ImageResponder>, send_metadata_msg: bool) -> ImageListener { - ImageListener { - sender: sender, - responder: responder, - send_metadata_msg: send_metadata_msg, - } - } - - fn notify(&self, image_response: ImageResponse) { - if !self.send_metadata_msg { - if let ImageResponse::MetadataLoaded(_) = image_response { - return; - } - } - - let ImageCacheChan(ref sender) = self.sender; - let msg = ImageCacheResult { - responder: self.responder.clone(), - image_response: image_response, - }; - sender.send(msg).ok(); + PendingImageId(self.counter) } } @@ -252,16 +250,11 @@ struct ResourceLoadInfo { /// Implementation of the image cache struct ImageCache { - progress_sender: Sender<ResourceLoadInfo>, - decoder_sender: Sender<DecoderMsg>, // Worker threads for decoding images. thread_pool: ThreadPool, - // Resource thread handle - core_resource_thread: CoreResourceThread, - // Images that are loading over network, or decoding. pending_loads: AllPendingLoads, @@ -284,7 +277,6 @@ struct DecoderMsg { struct Receivers { cmd_receiver: Receiver<ImageCacheCommand>, decoder_receiver: Receiver<DecoderMsg>, - progress_receiver: Receiver<ResourceLoadInfo>, } impl Receivers { @@ -292,16 +284,12 @@ impl Receivers { fn recv(&self) -> SelectResult { let cmd_receiver = &self.cmd_receiver; let decoder_receiver = &self.decoder_receiver; - let progress_receiver = &self.progress_receiver; select! { msg = cmd_receiver.recv() => { SelectResult::Command(msg.unwrap()) }, msg = decoder_receiver.recv() => { SelectResult::Decoder(msg.unwrap()) - }, - msg = progress_receiver.recv() => { - SelectResult::Progress(msg.unwrap()) } } } @@ -310,7 +298,6 @@ impl Receivers { /// The types of messages that the main image cache thread receives. enum SelectResult { Command(ImageCacheCommand), - Progress(ResourceLoadInfo), Decoder(DecoderMsg), } @@ -347,8 +334,7 @@ fn get_placeholder_image(webrender_api: &webrender_traits::RenderApi) -> io::Res } impl ImageCache { - fn run(core_resource_thread: CoreResourceThread, - webrender_api: webrender_traits::RenderApi, + fn run(webrender_api: webrender_traits::RenderApi, ipc_command_receiver: IpcReceiver<ImageCacheCommand>) { // Preload the placeholder image, used when images fail to load. let placeholder_image = get_placeholder_image(&webrender_api).ok(); @@ -356,15 +342,12 @@ impl ImageCache { // Ask the router to proxy messages received over IPC to us. let cmd_receiver = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_command_receiver); - let (progress_sender, progress_receiver) = channel(); let (decoder_sender, decoder_receiver) = channel(); let mut cache = ImageCache { - progress_sender: progress_sender, decoder_sender: decoder_sender, thread_pool: ThreadPool::new(4), pending_loads: AllPendingLoads::new(), completed_loads: HashMap::new(), - core_resource_thread: core_resource_thread, placeholder_image: placeholder_image, webrender_api: webrender_api, }; @@ -372,7 +355,6 @@ impl ImageCache { let receivers = Receivers { cmd_receiver: cmd_receiver, decoder_receiver: decoder_receiver, - progress_receiver: progress_receiver, }; let mut exit_sender: Option<IpcSender<()>> = None; @@ -382,9 +364,6 @@ impl ImageCache { SelectResult::Command(cmd) => { exit_sender = cache.handle_cmd(cmd); } - SelectResult::Progress(msg) => { - cache.handle_progress(msg); - } SelectResult::Decoder(msg) => { cache.handle_decoder(msg); } @@ -406,22 +385,22 @@ impl ImageCache { ImageCacheCommand::Exit(sender) => { return Some(sender); } - ImageCacheCommand::RequestImage(url, result_chan, responder) => { - self.request_image(url, result_chan, responder, false); - } - ImageCacheCommand::RequestImageAndMetadata(url, result_chan, responder) => { - self.request_image(url, result_chan, responder, true); + ImageCacheCommand::AddListener(id, responder) => { + self.add_listener(id, responder); } - ImageCacheCommand::GetImageIfAvailable(url, use_placeholder, consumer) => { - let result = self.get_image_if_available(url, use_placeholder); + ImageCacheCommand::GetImageOrMetadataIfAvailable(url, + use_placeholder, + can_request, + consumer) => { + let result = self.get_image_or_meta_if_available(url, use_placeholder, can_request); + // TODO(#15501): look for opportunities to clean up cache if this send fails. let _ = consumer.send(result); } - ImageCacheCommand::GetImageOrMetadataIfAvailable(url, use_placeholder, consumer) => { - let result = self.get_image_or_meta_if_available(url, use_placeholder); - let _ = consumer.send(result); - } - ImageCacheCommand::StoreDecodeImage(url, image_vector) => { - self.store_decode_image(url, image_vector); + ImageCacheCommand::StoreDecodeImage(id, data) => { + self.handle_progress(ResourceLoadInfo { + action: data, + key: id + }); } }; @@ -433,41 +412,41 @@ impl ImageCache { match (msg.action, msg.key) { (FetchResponseMsg::ProcessRequestBody, _) | (FetchResponseMsg::ProcessRequestEOF, _) => return, - (FetchResponseMsg::ProcessResponse(_), _) => {} + (FetchResponseMsg::ProcessResponse(_response), _) => {} (FetchResponseMsg::ProcessResponseChunk(data), _) => { + debug!("got some data for {:?}", msg.key); let pending_load = self.pending_loads.get_by_key_mut(&msg.key).unwrap(); pending_load.bytes.extend_from_slice(&data); //jmr0 TODO: possibly move to another task? if let None = pending_load.metadata { - if let Ok(metadata) = load_from_buf(&pending_load.bytes) { + if let Ok(metadata) = load_from_buf(&pending_load.bytes.as_slice()) { let dimensions = metadata.dimensions(); let img_metadata = ImageMetadata { width: dimensions.width, - height: dimensions.height }; - pending_load.metadata = Some(img_metadata.clone()); + height: dimensions.height }; for listener in &pending_load.listeners { - listener.notify(ImageResponse::MetadataLoaded(img_metadata.clone()).clone()); + listener.respond(ImageResponse::MetadataLoaded(img_metadata.clone())); } + pending_load.metadata = Some(img_metadata); } } } (FetchResponseMsg::ProcessResponseEOF(result), key) => { + debug!("received EOF for {:?}", key); match result { Ok(()) => { let pending_load = self.pending_loads.get_by_key_mut(&msg.key).unwrap(); pending_load.result = Some(result); - let bytes = mem::replace(&mut pending_load.bytes, vec!()); + let bytes = pending_load.bytes.mark_complete(); let sender = self.decoder_sender.clone(); + debug!("async decoding {} ({:?})", pending_load.url, key); self.thread_pool.execute(move || { - let image = load_from_memory(&bytes); - let msg = DecoderMsg { - key: key, - image: image - }; + let msg = decode_bytes_sync(key, &*bytes); sender.send(msg).unwrap(); }); } Err(_) => { + debug!("processing error for {:?}", key); match self.placeholder_image.clone() { Some(placeholder_image) => { self.complete_load(msg.key, LoadResult::PlaceholderLoaded( @@ -492,7 +471,10 @@ impl ImageCache { // Change state of a url from pending -> loaded. fn complete_load(&mut self, key: LoadKey, mut load_result: LoadResult) { - let pending_load = self.pending_loads.remove(&key).unwrap(); + let pending_load = match self.pending_loads.remove(&key) { + Some(load) => load, + None => return, + }; match load_result { LoadResult::Loaded(ref mut image) => { @@ -518,145 +500,122 @@ impl ImageCache { LoadResult::None => ImageResponse::None, }; - let completed_load = CompletedLoad::new(image_response.clone()); + let completed_load = CompletedLoad::new(image_response.clone(), key); self.completed_loads.insert(pending_load.url.into(), completed_load); for listener in pending_load.listeners { - listener.notify(image_response.clone()); + listener.respond(image_response.clone()); } } - // Request an image from the cache. If the image hasn't been - // loaded/decoded yet, it will be loaded/decoded in the - // background. If send_metadata_msg is set, the channel will be notified - // that image metadata is available, possibly before the image has finished - // loading. - fn request_image(&mut self, - url: ServoUrl, - result_chan: ImageCacheChan, - responder: Option<ImageResponder>, - send_metadata_msg: bool) { - let image_listener = ImageListener::new(result_chan, responder, send_metadata_msg); - - // Check if already completed - match self.completed_loads.get(&url) { - Some(completed_load) => { - // It's already completed, return a notify straight away - image_listener.notify(completed_load.image_response.clone()); - } - None => { - // Check if the load is already pending - let (cache_result, load_key, mut pending_load) = self.pending_loads.get_cached(url.clone()); - pending_load.add_listener(image_listener); - match cache_result { - CacheResult::Miss => { - // A new load request! Request the load from - // the resource thread. - // https://html.spec.whatwg.org/multipage/#update-the-image-data - // step 12. - // - // TODO(emilio): ServoUrl in more places please! - let request = RequestInit { - url: url.clone(), - type_: RequestType::Image, - destination: Destination::Image, - origin: url.clone(), - .. RequestInit::default() - }; - - let progress_sender = self.progress_sender.clone(); - fetch_async(request, &self.core_resource_thread, move |action| { - let action = match action { - FetchResponseMsg::ProcessRequestBody | - FetchResponseMsg::ProcessRequestEOF => return, - a => a - }; - progress_sender.send(ResourceLoadInfo { - action: action, - key: load_key, - }).unwrap(); - }); - } - CacheResult::Hit => { - // Request is already on its way. - } - } + /// Add a listener for a given image if it is still pending, or notify the + /// listener if the image is complete. + fn add_listener(&mut self, + id: PendingImageId, + listener: ImageResponder) { + if let Some(load) = self.pending_loads.get_by_key_mut(&id) { + if let Some(ref metadata) = load.metadata { + listener.respond(ImageResponse::MetadataLoaded(metadata.clone())); } + load.add_listener(listener); + return; } + if let Some(load) = self.completed_loads.values().find(|l| l.id == id) { + listener.respond(load.image_response.clone()); + return; + } + warn!("Couldn't find cached entry for listener {:?}", id); } - fn get_image_if_available(&mut self, - url: ServoUrl, - placeholder: UsePlaceholder, ) - -> Result<Arc<Image>, ImageState> { - let img_or_metadata = self.get_image_or_meta_if_available(url, placeholder); - match img_or_metadata { - Ok(ImageOrMetadataAvailable::ImageAvailable(image)) => Ok(image), - Ok(ImageOrMetadataAvailable::MetadataAvailable(_)) => Err(ImageState::Pending), - Err(err) => Err(err), - } + /// Return a completed image if it exists, or None if there is no complete load + /// or the complete load is not fully decoded or is unavailable. + fn get_completed_image_if_available(&self, + url: &ServoUrl, + placeholder: UsePlaceholder) + -> Option<Result<ImageOrMetadataAvailable, ImageState>> { + self.completed_loads.get(url).map(|completed_load| { + match (&completed_load.image_response, placeholder) { + (&ImageResponse::Loaded(ref image), _) | + (&ImageResponse::PlaceholderLoaded(ref image), UsePlaceholder::Yes) => { + Ok(ImageOrMetadataAvailable::ImageAvailable(image.clone())) + } + (&ImageResponse::PlaceholderLoaded(_), UsePlaceholder::No) | + (&ImageResponse::None, _) | + (&ImageResponse::MetadataLoaded(_), _) => { + Err(ImageState::LoadError) + } + } + }) } + /// Return any available metadata or image for the given URL, or an indication that + /// the image is not yet available if it is in progress, or else reserve a slot in + /// the cache for the URL if the consumer can request images. fn get_image_or_meta_if_available(&mut self, url: ServoUrl, - placeholder: UsePlaceholder) + placeholder: UsePlaceholder, + can_request: CanRequestImages) -> Result<ImageOrMetadataAvailable, ImageState> { - match self.completed_loads.get(&url) { - Some(completed_load) => { - match (completed_load.image_response.clone(), placeholder) { - (ImageResponse::Loaded(image), _) | - (ImageResponse::PlaceholderLoaded(image), UsePlaceholder::Yes) => { - Ok(ImageOrMetadataAvailable::ImageAvailable(image)) + if let Some(result) = self.get_completed_image_if_available(&url, placeholder) { + debug!("{} is available", url); + return result; + } + + let decoded = { + let result = self.pending_loads.get_cached(url.clone(), can_request); + match result { + CacheResult::Hit(key, pl) => match (&pl.result, &pl.metadata) { + (&Some(Ok(_)), _) => { + debug!("sync decoding {} ({:?})", url, key); + decode_bytes_sync(key, &pl.bytes.as_slice()) + } + (&None, &Some(ref meta)) => { + debug!("metadata available for {} ({:?})", url, key); + return Ok(ImageOrMetadataAvailable::MetadataAvailable(meta.clone())) } - (ImageResponse::PlaceholderLoaded(_), UsePlaceholder::No) | - (ImageResponse::None, _) | - (ImageResponse::MetadataLoaded(_), _) => { - Err(ImageState::LoadError) + (&Some(Err(_)), _) | (&None, &None) => { + debug!("{} ({:?}) is still pending", url, key); + return Err(ImageState::Pending(key)); } + }, + CacheResult::Miss(Some((key, _pl))) => { + debug!("should be requesting {} ({:?})", url, key); + return Err(ImageState::NotRequested(key)); + } + CacheResult::Miss(None) => { + debug!("couldn't find an entry for {}", url); + return Err(ImageState::LoadError); } } - None => { - let pl = match self.pending_loads.get_by_url(&url) { - Some(pl) => pl, - None => return Err(ImageState::NotRequested), - }; - - let meta = match pl.metadata { - Some(ref meta) => meta, - None => return Err(ImageState::Pending), - }; + }; - Ok(ImageOrMetadataAvailable::MetadataAvailable(meta.clone())) - } + // In the case where a decode is ongoing (or waiting in a queue) but we have the + // full response available, we decode the bytes synchronously and ignore the + // async decode when it finishes later. + // TODO: make this behaviour configurable according to the caller's needs. + self.handle_decoder(decoded); + match self.get_completed_image_if_available(&url, placeholder) { + Some(result) => result, + None => Err(ImageState::LoadError), } } - - fn store_decode_image(&mut self, - ref_url: ServoUrl, - loaded_bytes: Vec<u8>) { - let (cache_result, load_key, _) = self.pending_loads.get_cached(ref_url.clone()); - assert!(cache_result == CacheResult::Miss); - let action = FetchResponseMsg::ProcessResponseChunk(loaded_bytes); - let _ = self.progress_sender.send(ResourceLoadInfo { - action: action, - key: load_key, - }); - let action = FetchResponseMsg::ProcessResponseEOF(Ok(())); - let _ = self.progress_sender.send(ResourceLoadInfo { - action: action, - key: load_key, - }); - } } /// Create a new image cache. -pub fn new_image_cache_thread(core_resource_thread: CoreResourceThread, - webrender_api: webrender_traits::RenderApi) -> ImageCacheThread { +pub fn new_image_cache_thread(webrender_api: webrender_traits::RenderApi) -> ImageCacheThread { let (ipc_command_sender, ipc_command_receiver) = ipc::channel().unwrap(); thread::Builder::new().name("ImageCacheThread".to_owned()).spawn(move || { - ImageCache::run(core_resource_thread, webrender_api, ipc_command_receiver) + ImageCache::run(webrender_api, ipc_command_receiver) }).expect("Thread spawning failed"); ImageCacheThread::new(ipc_command_sender) } + +fn decode_bytes_sync(key: LoadKey, bytes: &[u8]) -> DecoderMsg { + let image = load_from_memory(bytes); + DecoderMsg { + key: key, + image: image + } +} |