aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbors-servo <lbergstrom+bors@mozilla.com>2019-10-08 07:56:55 -0400
committerGitHub <noreply@github.com>2019-10-08 07:56:55 -0400
commitc1401f53b28916d69d0805babf884d5599f1891b (patch)
tree8b05e3fd043cadb77bf2e8638b84f54c0bbc0a53
parent7371c8689d83edb11aa62dcdf3f7c93bde6e930a (diff)
parent4f3ba70704b8b160780af2e1b84d8a93a350af81 (diff)
downloadservo-c1401f53b28916d69d0805babf884d5599f1891b.tar.gz
servo-c1401f53b28916d69d0805babf884d5599f1891b.zip
Auto merge of #24318 - gterzian:http-cache-condvar, r=asajeffrey
Http cache: wait on pending stores <!-- Please describe your changes on the following line: --> FIX #24166 --- <!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: --> - [ ] `./mach build -d` does not report any errors - [ ] `./mach test-tidy` does not report any errors - [ ] These changes fix #___ (GitHub issue number if applicable) <!-- Either: --> - [ ] There are tests for these changes OR - [ ] These changes do not require tests because ___ <!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.--> <!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. --> <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/24318) <!-- Reviewable:end -->
-rw-r--r--components/net/fetch/methods.rs5
-rw-r--r--components/net/http_cache.rs146
-rw-r--r--components/net/http_loader.rs179
-rw-r--r--components/net/resource_thread.rs1
4 files changed, 239 insertions, 92 deletions
diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs
index 503e93d3687..b60265ce2b8 100644
--- a/components/net/fetch/methods.rs
+++ b/components/net/fetch/methods.rs
@@ -41,6 +41,7 @@ lazy_static! {
pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send);
+#[derive(Clone)]
pub enum Data {
Payload(Vec<u8>),
Done,
@@ -456,7 +457,7 @@ pub fn main_fetch(
// Step 24.
target.process_response_eof(&response);
- if let Ok(mut http_cache) = context.state.http_cache.write() {
+ if let Ok(http_cache) = context.state.http_cache.write() {
http_cache.update_awaiting_consumers(&request, &response);
}
@@ -478,7 +479,7 @@ fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut Do
},
Data::Done => break,
Data::Cancelled => {
- response.aborted.store(true, Ordering::Relaxed);
+ response.aborted.store(true, Ordering::Release);
break;
},
}
diff --git a/components/net/http_cache.rs b/components/net/http_cache.rs
index 43f82824b1d..e47b4e81951 100644
--- a/components/net/http_cache.rs
+++ b/components/net/http_cache.rs
@@ -38,7 +38,8 @@ pub struct CacheKey {
}
impl CacheKey {
- fn new(request: &Request) -> CacheKey {
+ /// Create a cache-key from a request.
+ pub(crate) fn new(request: &Request) -> CacheKey {
CacheKey {
url: request.current_url(),
}
@@ -127,7 +128,15 @@ pub struct HttpCache {
entries: HashMap<CacheKey, Vec<CachedResource>>,
}
-/// Determine if a given response is cacheable based on the initial metadata received.
+/// Determine if a response is cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
+fn is_cacheable_by_default(status_code: u16) -> bool {
+ match status_code {
+ 200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => true,
+ _ => false,
+ }
+}
+
+/// Determine if a given response is cacheable.
/// Based on <https://tools.ietf.org/html/rfc7234#section-3>
fn response_is_cacheable(metadata: &Metadata) -> bool {
// TODO: if we determine that this cache should be considered shared:
@@ -239,19 +248,16 @@ fn get_response_expiry(response: &Response) -> Duration {
} else {
max_heuristic
};
- match *code {
- 200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => {
- // Status codes that are cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
- return heuristic_freshness;
- },
- _ => {
- // Other status codes can only use heuristic freshness if the public cache directive is present.
- if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
- if directives.public() {
- return heuristic_freshness;
- }
+ if is_cacheable_by_default(*code) {
+ // Status codes that are cacheable by default can use heuristics to determine freshness.
+ return heuristic_freshness;
+ } else {
+ // Other status codes can only use heuristic freshness if the public cache directive is present.
+ if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
+ if directives.public() {
+ return heuristic_freshness;
}
- },
+ }
}
}
// Requires validation upon first use as default.
@@ -296,8 +302,11 @@ fn create_cached_response(
cached_resource: &CachedResource,
cached_headers: &HeaderMap,
done_chan: &mut DoneChannel,
-) -> CachedResponse {
+) -> Option<CachedResponse> {
debug!("creating a cached response for {:?}", request.url());
+ if cached_resource.aborted.load(Ordering::Acquire) {
+ return None;
+ }
let resource_timing = ResourceFetchTiming::new(request.timing_type());
let mut response = Response::new(
cached_resource.data.metadata.data.final_url.clone(),
@@ -333,10 +342,11 @@ fn create_cached_response(
// <https://tools.ietf.org/html/rfc7234#section-5.2.2.7>
let has_expired =
(adjusted_expires < time_since_validated) || (adjusted_expires == time_since_validated);
- CachedResponse {
+ let cached_response = CachedResponse {
response: response,
needs_validation: has_expired,
- }
+ };
+ Some(cached_response)
}
/// Create a new resource, based on the bytes requested, and an existing resource,
@@ -366,7 +376,7 @@ fn create_resource_with_bytes_from_resource(
/// Support for range requests <https://tools.ietf.org/html/rfc7233>.
fn handle_range_request(
request: &Request,
- candidates: Vec<&CachedResource>,
+ candidates: &[&CachedResource],
range_spec: Vec<(Bound<u64>, Bound<u64>)>,
done_chan: &mut DoneChannel,
) -> Option<CachedResponse> {
@@ -411,7 +421,9 @@ fn handle_range_request(
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response =
create_cached_response(request, &new_resource, &*cached_headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
},
@@ -444,7 +456,9 @@ fn handle_range_request(
create_resource_with_bytes_from_resource(&bytes, partial_resource);
let cached_response =
create_cached_response(request, &new_resource, &*headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
}
@@ -459,7 +473,9 @@ fn handle_range_request(
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response =
create_cached_response(request, &new_resource, &*cached_headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
},
@@ -493,7 +509,9 @@ fn handle_range_request(
create_resource_with_bytes_from_resource(&bytes, partial_resource);
let cached_response =
create_cached_response(request, &new_resource, &*headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
}
@@ -508,7 +526,9 @@ fn handle_range_request(
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response =
create_cached_response(request, &new_resource, &*cached_headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
},
@@ -546,7 +566,9 @@ fn handle_range_request(
create_resource_with_bytes_from_resource(&bytes, partial_resource);
let cached_response =
create_cached_response(request, &new_resource, &*headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
}
@@ -638,7 +660,7 @@ impl HttpCache {
if let Some(range_spec) = request.headers.typed_get::<Range>() {
return handle_range_request(
request,
- candidates,
+ candidates.as_slice(),
range_spec.iter().collect(),
done_chan,
);
@@ -669,7 +691,9 @@ impl HttpCache {
let cached_headers = cached_resource.data.metadata.headers.lock().unwrap();
let cached_response =
create_cached_response(request, cached_resource, &*cached_headers, done_chan);
- return Some(cached_response);
+ if let Some(cached_response) = cached_response {
+ return Some(cached_response);
+ }
}
}
debug!("couldn't find an appropriate response, not caching");
@@ -677,35 +701,47 @@ impl HttpCache {
None
}
- /// Updating consumers who received a response constructed with a ResponseBody::Receiving.
- pub fn update_awaiting_consumers(&mut self, request: &Request, response: &Response) {
- if let ResponseBody::Done(ref completed_body) =
- *response.actual_response().body.lock().unwrap()
- {
- let entry_key = CacheKey::new(&request);
- if let Some(cached_resources) = self.entries.get(&entry_key) {
- // Ensure we only wake-up consumers of relevant resources,
- // ie we don't want to wake-up 200 awaiting consumers with a 206.
- let relevant_cached_resources = cached_resources
- .iter()
- .filter(|resource| resource.data.raw_status == response.raw_status);
- for cached_resource in relevant_cached_resources {
- let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
- for done_sender in awaiting_consumers.drain(..) {
- if cached_resource.aborted.load(Ordering::Relaxed) ||
- response.is_network_error()
- {
- // In the case of an aborted fetch or a network error,
- // wake-up all awaiting consumers.
- // Each will then start a new network request.
- // TODO: Wake-up only one consumer, and make it the producer on which others wait.
- let _ = done_sender.send(Data::Cancelled);
- } else {
- let _ = done_sender.send(Data::Payload(completed_body.clone()));
- let _ = done_sender.send(Data::Done);
- }
- }
+ /// Wake-up consumers of cached resources
+ /// whose response body was still receiving data when the resource was constructed,
+ /// and whose response has now either been completed or cancelled.
+ pub fn update_awaiting_consumers(&self, request: &Request, response: &Response) {
+ let entry_key = CacheKey::new(&request);
+
+ let cached_resources = match self.entries.get(&entry_key) {
+ None => return,
+ Some(resources) => resources,
+ };
+
+ // Ensure we only wake-up consumers of relevant resources,
+ // i.e. we don't want to wake-up 200 awaiting consumers with a 206.
+ let relevant_cached_resources = cached_resources.iter().filter(|resource| {
+ if response.actual_response().is_network_error() {
+ return *resource.body.lock().unwrap() == ResponseBody::Empty;
+ }
+ resource.data.raw_status == response.raw_status
+ });
+
+ for cached_resource in relevant_cached_resources {
+ let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
+ if awaiting_consumers.is_empty() {
+ continue;
+ }
+ let to_send = if cached_resource.aborted.load(Ordering::Acquire) {
+ // In the case of an aborted fetch,
+ // wake-up all awaiting consumers.
+ // Each will then start a new network request.
+ // TODO: Wake-up only one consumer, and make it the producer on which others wait.
+ Data::Cancelled
+ } else {
+ match *cached_resource.body.lock().unwrap() {
+ ResponseBody::Done(_) | ResponseBody::Empty => Data::Done,
+ ResponseBody::Receiving(_) => {
+ continue;
+ },
}
+ };
+ for done_sender in awaiting_consumers.drain(..) {
+ let _ = done_sender.send(to_send.clone());
}
}
}
@@ -857,7 +893,7 @@ impl HttpCache {
last_validated: time::now(),
}),
};
- let entry = self.entries.entry(entry_key).or_insert(vec![]);
+ let entry = self.entries.entry(entry_key).or_insert_with(|| vec![]);
entry.push(entry_resource);
// TODO: Complete incomplete responses, including 206 response, when stored here.
// See A cache MAY complete a stored incomplete response by making a subsequent range request
diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs
index 9d697e35e1e..f2158bb4aee 100644
--- a/components/net/http_loader.rs
+++ b/components/net/http_loader.rs
@@ -12,7 +12,7 @@ use crate::fetch::methods::{
};
use crate::fetch::methods::{Data, DoneChannel, FetchContext, Target};
use crate::hsts::HstsList;
-use crate::http_cache::HttpCache;
+use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
use crossbeam_channel::{unbounded, Sender};
use devtools_traits::{
@@ -53,7 +53,7 @@ use std::iter::FromIterator;
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
-use std::sync::{Mutex, RwLock};
+use std::sync::{Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Stream};
@@ -63,10 +63,25 @@ lazy_static! {
pub static ref HANDLE: Mutex<Runtime> = { Mutex::new(Runtime::new().unwrap()) };
}
+/// The various states an entry of the HttpCache can be in.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum HttpCacheEntryState {
+ /// The entry is fully up-to-date,
+ /// there are no pending concurrent stores,
+ /// and it is ready to construct cached responses.
+ ReadyToConstruct,
+ /// The entry is pending a number of concurrent stores.
+ PendingStore(usize),
+}
+
pub struct HttpState {
pub hsts_list: RwLock<HstsList>,
pub cookie_jar: RwLock<CookieStorage>,
pub http_cache: RwLock<HttpCache>,
+ /// A map of cache key to entry state,
+ /// reflecting whether the cache entry is ready to read from,
+ /// or whether a concurrent pending store should be awaited.
+ pub http_cache_state: Mutex<HashMap<CacheKey, Arc<(Mutex<HttpCacheEntryState>, Condvar)>>>,
pub auth_cache: RwLock<AuthCache>,
pub history_states: RwLock<HashMap<HistoryStateId, Vec<u8>>>,
pub client: Client<Connector, Body>,
@@ -80,6 +95,7 @@ impl HttpState {
auth_cache: RwLock::new(AuthCache::new()),
history_states: RwLock::new(HashMap::new()),
http_cache: RwLock::new(HttpCache::new()),
+ http_cache_state: Mutex::new(HashMap::new()),
client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),
}
}
@@ -1020,42 +1036,124 @@ fn http_network_or_cache_fetch(
// Step 5.18
// TODO If there’s a proxy-authentication entry, use it as appropriate.
- // Step 5.19
- if let Ok(http_cache) = context.state.http_cache.read() {
- if let Some(response_from_cache) = http_cache.construct_response(&http_request, done_chan) {
- let response_headers = response_from_cache.response.headers.clone();
- // Substep 1, 2, 3, 4
- let (cached_response, needs_revalidation) =
- match (http_request.cache_mode, &http_request.mode) {
- (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
- (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
- (Some(response_from_cache.response), false)
- },
- (CacheMode::OnlyIfCached, _) |
- (CacheMode::NoStore, _) |
- (CacheMode::Reload, _) => (None, false),
- (_, _) => (
- Some(response_from_cache.response),
- response_from_cache.needs_validation,
- ),
- };
- if needs_revalidation {
- revalidating_flag = true;
- // Substep 5
- if let Some(http_date) = response_headers.typed_get::<LastModified>() {
- let http_date: SystemTime = http_date.into();
- http_request
- .headers
- .typed_insert(IfModifiedSince::from(http_date));
+ // If the cache is not ready to construct a response, wait.
+ //
+ // The cache is not ready if a previous fetch checked the cache, found nothing,
+ // and moved on to a network fetch, and hasn't updated the cache yet with a pending resource.
+ //
+ // Note that this is a different workflow from the one involving `wait_for_cached_response`.
+ // That one happens when a fetch gets a cache hit, and the resource is pending completion from the network.
+ {
+ let (lock, cvar) = {
+ let entry_key = CacheKey::new(&http_request);
+ let mut state_map = context.state.http_cache_state.lock().unwrap();
+ &*state_map
+ .entry(entry_key)
+ .or_insert_with(|| {
+ Arc::new((
+ Mutex::new(HttpCacheEntryState::ReadyToConstruct),
+ Condvar::new(),
+ ))
+ })
+ .clone()
+ };
+
+ // Start of critical section on http-cache state.
+ let mut state = lock.lock().unwrap();
+ while let HttpCacheEntryState::PendingStore(_) = *state {
+ let (current_state, time_out) = cvar
+ .wait_timeout(state, Duration::from_millis(500))
+ .unwrap();
+ state = current_state;
+ if time_out.timed_out() {
+ // After a timeout, ignore the pending store.
+ break;
+ }
+ }
+
+ // Step 5.19
+ if let Ok(http_cache) = context.state.http_cache.read() {
+ if let Some(response_from_cache) =
+ http_cache.construct_response(&http_request, done_chan)
+ {
+ let response_headers = response_from_cache.response.headers.clone();
+ // Substep 1, 2, 3, 4
+ let (cached_response, needs_revalidation) =
+ match (http_request.cache_mode, &http_request.mode) {
+ (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
+ (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
+ (Some(response_from_cache.response), false)
+ },
+ (CacheMode::OnlyIfCached, _) |
+ (CacheMode::NoStore, _) |
+ (CacheMode::Reload, _) => (None, false),
+ (_, _) => (
+ Some(response_from_cache.response),
+ response_from_cache.needs_validation,
+ ),
+ };
+ if cached_response.is_none() {
+ // Ensure the done chan is not set if we're not using the cached response,
+ // as the cache might have set it to Some if it constructed a pending response.
+ *done_chan = None;
+
+ // Update the cache state, incrementing the pending store count,
+ // or starting the count.
+ if let HttpCacheEntryState::PendingStore(i) = *state {
+ let new = i + 1;
+ *state = HttpCacheEntryState::PendingStore(new);
+ } else {
+ *state = HttpCacheEntryState::PendingStore(1);
+ }
}
- if let Some(entity_tag) = response_headers.get(header::ETAG) {
- http_request
- .headers
- .insert(header::IF_NONE_MATCH, entity_tag.clone());
+ if needs_revalidation {
+ revalidating_flag = true;
+ // Substep 5
+ if let Some(http_date) = response_headers.typed_get::<LastModified>() {
+ let http_date: SystemTime = http_date.into();
+ http_request
+ .headers
+ .typed_insert(IfModifiedSince::from(http_date));
+ }
+ if let Some(entity_tag) = response_headers.get(header::ETAG) {
+ http_request
+ .headers
+ .insert(header::IF_NONE_MATCH, entity_tag.clone());
+ }
+ } else {
+ // Substep 6
+ response = cached_response;
}
+ }
+ }
+ // Notify the next thread waiting in line, if there is any.
+ if *state == HttpCacheEntryState::ReadyToConstruct {
+ cvar.notify_one();
+ }
+ // End of critical section on http-cache state.
+ }
+
+ // Decrement the number of pending stores,
+ // and set the state to ready to construct,
+ // if no stores are pending.
+ fn update_http_cache_state(context: &FetchContext, http_request: &Request) {
+ let (lock, cvar) = {
+ let entry_key = CacheKey::new(&http_request);
+ let mut state_map = context.state.http_cache_state.lock().unwrap();
+ &*state_map
+ .get_mut(&entry_key)
+ .expect("Entry in http-cache state to have been previously inserted")
+ .clone()
+ };
+ let mut state = lock.lock().unwrap();
+ if let HttpCacheEntryState::PendingStore(i) = *state {
+ let new = i - 1;
+ if new == 0 {
+ *state = HttpCacheEntryState::ReadyToConstruct;
+ // Notify the next thread waiting in line, if there is any.
+ cvar.notify_one();
} else {
- // Substep 6
- response = cached_response;
+ *state = HttpCacheEntryState::PendingStore(new);
}
}
}
@@ -1065,6 +1163,7 @@ fn http_network_or_cache_fetch(
// The cache constructed a response with a body of ResponseBody::Receiving.
// We wait for the response in the cache to "finish",
// with a body of either Done or Cancelled.
+ assert!(response.is_some());
loop {
match ch
.1
@@ -1095,6 +1194,9 @@ fn http_network_or_cache_fetch(
if response.is_none() {
// Substep 1
if http_request.cache_mode == CacheMode::OnlyIfCached {
+ // The cache will not be updated,
+ // set its state to ready to construct.
+ update_http_cache_state(context, &http_request);
return Response::network_error(NetworkError::Internal(
"Couldn't find response in cache".into(),
));
@@ -1141,6 +1243,9 @@ fn http_network_or_cache_fetch(
let mut response = response.unwrap();
+ // The cache has been updated, set its state to ready to construct.
+ update_http_cache_state(context, &http_request);
+
// Step 8
// TODO: if necessary set response's range-requested flag
@@ -1170,6 +1275,10 @@ fn http_network_or_cache_fetch(
return response;
}
+ // Make sure this is set to None,
+ // since we're about to start a new `http_network_or_cache_fetch`.
+ *done_chan = None;
+
// Substep 4
response = http_network_or_cache_fetch(
http_request,
diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs
index 33f878f1207..b93ed291dd2 100644
--- a/components/net/resource_thread.rs
+++ b/components/net/resource_thread.rs
@@ -146,6 +146,7 @@ fn create_http_states(
cookie_jar: RwLock::new(cookie_jar),
auth_cache: RwLock::new(auth_cache),
http_cache: RwLock::new(http_cache),
+ http_cache_state: Mutex::new(HashMap::new()),
hsts_list: RwLock::new(hsts_list),
history_states: RwLock::new(HashMap::new()),
client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),