| author | bors-servo <lbergstrom+bors@mozilla.com> | 2019-10-08 07:56:55 -0400 |
| --- | --- | --- |
| committer | GitHub <noreply@github.com> | 2019-10-08 07:56:55 -0400 |
| commit | c1401f53b28916d69d0805babf884d5599f1891b (patch) | |
| tree | 8b05e3fd043cadb77bf2e8638b84f54c0bbc0a53 | |
| parent | 7371c8689d83edb11aa62dcdf3f7c93bde6e930a (diff) | |
| parent | 4f3ba70704b8b160780af2e1b84d8a93a350af81 (diff) | |
Auto merge of #24318 - gterzian:http-cache-condvar, r=asajeffrey
Http cache: wait on pending stores
Fixes #24166
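The gist of the change: each cache entry gets a small state machine (`ReadyToConstruct` / `PendingStore(n)`) guarded by a per-key `Mutex` and `Condvar`, so a fetch that is about to read the cache first waits, briefly, for any concurrent fetch that has promised to store a response under the same key. A minimal standalone sketch of that synchronization, using simplified stand-ins (`EntryState`, `EntryLock`, string keys) rather than Servo's actual `HttpCacheEntryState`, `CacheKey`, and `HttpState` types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Simplified stand-in for the patch's `HttpCacheEntryState`.
#[derive(Clone, Debug, Eq, PartialEq)]
enum EntryState {
    ReadyToConstruct,
    PendingStore(usize),
}

type EntryLock = Arc<(Mutex<EntryState>, Condvar)>;
type StateMap = Mutex<HashMap<String, EntryLock>>;

/// Get (or create) the per-entry lock/condvar pair for a cache key.
fn entry_lock(map: &StateMap, key: &str) -> EntryLock {
    map.lock()
        .unwrap()
        .entry(key.to_owned())
        .or_insert_with(|| Arc::new((Mutex::new(EntryState::ReadyToConstruct), Condvar::new())))
        .clone()
}

/// Before reading the cache, wait until no store is pending for `key`,
/// or until a timeout elapses (the shape of the wait loop added to
/// `http_network_or_cache_fetch`).
fn wait_for_pending_stores(map: &StateMap, key: &str) {
    let lock = entry_lock(map, key);
    let (mutex, cvar) = &*lock;
    let mut state = mutex.lock().unwrap();
    while let EntryState::PendingStore(_) = *state {
        let (next, timeout) = cvar
            .wait_timeout(state, Duration::from_millis(500))
            .unwrap();
        state = next;
        if timeout.timed_out() {
            // Give up on a stalled store and fall through to a network fetch.
            break;
        }
    }
}

/// After finishing (or abandoning) a store for `key`, decrement the pending
/// count and wake the next waiter (the shape of `update_http_cache_state`).
fn finish_pending_store(map: &StateMap, key: &str) {
    let lock = entry_lock(map, key);
    let (mutex, cvar) = &*lock;
    let mut state = mutex.lock().unwrap();
    if let EntryState::PendingStore(n) = *state {
        if n <= 1 {
            *state = EntryState::ReadyToConstruct;
            cvar.notify_one();
        } else {
            *state = EntryState::PendingStore(n - 1);
        }
    }
}
```

The 500 ms `wait_timeout` mirrors the patch: a waiter gives up on a stalled store and falls through to a network fetch rather than blocking indefinitely.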
---
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #___ (GitHub issue number if applicable)
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___
-rw-r--r-- | components/net/fetch/methods.rs    |   5
-rw-r--r-- | components/net/http_cache.rs       | 146
-rw-r--r-- | components/net/http_loader.rs      | 179
-rw-r--r-- | components/net/resource_thread.rs  |   1
4 files changed, 239 insertions, 92 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs
index 503e93d3687..b60265ce2b8 100644
--- a/components/net/fetch/methods.rs
+++ b/components/net/fetch/methods.rs
@@ -41,6 +41,7 @@ lazy_static! {
 
 pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send);
 
+#[derive(Clone)]
 pub enum Data {
     Payload(Vec<u8>),
     Done,
@@ -456,7 +457,7 @@ pub fn main_fetch(
     // Step 24.
     target.process_response_eof(&response);
 
-    if let Ok(mut http_cache) = context.state.http_cache.write() {
+    if let Ok(http_cache) = context.state.http_cache.write() {
         http_cache.update_awaiting_consumers(&request, &response);
     }
 
@@ -478,7 +479,7 @@ fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut Do
             },
             Data::Done => break,
             Data::Cancelled => {
-                response.aborted.store(true, Ordering::Relaxed);
+                response.aborted.store(true, Ordering::Release);
                 break;
             },
         }
diff --git a/components/net/http_cache.rs b/components/net/http_cache.rs
index 43f82824b1d..e47b4e81951 100644
--- a/components/net/http_cache.rs
+++ b/components/net/http_cache.rs
@@ -38,7 +38,8 @@ pub struct CacheKey {
 }
 
 impl CacheKey {
-    fn new(request: &Request) -> CacheKey {
+    /// Create a cache-key from a request.
+    pub(crate) fn new(request: &Request) -> CacheKey {
         CacheKey {
             url: request.current_url(),
         }
@@ -127,7 +128,15 @@ pub struct HttpCache {
     entries: HashMap<CacheKey, Vec<CachedResource>>,
 }
 
-/// Determine if a given response is cacheable based on the initial metadata received.
+/// Determine if a response is cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
+fn is_cacheable_by_default(status_code: u16) -> bool {
+    match status_code {
+        200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => true,
+        _ => false,
+    }
+}
+
+/// Determine if a given response is cacheable.
 /// Based on <https://tools.ietf.org/html/rfc7234#section-3>
 fn response_is_cacheable(metadata: &Metadata) -> bool {
     // TODO: if we determine that this cache should be considered shared:
@@ -239,19 +248,16 @@ fn get_response_expiry(response: &Response) -> Duration {
         } else {
             max_heuristic
         };
-        match *code {
-            200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => {
-                // Status codes that are cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
-                return heuristic_freshness;
-            },
-            _ => {
-                // Other status codes can only use heuristic freshness if the public cache directive is present.
-                if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
-                    if directives.public() {
-                        return heuristic_freshness;
-                    }
+        if is_cacheable_by_default(*code) {
+            // Status codes that are cacheable by default can use heuristics to determine freshness.
+            return heuristic_freshness;
+        } else {
+            // Other status codes can only use heuristic freshness if the public cache directive is present.
+            if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
+                if directives.public() {
+                    return heuristic_freshness;
                 }
-            },
+            }
         }
     }
     // Requires validation upon first use as default.
@@ -296,8 +302,11 @@ fn create_cached_response(
     cached_resource: &CachedResource,
     cached_headers: &HeaderMap,
     done_chan: &mut DoneChannel,
-) -> CachedResponse {
+) -> Option<CachedResponse> {
     debug!("creating a cached response for {:?}", request.url());
+    if cached_resource.aborted.load(Ordering::Acquire) {
+        return None;
+    }
     let resource_timing = ResourceFetchTiming::new(request.timing_type());
     let mut response = Response::new(
         cached_resource.data.metadata.data.final_url.clone(),
@@ -333,10 +342,11 @@ fn create_cached_response(
     // <https://tools.ietf.org/html/rfc7234#section-5.2.2.7>
     let has_expired = (adjusted_expires < time_since_validated) ||
         (adjusted_expires == time_since_validated);
-    CachedResponse {
+    let cached_response = CachedResponse {
         response: response,
         needs_validation: has_expired,
-    }
+    };
+    Some(cached_response)
 }
 
 /// Create a new resource, based on the bytes requested, and an existing resource,
@@ -366,7 +376,7 @@ fn create_resource_with_bytes_from_resource(
 /// Support for range requests <https://tools.ietf.org/html/rfc7233>.
 fn handle_range_request(
     request: &Request,
-    candidates: Vec<&CachedResource>,
+    candidates: &[&CachedResource],
     range_spec: Vec<(Bound<u64>, Bound<u64>)>,
     done_chan: &mut DoneChannel,
 ) -> Option<CachedResponse> {
@@ -411,7 +421,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -444,7 +456,9 @@ fn handle_range_request(
                         create_resource_with_bytes_from_resource(&bytes, partial_resource);
                     let cached_response =
                         create_cached_response(request, &new_resource, &*headers, done_chan);
-                    return Some(cached_response);
+                    if let Some(cached_response) = cached_response {
+                        return Some(cached_response);
+                    }
                 }
             }
         }
@@ -459,7 +473,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -493,7 +509,9 @@ fn handle_range_request(
                         create_resource_with_bytes_from_resource(&bytes, partial_resource);
                     let cached_response =
                         create_cached_response(request, &new_resource, &*headers, done_chan);
-                    return Some(cached_response);
+                    if let Some(cached_response) = cached_response {
+                        return Some(cached_response);
+                    }
                 }
            }
         }
@@ -508,7 +526,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -546,7 +566,9 @@ fn handle_range_request(
                         create_resource_with_bytes_from_resource(&bytes, partial_resource);
                     let cached_response =
                         create_cached_response(request, &new_resource, &*headers, done_chan);
-                    return Some(cached_response);
+                    if let Some(cached_response) = cached_response {
+                        return Some(cached_response);
+                    }
                 }
             }
         }
@@ -638,7 +660,7 @@ impl HttpCache {
         if let Some(range_spec) = request.headers.typed_get::<Range>() {
             return handle_range_request(
                 request,
-                candidates,
+                candidates.as_slice(),
                 range_spec.iter().collect(),
                 done_chan,
             );
@@ -669,7 +691,9 @@ impl HttpCache {
                 let cached_headers = cached_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, cached_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
         debug!("couldn't find an appropriate response, not caching");
@@ -677,35 +701,47 @@ impl HttpCache {
         None
     }
 
-    /// Updating consumers who received a response constructed with a ResponseBody::Receiving.
-    pub fn update_awaiting_consumers(&mut self, request: &Request, response: &Response) {
-        if let ResponseBody::Done(ref completed_body) =
-            *response.actual_response().body.lock().unwrap()
-        {
-            let entry_key = CacheKey::new(&request);
-            if let Some(cached_resources) = self.entries.get(&entry_key) {
-                // Ensure we only wake-up consumers of relevant resources,
-                // ie we don't want to wake-up 200 awaiting consumers with a 206.
-                let relevant_cached_resources = cached_resources
-                    .iter()
-                    .filter(|resource| resource.data.raw_status == response.raw_status);
-                for cached_resource in relevant_cached_resources {
-                    let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
-                    for done_sender in awaiting_consumers.drain(..) {
-                        if cached_resource.aborted.load(Ordering::Relaxed) ||
-                            response.is_network_error()
-                        {
-                            // In the case of an aborted fetch or a network errror,
-                            // wake-up all awaiting consumers.
-                            // Each will then start a new network request.
-                            // TODO: Wake-up only one consumer, and make it the producer on which others wait.
-                            let _ = done_sender.send(Data::Cancelled);
-                        } else {
-                            let _ = done_sender.send(Data::Payload(completed_body.clone()));
-                            let _ = done_sender.send(Data::Done);
-                        }
-                    }
+    /// Wake-up consumers of cached resources
+    /// whose response body was still receiving data when the resource was constructed,
+    /// and whose response has now either been completed or cancelled.
+    pub fn update_awaiting_consumers(&self, request: &Request, response: &Response) {
+        let entry_key = CacheKey::new(&request);
+
+        let cached_resources = match self.entries.get(&entry_key) {
+            None => return,
+            Some(resources) => resources,
+        };
+
+        // Ensure we only wake-up consumers of relevant resources,
+        // ie we don't want to wake-up 200 awaiting consumers with a 206.
+        let relevant_cached_resources = cached_resources.iter().filter(|resource| {
+            if response.actual_response().is_network_error() {
+                return *resource.body.lock().unwrap() == ResponseBody::Empty;
+            }
+            resource.data.raw_status == response.raw_status
+        });
+
+        for cached_resource in relevant_cached_resources {
+            let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
+            if awaiting_consumers.is_empty() {
+                continue;
+            }
+            let to_send = if cached_resource.aborted.load(Ordering::Acquire) {
+                // In the case of an aborted fetch,
+                // wake-up all awaiting consumers.
+                // Each will then start a new network request.
+                // TODO: Wake-up only one consumer, and make it the producer on which others wait.
+                Data::Cancelled
+            } else {
+                match *cached_resource.body.lock().unwrap() {
+                    ResponseBody::Done(_) | ResponseBody::Empty => Data::Done,
+                    ResponseBody::Receiving(_) => {
+                        continue;
+                    },
                 }
+            };
+            for done_sender in awaiting_consumers.drain(..) {
+                let _ = done_sender.send(to_send.clone());
             }
         }
     }
@@ -857,7 +893,7 @@ impl HttpCache {
                 last_validated: time::now(),
             }),
         };
-        let entry = self.entries.entry(entry_key).or_insert(vec![]);
+        let entry = self.entries.entry(entry_key).or_insert_with(|| vec![]);
         entry.push(entry_resource);
         // TODO: Complete incomplete responses, including 206 response, when stored here.
         // See A cache MAY complete a stored incomplete response by making a subsequent range request
diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs
index 9d697e35e1e..f2158bb4aee 100644
--- a/components/net/http_loader.rs
+++ b/components/net/http_loader.rs
@@ -12,7 +12,7 @@ use crate::fetch::methods::{
 };
 use crate::fetch::methods::{Data, DoneChannel, FetchContext, Target};
 use crate::hsts::HstsList;
-use crate::http_cache::HttpCache;
+use crate::http_cache::{CacheKey, HttpCache};
 use crate::resource_thread::AuthCache;
 use crossbeam_channel::{unbounded, Sender};
 use devtools_traits::{
@@ -53,7 +53,7 @@ use std::iter::FromIterator;
 use std::mem;
 use std::ops::Deref;
 use std::str::FromStr;
-use std::sync::{Mutex, RwLock};
+use std::sync::{Condvar, Mutex, RwLock};
 use std::time::{Duration, SystemTime};
 use time::{self, Tm};
 use tokio::prelude::{future, Future, Stream};
@@ -63,10 +63,25 @@ lazy_static! {
     pub static ref HANDLE: Mutex<Runtime> = { Mutex::new(Runtime::new().unwrap()) };
 }
 
+/// The various states an entry of the HttpCache can be in.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum HttpCacheEntryState {
+    /// The entry is fully up-to-date,
+    /// there are no pending concurrent stores,
+    /// and it is ready to construct cached responses.
+    ReadyToConstruct,
+    /// The entry is pending a number of concurrent stores.
+    PendingStore(usize),
+}
+
 pub struct HttpState {
     pub hsts_list: RwLock<HstsList>,
     pub cookie_jar: RwLock<CookieStorage>,
     pub http_cache: RwLock<HttpCache>,
+    /// A map of cache key to entry state,
+    /// reflecting whether the cache entry is ready to read from,
+    /// or whether a concurrent pending store should be awaited.
+    pub http_cache_state: Mutex<HashMap<CacheKey, Arc<(Mutex<HttpCacheEntryState>, Condvar)>>>,
     pub auth_cache: RwLock<AuthCache>,
     pub history_states: RwLock<HashMap<HistoryStateId, Vec<u8>>>,
     pub client: Client<Connector, Body>,
@@ -80,6 +95,7 @@ impl HttpState {
             auth_cache: RwLock::new(AuthCache::new()),
             history_states: RwLock::new(HashMap::new()),
             http_cache: RwLock::new(HttpCache::new()),
+            http_cache_state: Mutex::new(HashMap::new()),
             client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),
         }
     }
@@ -1020,42 +1036,124 @@ fn http_network_or_cache_fetch(
     // Step 5.18
     // TODO If there’s a proxy-authentication entry, use it as appropriate.
 
-    // Step 5.19
-    if let Ok(http_cache) = context.state.http_cache.read() {
-        if let Some(response_from_cache) = http_cache.construct_response(&http_request, done_chan) {
-            let response_headers = response_from_cache.response.headers.clone();
-            // Substep 1, 2, 3, 4
-            let (cached_response, needs_revalidation) =
-                match (http_request.cache_mode, &http_request.mode) {
-                    (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
-                    (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
-                        (Some(response_from_cache.response), false)
-                    },
-                    (CacheMode::OnlyIfCached, _) |
-                    (CacheMode::NoStore, _) |
-                    (CacheMode::Reload, _) => (None, false),
-                    (_, _) => (
-                        Some(response_from_cache.response),
-                        response_from_cache.needs_validation,
-                    ),
-                };
-            if needs_revalidation {
-                revalidating_flag = true;
-                // Substep 5
-                if let Some(http_date) = response_headers.typed_get::<LastModified>() {
-                    let http_date: SystemTime = http_date.into();
-                    http_request
-                        .headers
-                        .typed_insert(IfModifiedSince::from(http_date));
+    // If the cache is not ready to construct a response, wait.
+    //
+    // The cache is not ready if a previous fetch checked the cache, found nothing,
+    // and moved on to a network fetch, and hasn't updated the cache yet with a pending resource.
+    //
+    // Note that this is a different workflow from the one involving `wait_for_cached_response`.
+    // That one happens when a fetch gets a cache hit, and the resource is pending completion from the network.
+    {
+        let (lock, cvar) = {
+            let entry_key = CacheKey::new(&http_request);
+            let mut state_map = context.state.http_cache_state.lock().unwrap();
+            &*state_map
+                .entry(entry_key)
+                .or_insert_with(|| {
+                    Arc::new((
+                        Mutex::new(HttpCacheEntryState::ReadyToConstruct),
+                        Condvar::new(),
+                    ))
+                })
+                .clone()
+        };
+
+        // Start of critical section on http-cache state.
+        let mut state = lock.lock().unwrap();
+        while let HttpCacheEntryState::PendingStore(_) = *state {
+            let (current_state, time_out) = cvar
+                .wait_timeout(state, Duration::from_millis(500))
+                .unwrap();
+            state = current_state;
+            if time_out.timed_out() {
+                // After a timeout, ignore the pending store.
+                break;
+            }
+        }
+
+        // Step 5.19
+        if let Ok(http_cache) = context.state.http_cache.read() {
+            if let Some(response_from_cache) =
+                http_cache.construct_response(&http_request, done_chan)
+            {
+                let response_headers = response_from_cache.response.headers.clone();
+                // Substep 1, 2, 3, 4
+                let (cached_response, needs_revalidation) =
+                    match (http_request.cache_mode, &http_request.mode) {
+                        (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
+                        (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
+                            (Some(response_from_cache.response), false)
+                        },
+                        (CacheMode::OnlyIfCached, _) |
+                        (CacheMode::NoStore, _) |
+                        (CacheMode::Reload, _) => (None, false),
+                        (_, _) => (
+                            Some(response_from_cache.response),
+                            response_from_cache.needs_validation,
+                        ),
+                    };
+                if cached_response.is_none() {
+                    // Ensure the done chan is not set if we're not using the cached response,
+                    // as the cache might have set it to Some if it constructed a pending response.
+                    *done_chan = None;
+
+                    // Update the cache state, incrementing the pending store count,
+                    // or starting the count.
+                    if let HttpCacheEntryState::PendingStore(i) = *state {
+                        let new = i + 1;
+                        *state = HttpCacheEntryState::PendingStore(new);
+                    } else {
+                        *state = HttpCacheEntryState::PendingStore(1);
+                    }
                 }
-                if let Some(entity_tag) = response_headers.get(header::ETAG) {
-                    http_request
-                        .headers
-                        .insert(header::IF_NONE_MATCH, entity_tag.clone());
+                if needs_revalidation {
+                    revalidating_flag = true;
+                    // Substep 5
+                    if let Some(http_date) = response_headers.typed_get::<LastModified>() {
+                        let http_date: SystemTime = http_date.into();
+                        http_request
+                            .headers
+                            .typed_insert(IfModifiedSince::from(http_date));
+                    }
+                    if let Some(entity_tag) = response_headers.get(header::ETAG) {
+                        http_request
+                            .headers
+                            .insert(header::IF_NONE_MATCH, entity_tag.clone());
+                    }
+                } else {
+                    // Substep 6
+                    response = cached_response;
                 }
+            }
+        }
+        // Notify the next thread waiting in line, if there is any.
+        if *state == HttpCacheEntryState::ReadyToConstruct {
+            cvar.notify_one();
+        }
+        // End of critical section on http-cache state.
+    }
+
+    // Decrement the number of pending stores,
+    // and set the state to ready to construct,
+    // if no stores are pending.
+    fn update_http_cache_state(context: &FetchContext, http_request: &Request) {
+        let (lock, cvar) = {
+            let entry_key = CacheKey::new(&http_request);
+            let mut state_map = context.state.http_cache_state.lock().unwrap();
+            &*state_map
+                .get_mut(&entry_key)
+                .expect("Entry in http-cache state to have been previously inserted")
+                .clone()
+        };
+        let mut state = lock.lock().unwrap();
+        if let HttpCacheEntryState::PendingStore(i) = *state {
+            let new = i - 1;
+            if new == 0 {
+                *state = HttpCacheEntryState::ReadyToConstruct;
+                // Notify the next thread waiting in line, if there is any.
+                cvar.notify_one();
             } else {
-                // Substep 6
-                response = cached_response;
+                *state = HttpCacheEntryState::PendingStore(new);
             }
         }
     }
@@ -1065,6 +1163,7 @@ fn http_network_or_cache_fetch(
         // The cache constructed a response with a body of ResponseBody::Receiving.
         // We wait for the response in the cache to "finish",
         // with a body of either Done or Cancelled.
+        assert!(response.is_some());
        loop {
             match ch
                 .1
@@ -1095,6 +1194,9 @@ fn http_network_or_cache_fetch(
     if response.is_none() {
         // Substep 1
         if http_request.cache_mode == CacheMode::OnlyIfCached {
+            // The cache will not be updated,
+            // set its state to ready to construct.
+            update_http_cache_state(context, &http_request);
             return Response::network_error(NetworkError::Internal(
                 "Couldn't find response in cache".into(),
             ));
@@ -1141,6 +1243,9 @@ fn http_network_or_cache_fetch(
 
     let mut response = response.unwrap();
 
+    // The cache has been updated, set its state to ready to construct.
+    update_http_cache_state(context, &http_request);
+
     // Step 8
     // TODO: if necessary set response's range-requested flag
 
@@ -1170,6 +1275,10 @@ fn http_network_or_cache_fetch(
             return response;
         }
 
+        // Make sure this is set to None,
+        // since we're about to start a new `http_network_or_cache_fetch`.
+        *done_chan = None;
+
         // Substep 4
         response = http_network_or_cache_fetch(
             http_request,
diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs
index 33f878f1207..b93ed291dd2 100644
--- a/components/net/resource_thread.rs
+++ b/components/net/resource_thread.rs
@@ -146,6 +146,7 @@ fn create_http_states(
         cookie_jar: RwLock::new(cookie_jar),
         auth_cache: RwLock::new(auth_cache),
         http_cache: RwLock::new(http_cache),
+        http_cache_state: Mutex::new(HashMap::new()),
         hsts_list: RwLock::new(hsts_list),
         history_states: RwLock::new(HashMap::new()),
         client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),
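The `http_cache.rs` half of the change is about waking consumers that were handed a cached response whose body was still streaming in. A simplified standalone sketch of that wake-up decision, assuming hypothetical `Msg`, `Body`, and `CachedEntry` stand-ins (with `std::sync::mpsc` instead of Servo's crossbeam channels and `Data`/`ResponseBody` types):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Mutex;

/// Stand-in for the patch's `Data` messages sent to waiting consumers.
#[derive(Clone)]
enum Msg {
    Done,
    Cancelled,
}

/// Stand-in for `ResponseBody`.
enum Body {
    Empty,
    Receiving(Vec<u8>),
    Done(Vec<u8>),
}

/// Stand-in for the fields of a cached resource used by the wake-up logic.
struct CachedEntry {
    aborted: AtomicBool,
    body: Mutex<Body>,
    awaiting_body: Mutex<Vec<Sender<Msg>>>,
}

/// Wake consumers waiting on this entry, mirroring the shape of
/// `HttpCache::update_awaiting_consumers`: an aborted fetch wakes everyone
/// with `Cancelled` so they retry over the network, a completed (or empty)
/// body sends `Done`, and a body still receiving data leaves them waiting.
fn wake_waiting_consumers(entry: &CachedEntry) {
    let mut waiting = entry.awaiting_body.lock().unwrap();
    if waiting.is_empty() {
        return;
    }
    let msg = if entry.aborted.load(Ordering::Acquire) {
        Msg::Cancelled
    } else {
        match *entry.body.lock().unwrap() {
            Body::Done(_) | Body::Empty => Msg::Done,
            Body::Receiving(_) => return,
        }
    };
    for sender in waiting.drain(..) {
        let _ = sender.send(msg.clone());
    }
}
```

A consumer whose entry is still `Receiving` is simply left waiting; in the patch, `main_fetch` invokes the real `update_awaiting_consumers` after the response reaches end-of-file (Step 24 in the diff above), at which point the body is either complete or the fetch was aborted.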