diff options
author | Gregory Terzian <gterzian@users.noreply.github.com> | 2017-11-26 21:31:26 +0800 |
---|---|---|
committer | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-01-13 10:48:36 +0800 |
commit | 993e2f55eda267c758afd8bbc0e850232b4e7ba0 (patch) | |
tree | 008518a79eb01b085da79b774726c9145a266194 /components/net/http_cache.rs | |
parent | 609e975c500f734106f0006028b54f4d1e651969 (diff) | |
download | servo-993e2f55eda267c758afd8bbc0e850232b4e7ba0.tar.gz servo-993e2f55eda267c758afd8bbc0e850232b4e7ba0.zip |
handle caching of response with a body of ResponseBody::Receiving
Diffstat (limited to 'components/net/http_cache.rs')
-rw-r--r-- | components/net/http_cache.rs | 71 |
1 files changed, 49 insertions, 22 deletions
diff --git a/components/net/http_cache.rs b/components/net/http_cache.rs index d068fbe8803..219a627314c 100644 --- a/components/net/http_cache.rs +++ b/components/net/http_cache.rs @@ -7,7 +7,7 @@ //! A memory cache implementing the logic specified in http://tools.ietf.org/html/rfc7234 //! and <http://tools.ietf.org/html/rfc7232>. -use fetch::methods::DoneChannel; +use fetch::methods::{Data, DoneChannel}; use hyper::header; use hyper::header::ContentType; use hyper::header::Headers; @@ -23,6 +23,7 @@ use std::collections::HashMap; use std::str; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Sender}; use time; use time::{Duration, Tm}; @@ -66,6 +67,7 @@ struct CachedResource { expires: Duration, last_validated: Tm, aborted: Arc<AtomicBool>, + awaiting_body: Arc<Mutex<Vec<Sender<Data>>>> } /// Metadata about a loaded resource, such as is obtained from HTTP headers. @@ -271,11 +273,19 @@ fn get_expiry_adjustment_from_request_headers(request: &Request, expires: Durati } /// Create a CachedResponse from a request and a CachedResource. -fn create_cached_response(request: &Request, cached_resource: &CachedResource, cached_headers: &Headers) +fn create_cached_response(request: &Request, + cached_resource: &CachedResource, + cached_headers: &Headers, + done_chan: &mut DoneChannel) -> CachedResponse { let mut response = Response::new(cached_resource.metadata.final_url.clone()); response.headers = cached_headers.clone(); response.body = cached_resource.body.clone(); + if let ResponseBody::Receiving(_) = *cached_resource.body.lock().unwrap() { + let (done_sender, done_receiver) = channel(); + *done_chan = Some((done_sender.clone(), done_receiver)); + cached_resource.awaiting_body.lock().unwrap().push(done_sender); + } response.location_url = cached_resource.location_url.clone(); response.status = cached_resource.status.clone(); response.raw_status = cached_resource.raw_status.clone(); @@ -313,11 +323,15 @@ fn create_resource_with_bytes_from_resource(bytes: &[u8], resource: &CachedResou expires: resource.expires.clone(), last_validated: resource.last_validated.clone(), aborted: Arc::new(AtomicBool::new(false)), + awaiting_body: Arc::new(Mutex::new(vec![])) } } /// Support for range requests <https://tools.ietf.org/html/rfc7233>. -fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, range_spec: &[header::ByteRangeSpec]) +fn handle_range_request(request: &Request, + candidates: Vec<&CachedResource>, + range_spec: &[header::ByteRangeSpec], + done_chan: &mut DoneChannel) -> Option<CachedResponse> { let mut complete_cached_resources = candidates.iter().filter(|resource| { match resource.raw_status { @@ -348,7 +362,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource); let cached_headers = new_resource.metadata.headers.lock().unwrap(); - let cached_response = create_cached_response(request, &new_resource, &*cached_headers); + let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan); return Some(cached_response); } } @@ -375,7 +389,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran }; if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(&bytes, partial_resource); - let cached_response = create_cached_response(request, &new_resource, &*headers); + let cached_response = create_cached_response(request, &new_resource, &*headers, done_chan); return Some(cached_response); } } @@ -388,7 +402,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource); let cached_headers = new_resource.metadata.headers.lock().unwrap(); - let cached_response = create_cached_response(request, &new_resource, &*cached_headers); + let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan); return Some(cached_response); } } @@ -415,7 +429,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran }; if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(&bytes, partial_resource); - let cached_response = create_cached_response(request, &new_resource, &*headers); + let cached_response = create_cached_response(request, &new_resource, &*headers, done_chan); return Some(cached_response); } } @@ -428,7 +442,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource); let cached_headers = new_resource.metadata.headers.lock().unwrap(); - let cached_response = create_cached_response(request, &new_resource, &*cached_headers); + let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan); return Some(cached_response); } } @@ -455,7 +469,7 @@ fn handle_range_request(request: &Request, candidates: Vec<&CachedResource>, ran }; if let Some(bytes) = requested { let new_resource = create_resource_with_bytes_from_resource(&bytes, partial_resource); - let cached_response = create_cached_response(request, &new_resource, &*headers); + let cached_response = create_cached_response(request, &new_resource, &*headers, done_chan); return Some(cached_response); } } @@ -476,22 +490,14 @@ impl HttpCache { /// Constructing Responses from Caches. /// <https://tools.ietf.org/html/rfc7234#section-4> - pub fn construct_response(&self, request: &Request) -> Option<CachedResponse> { + pub fn construct_response(&self, request: &Request, done_chan: &mut DoneChannel) -> Option<CachedResponse> { // TODO: generate warning headers as appropriate <https://tools.ietf.org/html/rfc7234#section-5.5> if request.method != Method::Get { // Only Get requests are cached, avoid a url based match for others. return None; } let entry_key = CacheKey::new(request.clone()); - let resources = self.entries.get(&entry_key)?.into_iter().filter(|r| { - match *r.body.lock().unwrap() { - ResponseBody::Done(_) => !r.aborted.load(Ordering::Relaxed), - // TODO: use fetch::methods::DoneChannel, in order to be able to - // construct a response with a body in ResponseBody::Receiving mode. - ResponseBody::Receiving(_) => false, - ResponseBody::Empty => true - } - }); + let resources = self.entries.get(&entry_key)?.into_iter().filter(|r| { !r.aborted.load(Ordering::Relaxed) }); let mut candidates = vec![]; for cached_resource in resources { let mut can_be_constructed = true; @@ -541,7 +547,7 @@ impl HttpCache { } // Support for range requests if let Some(&header::Range::Bytes(ref range_spec)) = request.headers.get::<header::Range>() { - return handle_range_request(request, candidates, &range_spec); + return handle_range_request(request, candidates, &range_spec, done_chan); } else { // Not a Range request. if let Some(ref cached_resource) = candidates.first() { @@ -549,13 +555,33 @@ impl HttpCache { // TODO: select the most appropriate one, using a known mechanism from a selecting header field, // or using the Date header to return the most recent one. let cached_headers = cached_resource.metadata.headers.lock().unwrap(); - let cached_response = create_cached_response(request, cached_resource, &*cached_headers); + let cached_response = create_cached_response(request, cached_resource, &*cached_headers, done_chan); return Some(cached_response); } } None } + /// Updating consumers who received a response constructed with a ResponseBody::Receiving. + pub fn update_awaiting_consumers(&mut self, request: &Request, response: &Response) { + if let ResponseBody::Done(ref completed_body) = *response.body.lock().unwrap() { + let entry_key = CacheKey::new(request.clone()); + if let Some(cached_resources) = self.entries.get(&entry_key) { + for cached_resource in cached_resources.iter() { + let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap(); + for done_sender in awaiting_consumers.drain(..) { + if cached_resource.aborted.load(Ordering::Relaxed) { + let _ = done_sender.send(Data::Cancelled); + } else { + let _ = done_sender.send(Data::Payload(completed_body.clone())); + let _ = done_sender.send(Data::Done); + } + }; + } + } + } + } + /// Freshening Stored Responses upon Validation. /// <https://tools.ietf.org/html/rfc7234#section-4.3.4> pub fn refresh(&mut self, request: &Request, response: Response, done_chan: &mut DoneChannel) -> Option<Response> { @@ -655,7 +681,8 @@ impl HttpCache { url_list: response.url_list.clone(), expires: expiry, last_validated: time::now(), - aborted: response.aborted.clone() + aborted: response.aborted.clone(), + awaiting_body: Arc::new(Mutex::new(vec![])) }; let entry = self.entries.entry(entry_key).or_insert(vec![]); entry.push(entry_resource); |