diff options
author | Glenn Watson <gw@intuitionlibrary.com> | 2015-04-20 13:34:26 +1000 |
---|---|---|
committer | Glenn Watson <gw@intuitionlibrary.com> | 2015-04-23 09:40:24 +1000 |
commit | d8aef7208e5ed9705f551f0bf0e0cd6607ff224f (patch) | |
tree | 4a2954d38055abea3189f8afd8cf580f8d90c600 /components/net/image_cache_task.rs | |
parent | e278e5b9a27738bdca7a151b4295628e1f179e29 (diff) | |
download | servo-d8aef7208e5ed9705f551f0bf0e0cd6607ff224f.tar.gz servo-d8aef7208e5ed9705f551f0bf0e0cd6607ff224f.zip |
Refactored image cache task - details below.
* Simpler image cache API for clients to use.
* Significantly fewer threads.
* One thread for image cache task (multiplexes commands, decoder threads and async resource requests).
* 4 threads for decoder worker tasks.
* Removed ReflowEvent hacks in script and layout tasks.
* Image elements pass a Trusted<T> to image cache, which is used to dirty nodes via script task. Previous use of Untrusted addresses was unsafe.
* Image requests such as background-image on layout / paint threads trigger repaint only rather than full reflow.
* Add reflow batching for when multiple images load quickly.
* Reduces the number of paints loading wikipedia from ~95 to ~35.
* Reasonably simple to add proper prefetch support in a follow up PR.
* Async loaded images always construct Image fragments now, instead of generic.
* Image fragments support the image not being present.
* Simpler implementation of synchronous image loading for reftests.
* Removed image holder.
* image.onload support.
* image NaturalWidth and NaturalHeight support.
* Updated WPT expectations.
Diffstat (limited to 'components/net/image_cache_task.rs')
-rw-r--r-- | components/net/image_cache_task.rs | 613 |
1 files changed, 283 insertions, 330 deletions
diff --git a/components/net/image_cache_task.rs b/components/net/image_cache_task.rs index b231f69ec39..54949d53b54 100644 --- a/components/net/image_cache_task.rs +++ b/components/net/image_cache_task.rs @@ -2,408 +2,361 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::ResourceTask; +use collections::borrow::ToOwned; use net_traits::image::base::{Image, load_from_memory}; -use net_traits::image_cache_task::{ImageResponseMsg, ImageCacheTask, Msg}; -use net_traits::image_cache_task::{load_image_data}; -use profile::time::{self, profile}; -use url::Url; - -use std::borrow::ToOwned; +use net_traits::image_cache_task::{ImageState, ImageCacheTask, ImageCacheChan, ImageCacheCommand, ImageCacheResult}; +use net_traits::load_whole_resource; use std::collections::HashMap; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use std::mem::replace; -use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::mem; +use std::sync::Arc; +use std::sync::mpsc::{channel, Sender, Receiver, Select}; use util::resource_files::resources_dir_path; use util::task::spawn_named; use util::taskpool::TaskPool; +use url::Url; +use net_traits::{AsyncResponseTarget, ControlMsg, LoadData, ResponseAction, ResourceTask, LoadConsumer}; +use net_traits::image_cache_task::ImageResponder; + +/// +/// TODO(gw): Remaining work on image cache: +/// * Make use of the prefetch support in various parts of the code. +/// * Experiment with changing the image 'key' from being a url to a resource id +/// This might be a decent performance win and/or memory saving in some cases (esp. data urls) +/// * Profile time in GetImageIfAvailable - might be worth caching these results per paint / layout task. +/// + +/// Represents an image that is either being loaded +/// by the resource task, or decoded by a worker thread. +struct PendingLoad { + bytes: Vec<u8>, + result: Option<Result<(), String>>, + listeners: Vec<ImageListener>, +} + +impl PendingLoad { + fn new() -> PendingLoad { + PendingLoad { + bytes: vec!(), + result: None, + listeners: vec!(), + } + } -pub trait ImageCacheTaskFactory { - fn new(resource_task: ResourceTask, task_pool: TaskPool, - time_profiler_chan: time::ProfilerChan, - load_placeholder: LoadPlaceholder) -> Self; - fn new_sync(resource_task: ResourceTask, task_pool: TaskPool, - time_profiler_chan: time::ProfilerChan, - load_placeholder: LoadPlaceholder) -> Self; + fn add_listener(&mut self, listener: ImageListener) { + self.listeners.push(listener); + } } -impl ImageCacheTaskFactory for ImageCacheTask { - fn new(resource_task: ResourceTask, task_pool: TaskPool, - time_profiler_chan: time::ProfilerChan, - load_placeholder: LoadPlaceholder) -> ImageCacheTask { - let (chan, port) = channel(); - let chan_clone = chan.clone(); - - spawn_named("ImageCacheTask".to_owned(), move || { - let mut cache = ImageCache { - resource_task: resource_task, - port: port, - chan: chan_clone, - state_map: HashMap::new(), - wait_map: HashMap::new(), - need_exit: None, - task_pool: task_pool, - time_profiler_chan: time_profiler_chan, - placeholder_data: Arc::new(vec!()), - }; - cache.run(load_placeholder); - }); +/// Represents an image that has completed loading. +/// Images that fail to load (due to network or decode +/// failure) are still stored here, so that they aren't +/// fetched again. +struct CompletedLoad { + image: Option<Arc<Image>>, +} - ImageCacheTask { - chan: chan, +impl CompletedLoad { + fn new(image: Option<Arc<Image>>) -> CompletedLoad { + CompletedLoad { + image: image, } } +} + +/// Stores information to notify a client when the state +/// of an image changes. +struct ImageListener { + sender: ImageCacheChan, + responder: Option<Box<ImageResponder>>, +} - fn new_sync(resource_task: ResourceTask, task_pool: TaskPool, - time_profiler_chan: time::ProfilerChan, - load_placeholder: LoadPlaceholder) -> ImageCacheTask { - let (chan, port) = channel(); +impl ImageListener { + fn new(sender: ImageCacheChan, responder: Option<Box<ImageResponder>>) -> ImageListener { + ImageListener { + sender: sender, + responder: responder, + } + } - spawn_named("ImageCacheTask (sync)".to_owned(), move || { - let inner_cache: ImageCacheTask = ImageCacheTaskFactory::new(resource_task, task_pool, - time_profiler_chan, load_placeholder); + fn notify(self, image: Option<Arc<Image>>) { + let ImageCacheChan(ref sender) = self.sender; + let msg = ImageCacheResult { + responder: self.responder, + image: image, + }; + sender.send(msg).ok(); + } +} - loop { - let msg: Msg = port.recv().unwrap(); +struct ResourceLoadInfo { + action: ResponseAction, + url: Url, +} - match msg { - Msg::GetImage(url, response) => { - inner_cache.send(Msg::WaitForImage(url, response)); - } - Msg::Exit(response) => { - inner_cache.send(Msg::Exit(response)); - break; - } - msg => inner_cache.send(msg) - } - } - }); +struct ResourceListener { + url: Url, + sender: Sender<ResourceLoadInfo>, +} - ImageCacheTask { - chan: chan, - } +impl AsyncResponseTarget for ResourceListener { + fn invoke_with_listener(&self, action: ResponseAction) { + self.sender.send(ResourceLoadInfo { + action: action, + url: self.url.clone(), + }).unwrap(); } } +/// Implementation of the image cache struct ImageCache { - /// A handle to the resource task for fetching the image binaries - resource_task: ResourceTask, - /// The port on which we'll receive client requests - port: Receiver<Msg>, - /// A copy of the shared chan to give to child tasks - chan: Sender<Msg>, - /// The state of processing an image for a URL - state_map: HashMap<Url, ImageState>, - /// List of clients waiting on a WaitForImage response - wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>, - need_exit: Option<Sender<()>>, + // Receive commands from clients + cmd_receiver: Receiver<ImageCacheCommand>, + + // Receive notifications from the resource task + progress_receiver: Receiver<ResourceLoadInfo>, + progress_sender: Sender<ResourceLoadInfo>, + + // Receive notifications from the decoder thread pool + decoder_receiver: Receiver<DecoderMsg>, + decoder_sender: Sender<DecoderMsg>, + + // Worker threads for decoding images. task_pool: TaskPool, - time_profiler_chan: time::ProfilerChan, - // Default image used when loading fails. - placeholder_data: Arc<Vec<u8>>, -} -#[derive(Clone)] -enum ImageState { - Init, - Prefetching(AfterPrefetch), - Prefetched(Vec<u8>), - Decoding, - Decoded(Arc<Image>), - Failed + // Resource task handle + resource_task: ResourceTask, + + // Images that are loading over network, or decoding. + pending_loads: HashMap<Url, PendingLoad>, + + // Images that have finished loading (successful or not) + completed_loads: HashMap<Url, CompletedLoad>, + + // The placeholder image used when an image fails to load + placeholder_image: Option<Arc<Image>>, } -#[derive(Clone)] -enum AfterPrefetch { - DoDecode, - DoNotDecode +/// Message that the decoder worker threads send to main image cache task. +struct DecoderMsg { + url: Url, + image: Option<Image>, } -pub enum LoadPlaceholder { - Preload, - Ignore +/// The types of messages that the main image cache task receives. +enum SelectResult { + Command(ImageCacheCommand), + Progress(ResourceLoadInfo), + Decoder(DecoderMsg), } impl ImageCache { - // Used to preload the default placeholder. - fn init(&mut self) { - let mut placeholder_url = resources_dir_path(); - placeholder_url.push("rippy.jpg"); - let image = load_image_data(Url::from_file_path(&*placeholder_url).unwrap(), self.resource_task.clone(), &self.placeholder_data); + fn run(&mut self) { + let mut exit_sender: Option<Sender<()>> = None; - match image { - Err(..) => debug!("image_cache_task: failed loading the placeholder."), - Ok(image_data) => self.placeholder_data = Arc::new(image_data), - } - } - - pub fn run(&mut self, load_placeholder: LoadPlaceholder) { - // We have to load the placeholder before running. - match load_placeholder { - LoadPlaceholder::Preload => self.init(), - LoadPlaceholder::Ignore => debug!("image_cache_task: use old behavior."), - } + loop { + let result = { + let sel = Select::new(); - let mut store_chan: Option<Sender<()>> = None; - let mut store_prefetched_chan: Option<Sender<()>> = None; + let mut cmd_handle = sel.handle(&self.cmd_receiver); + let mut progress_handle = sel.handle(&self.progress_receiver); + let mut decoder_handle = sel.handle(&self.decoder_receiver); - loop { - let msg = self.port.recv().unwrap(); + unsafe { + cmd_handle.add(); + progress_handle.add(); + decoder_handle.add(); + } - match msg { - Msg::Prefetch(url) => self.prefetch(url), - Msg::StorePrefetchedImageData(url, data) => { - store_prefetched_chan.map(|chan| { - chan.send(()).unwrap(); - }); - store_prefetched_chan = None; + let ret = sel.wait(); - self.store_prefetched_image_data(url, data); + if ret == cmd_handle.id() { + SelectResult::Command(self.cmd_receiver.recv().unwrap()) + } else if ret == decoder_handle.id() { + SelectResult::Decoder(self.decoder_receiver.recv().unwrap()) + } else { + SelectResult::Progress(self.progress_receiver.recv().unwrap()) } - Msg::Decode(url) => self.decode(url), - Msg::StoreImage(url, image) => { - store_chan.map(|chan| { - chan.send(()).unwrap(); - }); - store_chan = None; - - self.store_image(url, image) + }; + + match result { + SelectResult::Command(cmd) => { + exit_sender = self.handle_cmd(cmd); } - Msg::GetImage(url, response) => self.get_image(url, response), - Msg::WaitForImage(url, response) => { - self.wait_for_image(url, response) + SelectResult::Progress(msg) => { + self.handle_progress(msg); } - Msg::WaitForStore(chan) => store_chan = Some(chan), - Msg::WaitForStorePrefetched(chan) => store_prefetched_chan = Some(chan), - Msg::Exit(response) => { - assert!(self.need_exit.is_none()); - self.need_exit = Some(response); + SelectResult::Decoder(msg) => { + self.handle_decoder(msg); } } - let need_exit = replace(&mut self.need_exit, None); - - match need_exit { - Some(response) => { - // Wait until we have no outstanding requests and subtasks - // before exiting - let mut can_exit = true; - for (_, state) in self.state_map.iter() { - match *state { - ImageState::Prefetching(..) => can_exit = false, - ImageState::Decoding => can_exit = false, - - ImageState::Init | ImageState::Prefetched(..) | - ImageState::Decoded(..) | ImageState::Failed => () - } - } - - if can_exit { - response.send(()).unwrap(); + // Can only exit when all pending loads are complete. + if let Some(ref exit_sender) = exit_sender { + if self.pending_loads.len() == 0 { + exit_sender.send(()).unwrap(); break; - } else { - self.need_exit = Some(response); } - } - None => () } } } - fn get_state(&self, url: &Url) -> ImageState { - match self.state_map.get(url) { - Some(state) => state.clone(), - None => ImageState::Init - } - } - - fn set_state(&mut self, url: Url, state: ImageState) { - self.state_map.insert(url, state); - } - - fn prefetch(&mut self, url: Url) { - match self.get_state(&url) { - ImageState::Init => { - let to_cache = self.chan.clone(); - let resource_task = self.resource_task.clone(); - let url_clone = url.clone(); - let placeholder = self.placeholder_data.clone(); - spawn_named("ImageCacheTask (prefetch)".to_owned(), move || { - let url = url_clone; - debug!("image_cache_task: started fetch for {}", url.serialize()); - - let image = load_image_data(url.clone(), resource_task.clone(), &placeholder); - to_cache.send(Msg::StorePrefetchedImageData(url.clone(), image)).unwrap(); - debug!("image_cache_task: ended fetch for {}", url.serialize()); - }); - - self.set_state(url, ImageState::Prefetching(AfterPrefetch::DoNotDecode)); + // Handle a request from a client + fn handle_cmd(&mut self, cmd: ImageCacheCommand) -> Option<Sender<()>> { + match cmd { + ImageCacheCommand::Exit(sender) => { + return Some(sender); } - - ImageState::Prefetching(..) | ImageState::Prefetched(..) | - ImageState::Decoding | ImageState::Decoded(..) | ImageState::Failed => { - // We've already begun working on this image + ImageCacheCommand::RequestImage(url, result_chan, responder) => { + self.request_image(url, result_chan, responder); } - } - } - - fn store_prefetched_image_data(&mut self, url: Url, data: Result<Vec<u8>, ()>) { - match self.get_state(&url) { - ImageState::Prefetching(next_step) => { - match data { - Ok(data) => { - self.set_state(url.clone(), ImageState::Prefetched(data)); - match next_step { - AfterPrefetch::DoDecode => self.decode(url), - _ => () - } - } - Err(..) => { - self.set_state(url.clone(), ImageState::Failed); - self.purge_waiters(url, || ImageResponseMsg::ImageFailed); - } - } - } - - ImageState::Init - | ImageState::Prefetched(..) - | ImageState::Decoding - | ImageState::Decoded(..) - | ImageState::Failed => { - panic!("wrong state for storing prefetched image") - } - } - } - - fn decode(&mut self, url: Url) { - match self.get_state(&url) { - ImageState::Init => panic!("decoding image before prefetch"), - - ImageState::Prefetching(AfterPrefetch::DoNotDecode) => { - // We don't have the data yet, queue up the decode - self.set_state(url, ImageState::Prefetching(AfterPrefetch::DoDecode)) + ImageCacheCommand::GetImageIfAvailable(url, consumer) => { + let result = match self.completed_loads.get(&url) { + Some(completed_load) => { + completed_load.image.clone().ok_or(ImageState::LoadError) + } + None => { + let pending_load = self.pending_loads.get(&url); + Err(pending_load.map_or(ImageState::NotRequested, |_| ImageState::Pending)) + } + }; + consumer.send(result).unwrap(); } + }; - ImageState::Prefetching(AfterPrefetch::DoDecode) => { - // We don't have the data yet, but the decode request is queued up - } + None + } - ImageState::Prefetched(data) => { - let to_cache = self.chan.clone(); - let url_clone = url.clone(); - let time_profiler_chan = self.time_profiler_chan.clone(); - - self.task_pool.execute(move || { - let url = url_clone; - debug!("image_cache_task: started image decode for {}", url.serialize()); - let image = profile(time::ProfilerCategory::ImageDecoding, - None, time_profiler_chan, || { - load_from_memory(&data) - }); - - let image = image.map(Arc::new); - to_cache.send(Msg::StoreImage(url.clone(), image)).unwrap(); - debug!("image_cache_task: ended image decode for {}", url.serialize()); - }); - - self.set_state(url, ImageState::Decoding); + // Handle progress messages from the resource task + fn handle_progress(&mut self, msg: ResourceLoadInfo) { + match msg.action { + ResponseAction::HeadersAvailable(_) => {} + ResponseAction::DataAvailable(data) => { + let pending_load = self.pending_loads.get_mut(&msg.url).unwrap(); + pending_load.bytes.push_all(data.as_slice()); } - - ImageState::Decoding | ImageState::Decoded(..) | ImageState::Failed => { - // We've already begun decoding + ResponseAction::ResponseComplete(result) => { + match result { + Ok(()) => { + let pending_load = self.pending_loads.get_mut(&msg.url).unwrap(); + pending_load.result = Some(result); + + let bytes = mem::replace(&mut pending_load.bytes, vec!()); + let url = msg.url.clone(); + let sender = self.decoder_sender.clone(); + + self.task_pool.execute(move || { + let image = load_from_memory(bytes.as_slice()); + let msg = DecoderMsg { + url: url, + image: image + }; + sender.send(msg).unwrap(); + }); + } + Err(_) => { + let placeholder_image = self.placeholder_image.clone(); + self.complete_load(msg.url, placeholder_image); + } + } } } } - fn store_image(&mut self, url: Url, image: Option<Arc<Image>>) { - - match self.get_state(&url) { - ImageState::Decoding => { - match image { - Some(image) => { - self.set_state(url.clone(), ImageState::Decoded(image.clone())); - self.purge_waiters(url, || ImageResponseMsg::ImageReady(image.clone()) ); - } - None => { - self.set_state(url.clone(), ImageState::Failed); - self.purge_waiters(url, || ImageResponseMsg::ImageFailed ); - } - } - } - - ImageState::Init - | ImageState::Prefetching(..) - | ImageState::Prefetched(..) - | ImageState::Decoded(..) - | ImageState::Failed => { - panic!("incorrect state in store_image") - } - } + // Handle a message from one of the decoder worker threads + fn handle_decoder(&mut self, msg: DecoderMsg) { + let image = msg.image.map(Arc::new); + self.complete_load(msg.url, image); } - fn purge_waiters<F>(&mut self, url: Url, f: F) where F: Fn() -> ImageResponseMsg { - match self.wait_map.remove(&url) { - Some(waiters) => { - let items = waiters.lock().unwrap(); - for response in items.iter() { - response.send(f()).unwrap(); - } - } - None => () - } - } + // Change state of a url from pending -> loaded. + fn complete_load(&mut self, url: Url, image: Option<Arc<Image>>) { + let pending_load = self.pending_loads.remove(&url).unwrap(); + + let completed_load = CompletedLoad::new(image.clone()); + self.completed_loads.insert(url, completed_load); - fn get_image(&self, url: Url, response: Sender<ImageResponseMsg>) { - match self.get_state(&url) { - ImageState::Init => panic!("request for image before prefetch"), - ImageState::Prefetching(AfterPrefetch::DoDecode) => response.send(ImageResponseMsg::ImageNotReady).unwrap(), - ImageState::Prefetching(AfterPrefetch::DoNotDecode) | ImageState::Prefetched(..) => panic!("request for image before decode"), - ImageState::Decoding => response.send(ImageResponseMsg::ImageNotReady).unwrap(), - ImageState::Decoded(image) => response.send(ImageResponseMsg::ImageReady(image)).unwrap(), - ImageState::Failed => response.send(ImageResponseMsg::ImageFailed).unwrap(), + for listener in pending_load.listeners.into_iter() { + listener.notify(image.clone()); } } - fn wait_for_image(&mut self, url: Url, response: Sender<ImageResponseMsg>) { - match self.get_state(&url) { - ImageState::Init => panic!("request for image before prefetch"), - - ImageState::Prefetching(AfterPrefetch::DoNotDecode) | ImageState::Prefetched(..) => panic!("request for image before decode"), + // Request an image from the cache + fn request_image(&mut self, url: Url, result_chan: ImageCacheChan, responder: Option<Box<ImageResponder>>) { + let image_listener = ImageListener::new(result_chan, responder); - ImageState::Prefetching(AfterPrefetch::DoDecode) | ImageState::Decoding => { - // We don't have this image yet - match self.wait_map.entry(url) { - Occupied(mut entry) => { - entry.get_mut().lock().unwrap().push(response); + // Check if already completed + match self.completed_loads.get(&url) { + Some(completed_load) => { + // It's already completed, return a notify straight away + image_listener.notify(completed_load.image.clone()); + } + None => { + // Check if the load is already pending + match self.pending_loads.entry(url.clone()) { + Occupied(mut e) => { + // It's pending, so add the listener for state changes + let pending_load = e.get_mut(); + pending_load.add_listener(image_listener); } - Vacant(entry) => { - entry.insert(Arc::new(Mutex::new(vec!(response)))); + Vacant(e) => { + // A new load request! Add the pending load and request + // it from the resource task. + let mut pending_load = PendingLoad::new(); + pending_load.add_listener(image_listener); + e.insert(pending_load); + + let load_data = LoadData::new(url.clone()); + let listener = box ResourceListener { + url: url, + sender: self.progress_sender.clone(), + }; + self.resource_task.send(ControlMsg::Load(load_data, LoadConsumer::Listener(listener))).unwrap(); } } } - - ImageState::Decoded(image) => { - response.send(ImageResponseMsg::ImageReady(image)).unwrap(); - } - - ImageState::Failed => { - response.send(ImageResponseMsg::ImageFailed).unwrap(); - } } } } -pub fn spawn_listener<F, A>(f: F) -> Sender<A> - where F: FnOnce(Receiver<A>) + Send + 'static, - A: Send + 'static -{ - let (setup_chan, setup_port) = channel(); +/// Create a new image cache. +pub fn new_image_cache_task(resource_task: ResourceTask) -> ImageCacheTask { + let (cmd_sender, cmd_receiver) = channel(); + let (progress_sender, progress_receiver) = channel(); + let (decoder_sender, decoder_receiver) = channel(); + + spawn_named("ImageCacheThread".to_owned(), move || { - spawn_named("ImageCacheTask (listener)".to_owned(), move || { - let (chan, port) = channel(); - setup_chan.send(chan).unwrap(); - f(port); + // Preload the placeholder image, used when images fail to load. + let mut placeholder_url = resources_dir_path(); + // TODO (Savago): replace for a prettier one. + placeholder_url.push("rippy.jpg"); + let url = Url::from_file_path(&*placeholder_url).unwrap(); + let placeholder_image = match load_whole_resource(&resource_task, url) { + Err(..) => { + debug!("image_cache_task: failed loading the placeholder."); + None + } + Ok((_, image_data)) => { + Some(Arc::new(load_from_memory(&image_data).unwrap())) + } + }; + + let mut cache = ImageCache { + cmd_receiver: cmd_receiver, + progress_sender: progress_sender, + progress_receiver: progress_receiver, + decoder_sender: decoder_sender, + decoder_receiver: decoder_receiver, + task_pool: TaskPool::new(4), + pending_loads: HashMap::new(), + completed_loads: HashMap::new(), + resource_task: resource_task, + placeholder_image: placeholder_image, + }; + + cache.run(); }); - setup_port.recv().unwrap() + + ImageCacheTask::new(cmd_sender) } |