diff options
author | Naveen Gattu <naveen.gattu@gmail.com> | 2021-12-23 11:15:35 -0800 |
---|---|---|
committer | Naveen Gattu <naveen.gattu@gmail.com> | 2022-01-16 09:34:17 -0800 |
commit | a48a111cee6298f4628d4ae12b60a43c1b17cecf (patch) | |
tree | af1ff0300471853886267631e2d66c937a604e9e /components | |
parent | 5df705a41f6c4c1f2ffeec257dfe0129ce5fa8e0 (diff) | |
download | servo-a48a111cee6298f4628d4ae12b60a43c1b17cecf.tar.gz servo-a48a111cee6298f4628d4ae12b60a43c1b17cecf.zip |
Upgrade Hyper
Diffstat (limited to 'components')
37 files changed, 474 insertions, 397 deletions
diff --git a/components/constellation/Cargo.toml b/components/constellation/Cargo.toml index b70bde4341e..6ef5c41f00a 100644 --- a/components/constellation/Cargo.toml +++ b/components/constellation/Cargo.toml @@ -22,7 +22,8 @@ embedder_traits = { path = "../embedder_traits" } euclid = "0.20" gfx = { path = "../gfx" } gfx_traits = { path = "../gfx_traits" } -http = "0.1" +http = "0.2" +headers = "0.3" ipc-channel = "0.14" keyboard-types = "0.5" layout_traits = { path = "../layout_traits" } diff --git a/components/devtools/Cargo.toml b/components/devtools/Cargo.toml index 68da8489cb7..ef86fe41686 100644 --- a/components/devtools/Cargo.toml +++ b/components/devtools/Cargo.toml @@ -14,9 +14,8 @@ path = "lib.rs" crossbeam-channel = "0.4" devtools_traits = { path = "../devtools_traits" } embedder_traits = { path = "../embedder_traits" } -headers = "0.2" -http = "0.1" -hyper = "0.12" +headers = "0.3" +http = "0.2" ipc-channel = "0.14" log = "0.4" msg = { path = "../msg" } diff --git a/components/devtools/actors/network_event.rs b/components/devtools/actors/network_event.rs index 622ac6002eb..1c593e3dcb6 100644 --- a/components/devtools/actors/network_event.rs +++ b/components/devtools/actors/network_event.rs @@ -12,8 +12,7 @@ use crate::StreamId; use devtools_traits::HttpRequest as DevtoolsHttpRequest; use devtools_traits::HttpResponse as DevtoolsHttpResponse; use headers::{ContentType, Cookie, HeaderMapExt}; -use http::{header, HeaderMap}; -use hyper::{Method, StatusCode}; +use http::{header, HeaderMap, Method, StatusCode}; use serde_json::{Map, Value}; use std::net::TcpStream; use time::Tm; @@ -343,6 +342,7 @@ impl NetworkEventActor { pub fn add_request(&mut self, request: DevtoolsHttpRequest) { self.request.url = request.url.as_str().to_owned(); + self.request.method = request.method.clone(); self.request.headers = request.headers.clone(); self.request.body = request.body; diff --git a/components/devtools_traits/Cargo.toml b/components/devtools_traits/Cargo.toml index 366f4d42370..d6c13c581ad 100644 --- a/components/devtools_traits/Cargo.toml +++ b/components/devtools_traits/Cargo.toml @@ -12,7 +12,8 @@ path = "lib.rs" [dependencies] bitflags = "1.0" -http = "0.1" +headers = "0.3" +http = "0.2" ipc-channel = "0.14" malloc_size_of = { path = "../malloc_size_of" } malloc_size_of_derive = "0.1" diff --git a/components/devtools_traits/lib.rs b/components/devtools_traits/lib.rs index d7047960329..dbdf255b71b 100644 --- a/components/devtools_traits/lib.rs +++ b/components/devtools_traits/lib.rs @@ -18,8 +18,8 @@ extern crate malloc_size_of_derive; #[macro_use] extern crate serde; -use http::method::Method; use http::HeaderMap; +use http::Method; use ipc_channel::ipc::IpcSender; use msg::constellation_msg::{BrowsingContextId, PipelineId}; use servo_url::ServoUrl; diff --git a/components/malloc_size_of/Cargo.toml b/components/malloc_size_of/Cargo.toml index 040d377aeb6..60c7fee9c7d 100644 --- a/components/malloc_size_of/Cargo.toml +++ b/components/malloc_size_of/Cargo.toml @@ -13,7 +13,7 @@ servo = [ "accountable-refcell", "content-security-policy", "crossbeam-channel", - "hyper", + "http", "hyper_serde", "keyboard-types", "serde", @@ -34,8 +34,8 @@ crossbeam-channel = { version = "0.4", optional = true } cssparser = "0.29" euclid = "0.20" hashglobe = { path = "../hashglobe" } -hyper = { version = "0.12", optional = true } -hyper_serde = { version = "0.11", optional = true } +http = { version = "0.2", optional = true } +hyper_serde = { version = "0.12", optional = true } keyboard-types = { version = "0.5", optional = true } selectors = { path = "../selectors" } serde = { version = "1.0.27", optional = true } @@ -46,7 +46,7 @@ smallvec = "1.0" string_cache = { version = "0.8", optional = true } thin-slice = "0.1.0" time = { version = "0.1.41", optional = true } -tokio = "0.2" +tokio = "1" url = { version = "2.0", optional = true } uuid = { version = "0.8", features = ["v4"], optional = true } void = "1.0.2" diff --git a/components/malloc_size_of/lib.rs b/components/malloc_size_of/lib.rs index 046b6d2fc5d..559d10e5926 100644 --- a/components/malloc_size_of/lib.rs +++ b/components/malloc_size_of/lib.rs @@ -57,7 +57,7 @@ extern crate cssparser; extern crate euclid; extern crate hashglobe; #[cfg(feature = "servo")] -extern crate hyper; +extern crate http; #[cfg(feature = "servo")] extern crate hyper_serde; #[cfg(feature = "servo")] @@ -957,7 +957,7 @@ impl<T> MallocSizeOf for tokio::sync::mpsc::UnboundedSender<T> { } #[cfg(feature = "servo")] -impl MallocSizeOf for hyper::StatusCode { +impl MallocSizeOf for http::StatusCode { fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 } diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml index 9bca5e1435b..9be41935f62 100644 --- a/components/net/Cargo.toml +++ b/components/net/Cargo.toml @@ -19,7 +19,7 @@ async-recursion = "0.3.2" async-tungstenite = { version = "0.9", features = ["tokio-openssl"] } base64 = "0.10.1" brotli = "3" -bytes = "0.4" +bytes = "1" content-security-policy = { version = "0.4.0", features = ["serde"] } cookie_rs = { package = "cookie", version = "0.11" } crossbeam-channel = "0.4" @@ -27,14 +27,13 @@ data-url = "0.1.0" devtools_traits = { path = "../devtools_traits" } embedder_traits = { path = "../embedder_traits" } flate2 = "1" -futures = "0.1" -futures03 = { version = "0.3", package = "futures" } -futures-util = { version = "0.3", features = ["compat"] } -headers = "0.2" -http = "0.1" -hyper = "0.12" -hyper-openssl = "0.7" -hyper_serde = "0.11" +futures = { version = "0.3", package = "futures" } +futures-util = { version = "0.3" } +headers = "0.3" +http = "0.2" +hyper = { version = "0.14", features = ["client", "http1", "http2", "tcp", "stream"] } +hyper-openssl = "0.9.1" +hyper_serde = "0.12" immeta = "0.4" ipc-channel = "0.14" lazy_static = "1" @@ -59,19 +58,21 @@ servo_arc = { path = "../servo_arc" } servo_config = { path = "../config" } servo_url = { path = "../url" } time = "0.1.41" -tokio = "0.1" +tokio = { version = "1", package = "tokio", features = ["sync", "macros", "rt-multi-thread"] } tokio2 = { version = "0.2", package = "tokio", features = ["sync", "macros", "rt-threaded", "tcp"] } -tokio-compat = "0.1" +tokio-stream = "0.1" tungstenite = "0.11" url = "2.0" uuid = { version = "0.8", features = ["v4"] } webrender_api = { git = "https://github.com/servo/webrender" } [dev-dependencies] -futures = "0.1" +futures = {version = "0.3", features = ["compat"]} std_test_override = { path = "../std_test_override" } -tokio-openssl = "0.3" -tokio-test = "0.2" +tokio-openssl = "0.6" +tokio-test = "0.4" +tokio-stream = { version = "0.1", features = ["net"] } +hyper = { version = "0.14", features = ["full"] } [[test]] name = "main" diff --git a/components/net/connector.rs b/components/net/connector.rs index dc44002a85a..cd24d8697f5 100644 --- a/components/net/connector.rs +++ b/components/net/connector.rs @@ -3,10 +3,12 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use crate::hosts::replace_host; -use hyper::client::connect::{Connect, Destination}; +use crate::http_loader::HANDLE; +use futures::{task::Context, task::Poll, Future}; +use http::uri::{Authority, Uri as Destination}; use hyper::client::HttpConnector as HyperHttpConnector; -use hyper::rt::Future; -use hyper::{Body, Client}; +use hyper::rt::Executor; +use hyper::{service::Service, Body, Client}; use hyper_openssl::HttpsConnector; use openssl::ex_data::Index; use openssl::ssl::{ @@ -15,7 +17,6 @@ use openssl::ssl::{ use openssl::x509::{self, X509StoreContext}; use std::collections::hash_map::{Entry, HashMap}; use std::sync::{Arc, Mutex}; -use tokio::prelude::future::Executor; pub const BUF_SIZE: usize = 32768; pub const ALPN_H2_H1: &'static [u8] = b"\x02h2\x08http/1.1"; @@ -67,30 +68,53 @@ impl ConnectionCerts { } } +#[derive(Clone)] pub struct HttpConnector { inner: HyperHttpConnector, } impl HttpConnector { fn new() -> HttpConnector { - let mut inner = HyperHttpConnector::new(4); + let mut inner = HyperHttpConnector::new(); inner.enforce_http(false); inner.set_happy_eyeballs_timeout(None); HttpConnector { inner } } } -impl Connect for HttpConnector { - type Transport = <HyperHttpConnector as Connect>::Transport; - type Error = <HyperHttpConnector as Connect>::Error; - type Future = <HyperHttpConnector as Connect>::Future; +impl Service<Destination> for HttpConnector { + type Response = <HyperHttpConnector as Service<Destination>>::Response; + type Error = <HyperHttpConnector as Service<Destination>>::Error; + type Future = <HyperHttpConnector as Service<Destination>>::Future; - fn connect(&self, dest: Destination) -> Self::Future { + fn call(&mut self, dest: Destination) -> Self::Future { // Perform host replacement when making the actual TCP connection. let mut new_dest = dest.clone(); - let addr = replace_host(dest.host()); - new_dest.set_host(&*addr).unwrap(); - self.inner.connect(new_dest) + let mut parts = dest.into_parts(); + + if let Some(auth) = parts.authority { + let host = auth.host(); + let host = replace_host(host); + + let authority = if let Some(port) = auth.port() { + format!("{}:{}", host, port.as_str()) + } else { + format!("{}", &*host) + }; + + if let Ok(authority) = Authority::from_maybe_shared(authority) { + parts.authority = Some(authority); + if let Ok(dest) = Destination::from_parts(parts) { + new_dest = dest + } + } + } + + self.inner.call(new_dest) + } + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Ok(()).into() } } @@ -209,18 +233,28 @@ pub fn create_tls_config( cfg } -pub fn create_http_client<E>(tls_config: TlsConfig, executor: E) -> Client<Connector, Body> +struct TokioExecutor {} + +impl<F> Executor<F> for TokioExecutor where - E: Executor<Box<dyn Future<Error = (), Item = ()> + Send + 'static>> + Sync + Send + 'static, + F: Future<Output = ()> + 'static + std::marker::Send, { + fn execute(&self, fut: F) { + HANDLE.lock().unwrap().as_ref().unwrap().spawn(fut); + } +} + +pub fn create_http_client(tls_config: TlsConfig) -> Client<Connector, Body> { let mut connector = HttpsConnector::with_connector(HttpConnector::new(), tls_config).unwrap(); connector.set_callback(|configuration, destination| { - configuration.set_ex_data(*HOST_INDEX, Host(destination.host().to_owned())); + if let Some(host) = destination.host() { + configuration.set_ex_data(*HOST_INDEX, Host(host.to_owned())); + } Ok(()) }); Client::builder() .http1_title_case_headers(true) - .executor(executor) + .executor(TokioExecutor {}) .build(connector) } diff --git a/components/net/cookie.rs b/components/net/cookie.rs index 4d77d0911c4..3c0b904496d 100644 --- a/components/net/cookie.rs +++ b/components/net/cookie.rs @@ -5,7 +5,7 @@ //! Implementation of cookie creation and matching as specified by //! http://tools.ietf.org/html/rfc6265 -use hyper_serde::{self, Serde}; +use hyper_serde::Serde; use net_traits::pub_domains::is_pub_domain; use net_traits::CookieSource; use servo_url::ServoUrl; diff --git a/components/net/cookie_storage.rs b/components/net/cookie_storage.rs index f0815635476..380f2ece094 100644 --- a/components/net/cookie_storage.rs +++ b/components/net/cookie_storage.rs @@ -159,14 +159,14 @@ impl CookieStorage { // http://tools.ietf.org/html/rfc6265#section-5.4 pub fn cookies_for_url(&mut self, url: &ServoUrl, source: CookieSource) -> Option<String> { let filterer = |c: &&mut Cookie| -> bool { - info!( + debug!( " === SENT COOKIE : {} {} {:?} {:?}", c.cookie.name(), c.cookie.value(), c.cookie.domain(), c.cookie.path() ); - info!( + debug!( " === SENT COOKIE RESULT {}", c.appropriate_for_url(url, source) ); diff --git a/components/net/decoder.rs b/components/net/decoder.rs index b54035fa4d9..d151e08ceec 100644 --- a/components/net/decoder.rs +++ b/components/net/decoder.rs @@ -29,20 +29,24 @@ The following types directly support the gzip compression case: use crate::connector::BUF_SIZE; use brotli::Decompressor; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use flate2::read::DeflateDecoder; -use futures::{Async, Future, Poll, Stream}; +use futures::{task::Context, task::Poll, Future, Stream}; use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING}; -use hyper::{self, Body, Chunk, Response}; +use hyper::{self, Body, Response}; use libflate::non_blocking::gzip; use std::cmp; use std::fmt; use std::io::{self, Read}; use std::mem; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Waker; +#[derive(Debug)] pub enum Error { Io(io::Error), - Hyper(hyper::error::Error), + Hyper(hyper::Error), } impl From<io::Error> for Error { @@ -51,8 +55,8 @@ impl From<io::Error> for Error { } } -impl From<hyper::error::Error> for Error { - fn from(err: hyper::error::Error) -> Error { +impl From<hyper::Error> for Error { + fn from(err: hyper::Error) -> Error { Error::Hyper(err) } } @@ -93,10 +97,11 @@ struct Pending { } /// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Gzip { inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl fmt::Debug for Decoder { @@ -162,37 +167,36 @@ impl Decoder { } impl Stream for Decoder { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Do a read or poll for a pending decoder value. let new_value = match self.inner { - Inner::Pending(ref mut future) => match future.poll() { - Ok(Async::Ready(inner)) => inner, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(e.into()), + Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { + Poll::Ready(inner) => inner, + Poll::Pending => return Poll::Pending, }, - Inner::PlainText(ref mut body) => return body.poll().map_err(|e| e.into()), - Inner::Gzip(ref mut decoder) => return decoder.poll(), - Inner::Brotli(ref mut decoder) => return decoder.poll(), - Inner::Deflate(ref mut decoder) => return decoder.poll(), + Inner::PlainText(ref mut body) => { + return Pin::new(body).poll_next(cx).map_err(|e| e.into()) + }, + Inner::Gzip(ref mut decoder) => return Pin::new(decoder).poll_next(cx), + Inner::Brotli(ref mut decoder) => return Pin::new(decoder).poll_next(cx), + Inner::Deflate(ref mut decoder) => return Pin::new(decoder).poll_next(cx), }; + // self.inner = new_value; - self.poll() + self.poll_next(cx) } } impl Future for Pending { - type Item = Inner; - type Error = hyper::error::Error; - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let body_state = match self.body.poll_stream() { - Ok(Async::Ready(state)) => state, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(e), + type Output = Inner; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let body_state = match self.body.poll_stream(cx) { + Poll::Ready(state) => state, + Poll::Pending => return Poll::Pending, }; let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty())); @@ -200,110 +204,133 @@ impl Future for Pending { // if the stream was empty, or truly had an UnexpectedEof. // Therefore, we need to check for EOF first. match body_state { - StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))), - StreamState::HasMore => Ok(Async::Ready(match self.type_ { + StreamState::Eof => Poll::Ready(Inner::PlainText(Body::empty())), + StreamState::HasMore => Poll::Ready(match self.type_ { DecoderType::Gzip => Inner::Gzip(Gzip::new(body)), DecoderType::Brotli => Inner::Brotli(Brotli::new(body)), DecoderType::Deflate => Inner::Deflate(Deflate::new(body)), - })), + }), } } } impl Gzip { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Gzip { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(gzip::Decoder::new(Peeked::new(stream))), + reader: reader, } } } #[allow(unsafe_code)] -fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Chunk>, Error> { - if buf.remaining_mut() == 0 { - buf.reserve(INIT_BUFFER_SIZE); - } +fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Result<Bytes, Error>>> { + // Ensure a full size buffer is available. + // `reserve` is optimized to reclaim space over allocating. + buf.reserve(INIT_BUFFER_SIZE); // The buffer contains uninitialised memory so getting a readable slice is unsafe. // We trust the reader not to read from the memory given. // // To be safe, this memory could be zeroed before passing to the reader. // Otherwise we might need to deal with the case where the reader panics. + let read = { - let mut buf = unsafe { buf.bytes_mut() }; - reader.read(&mut buf) + let buf = unsafe { + let ptr = buf.chunk_mut().as_mut_ptr(); + std::slice::from_raw_parts_mut(ptr, buf.capacity()) + }; + reader.read(&mut *buf) }; match read { - Ok(read) if read == 0 => Ok(Async::Ready(None)), + Ok(read) if read == 0 => Poll::Ready(None), Ok(read) => { unsafe { buf.advance_mut(read) }; - let chunk = Chunk::from(buf.split_to(read).freeze()); - - Ok(Async::Ready(Some(chunk))) + let chunk = buf.split_to(read).freeze(); + Poll::Ready(Some(Ok(chunk))) }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady), - Err(e) => Err(e.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + Err(e) => Poll::Ready(Some(Err(e.into()))), } } impl Stream for Gzip { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } /// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Brotli { inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl Brotli { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Self { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)), + reader: reader, } } } impl Stream for Brotli { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } /// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Deflate { inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl Deflate { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Self { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(DeflateDecoder::new(Peeked::new(stream))), + reader: reader, } } } impl Stream for Deflate { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } @@ -311,17 +338,21 @@ impl Stream for Deflate { pub struct ReadableChunks<S> { state: ReadState, stream: S, + waker: Option<Waker>, } enum ReadState { /// A chunk is ready to be read from. - Ready(Chunk), + Ready(Bytes), /// The next chunk isn't ready yet. NotReady, /// The stream has finished. Eof, + /// Stream is in err + Error(hyper::Error), } +#[derive(Debug)] enum StreamState { /// More bytes can be read from the stream. HasMore, @@ -334,7 +365,7 @@ struct Peeked<R> { state: PeekedState, peeked_buf: [u8; 10], pos: usize, - inner: R, + inner: Arc<Mutex<R>>, } enum PeekedState { @@ -346,7 +377,7 @@ enum PeekedState { impl<R> Peeked<R> { #[inline] - fn new(inner: R) -> Self { + fn new(inner: Arc<Mutex<R>>) -> Self { Peeked { state: PeekedState::NotReady, peeked_buf: [0; 10], @@ -383,11 +414,13 @@ impl<R: Read> Read for Peeked<R> { if self.pos == peeked_buf_len { self.not_ready(); } - return Ok(len); }, PeekedState::NotReady => { - let read = self.inner.read(&mut self.peeked_buf[self.pos..]); + let mut buf = &mut self.peeked_buf[self.pos..]; + let stream = self.inner.clone(); + let mut reader = stream.lock().unwrap(); + let read = reader.read(&mut buf); match read { Ok(0) => self.ready(), @@ -411,6 +444,7 @@ impl<S> ReadableChunks<S> { ReadableChunks { state: ReadState::NotReady, stream: stream, + waker: None, } } } @@ -423,9 +457,12 @@ impl<S> fmt::Debug for ReadableChunks<S> { impl<S> Read for ReadableChunks<S> where - S: Stream<Item = Chunk, Error = hyper::error::Error>, + S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin, { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let waker = self.waker.as_ref().unwrap().clone(); + let mut cx = Context::from_waker(&waker); + loop { let ret; match self.state { @@ -440,15 +477,15 @@ where return Ok(len); } }, - ReadState::NotReady => match self.poll_stream() { - Ok(Async::Ready(StreamState::HasMore)) => continue, - Ok(Async::Ready(StreamState::Eof)) => return Ok(0), - Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), - Err(e) => { - return Err(io::Error::new(io::ErrorKind::Other, e)); - }, + ReadState::NotReady => match self.poll_stream(&mut cx) { + Poll::Ready(StreamState::HasMore) => continue, + Poll::Ready(StreamState::Eof) => return Ok(0), + Poll::Pending => return Err(io::ErrorKind::WouldBlock.into()), }, ReadState::Eof => return Ok(0), + ReadState::Error(ref err) => { + return Err(io::Error::new(io::ErrorKind::Other, err.to_string())) + }, } self.state = ReadState::NotReady; return Ok(ret); @@ -458,26 +495,29 @@ where impl<S> ReadableChunks<S> where - S: Stream<Item = Chunk, Error = hyper::error::Error>, + S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin, { /// Poll the readiness of the inner reader. /// /// This function will update the internal state and return a simplified /// version of the `ReadState`. - fn poll_stream(&mut self) -> Poll<StreamState, hyper::error::Error> { - match self.stream.poll() { - Ok(Async::Ready(Some(chunk))) => { + fn poll_stream(&mut self, cx: &mut Context<'_>) -> Poll<StreamState> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { self.state = ReadState::Ready(chunk); - Ok(Async::Ready(StreamState::HasMore)) + Poll::Ready(StreamState::HasMore) }, - Ok(Async::Ready(None)) => { - self.state = ReadState::Eof; + Poll::Ready(Some(Err(err))) => { + self.state = ReadState::Error(err); - Ok(Async::Ready(StreamState::Eof)) + Poll::Ready(StreamState::Eof) + }, + Poll::Ready(None) => { + self.state = ReadState::Eof; + Poll::Ready(StreamState::Eof) }, - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e), + Poll::Pending => Poll::Pending, } } } diff --git a/components/net/fetch/cors_cache.rs b/components/net/fetch/cors_cache.rs index 0a809529c69..666a160a591 100644 --- a/components/net/fetch/cors_cache.rs +++ b/components/net/fetch/cors_cache.rs @@ -10,7 +10,7 @@ //! with CORSRequest being expanded into FetchRequest (etc) use http::header::HeaderName; -use hyper::Method; +use http::Method; use net_traits::request::{CredentialsMode, Origin, Request}; use servo_url::ServoUrl; use time::{self, Timespec}; diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index a7d82cac318..cf99b0d8adb 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -14,8 +14,8 @@ use crossbeam_channel::Sender; use devtools_traits::DevtoolsControlMsg; use headers::{AccessControlExposeHeaders, ContentType, HeaderMapExt, Range}; use http::header::{self, HeaderMap, HeaderName}; -use hyper::Method; -use hyper::StatusCode; +use http::Method; +use http::StatusCode; use ipc_channel::ipc::{self, IpcReceiver}; use mime::{self, Mime}; use net_traits::blob_url_store::{parse_blob_url, BlobURLStoreError}; @@ -40,7 +40,7 @@ use std::ops::Bound; use std::str; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; -use tokio2::sync::mpsc::{ +use tokio::sync::mpsc::{ unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender, }; @@ -522,7 +522,9 @@ async fn wait_for_response( Some(Data::Payload(vec)) => { target.process_response_chunk(vec); }, - Some(Data::Done) => break, + Some(Data::Done) => { + break; + }, Some(Data::Cancelled) => { response.aborted.store(true, Ordering::Release); break; diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index a1e8ff40116..bc217874981 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -27,7 +27,7 @@ use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock, Weak}; -use tokio2::sync::mpsc::UnboundedSender as TokioSender; +use tokio::sync::mpsc::UnboundedSender as TokioSender; use url::Url; use uuid::Uuid; diff --git a/components/net/http_cache.rs b/components/net/http_cache.rs index fe97ed13c8a..31f171c1c28 100644 --- a/components/net/http_cache.rs +++ b/components/net/http_cache.rs @@ -12,8 +12,7 @@ use headers::{ CacheControl, ContentRange, Expires, HeaderMapExt, LastModified, Pragma, Range, Vary, }; use http::header::HeaderValue; -use http::{header, HeaderMap}; -use hyper::{Method, StatusCode}; +use http::{header, HeaderMap, Method, StatusCode}; use malloc_size_of::Measurable; use malloc_size_of::{ MallocSizeOf, MallocSizeOfOps, MallocUnconditionalShallowSizeOf, MallocUnconditionalSizeOf, @@ -29,7 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::time::SystemTime; use time::{Duration, Timespec, Tm}; -use tokio2::sync::mpsc::{unbounded_channel as unbounded, UnboundedSender as TokioSender}; +use tokio::sync::mpsc::{unbounded_channel as unbounded, UnboundedSender as TokioSender}; /// The key used to differentiate requests in the cache. #[derive(Clone, Eq, Hash, MallocSizeOf, PartialEq)] diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index cb0d43c255c..595fb3c124d 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -12,12 +12,13 @@ use crate::hsts::HstsList; use crate::http_cache::{CacheKey, HttpCache}; use crate::resource_thread::AuthCache; use async_recursion::async_recursion; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use core::convert::Infallible; +use crossbeam_channel::Sender; use devtools_traits::{ ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest, }; use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent}; -use futures_util::compat::*; +use futures::{future, StreamExt, TryFutureExt, TryStreamExt}; use headers::authorization::Basic; use headers::{AccessControlAllowCredentials, AccessControlAllowHeaders, HeaderMapExt}; use headers::{ @@ -28,12 +29,11 @@ use headers::{AccessControlAllowOrigin, AccessControlMaxAge}; use headers::{CacheControl, ContentEncoding, ContentLength}; use headers::{IfModifiedSince, LastModified, Origin as HyperOrigin, Pragma, Referer, UserAgent}; use http::header::{ - self, HeaderName, HeaderValue, ACCEPT, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LOCATION, - CONTENT_TYPE, + self, HeaderValue, ACCEPT, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LOCATION, CONTENT_TYPE, }; -use http::{HeaderMap, Request as HyperRequest}; -use hyper::header::TRANSFER_ENCODING; -use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode}; +use http::{HeaderMap, Method, Request as HyperRequest, StatusCode}; +use hyper::header::{HeaderName, TRANSFER_ENCODING}; +use hyper::{Body, Client, Response as HyperResponse}; use hyper_serde::Serde; use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::router::ROUTER; @@ -65,14 +65,15 @@ use std::ops::Deref; use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock}; use std::time::{Duration, SystemTime}; use time::{self, Tm}; -use tokio::prelude::{future, Future, Sink, Stream}; -use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender}; -use tokio2::sync::mpsc::{unbounded_channel, UnboundedSender as Tokio02Sender}; -use tokio_compat::runtime::{Builder, Runtime}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc::{ + channel, unbounded_channel, Receiver as TokioReceiver, Sender as TokioSender, + UnboundedReceiver, UnboundedSender, +}; +use tokio_stream::wrappers::ReceiverStream; lazy_static! { - pub static ref HANDLE: Mutex<Option<Runtime>> = - Mutex::new(Some(Builder::new().build().unwrap())); + pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap())); } /// The various states an entry of the HttpCache can be in. @@ -110,10 +111,7 @@ impl HttpState { history_states: RwLock::new(HashMap::new()), http_cache: RwLock::new(HttpCache::new()), http_cache_state: Mutex::new(HashMap::new()), - client: create_http_client( - tls_config, - HANDLE.lock().unwrap().as_ref().unwrap().executor(), - ), + client: create_http_client(tls_config), extra_certs: ExtraCerts::new(), connection_certs: ConnectionCerts::new(), } @@ -440,7 +438,7 @@ enum BodyStream { Chunked(TokioReceiver<Vec<u8>>), /// A body whose bytes are buffered /// and sent in one chunk over the network. - Buffered(Receiver<BodyChunk>), + Buffered(UnboundedReceiver<BodyChunk>), } /// The sink side of the body passed to hyper, @@ -451,7 +449,7 @@ enum BodySink { /// A Crossbeam sender used to send chunks to the fetch worker, /// where they will be buffered /// in order to ensure they are not streamed them over the network. - Buffered(Sender<BodyChunk>), + Buffered(UnboundedSender<BodyChunk>), } impl BodySink { @@ -459,12 +457,9 @@ impl BodySink { match self { BodySink::Chunked(ref sender) => { let sender = sender.clone(); - HANDLE - .lock() - .unwrap() - .as_mut() - .unwrap() - .spawn(sender.send(bytes).map(|_| ()).map_err(|_| ())); + HANDLE.lock().unwrap().as_mut().unwrap().spawn(async move { + let _ = sender.send(bytes).await; + }); }, BodySink::Buffered(ref sender) => { let _ = sender.send(BodyChunk::Chunk(bytes)); @@ -474,20 +469,7 @@ impl BodySink { pub fn close(&self) { match self { - BodySink::Chunked(ref sender) => { - let mut sender = sender.clone(); - HANDLE - .lock() - .unwrap() - .as_mut() - .unwrap() - .spawn(future::lazy(move || { - if sender.close().is_err() { - warn!("Failed to close network request sink."); - } - Ok(()) - })); - }, + BodySink::Chunked(_) => { /* no need to close sender */ }, BodySink::Buffered(ref sender) => { let _ = sender.send(BodyChunk::Done); }, @@ -506,7 +488,7 @@ async fn obtain_response( request_id: Option<&str>, is_xhr: bool, context: &FetchContext, - fetch_terminated: Tokio02Sender<bool>, + fetch_terminated: UnboundedSender<bool>, ) -> Result<(HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), NetworkError> { { let mut headers = request_headers.clone(); @@ -537,7 +519,7 @@ async fn obtain_response( // However since this doesn't appear documented, and we're using an ancient version, // for now we buffer manually to ensure we don't stream requests // to servers that might not know how to handle them. - let (sender, receiver) = unbounded(); + let (sender, receiver) = unbounded_channel(); (BodySink::Buffered(sender), BodyStream::Buffered(receiver)) }; @@ -557,6 +539,7 @@ async fn obtain_response( ROUTER.add_route( body_port.to_opaque(), Box::new(move |message| { + info!("Received message"); let bytes: Vec<u8> = match message.to().unwrap() { BodyChunkResponse::Chunk(bytes) => bytes, BodyChunkResponse::Done => { @@ -593,23 +576,25 @@ async fn obtain_response( ); let body = match stream { - BodyStream::Chunked(receiver) => Body::wrap_stream(receiver), - BodyStream::Buffered(receiver) => { + BodyStream::Chunked(receiver) => { + let stream = ReceiverStream::new(receiver); + Body::wrap_stream(stream.map(Ok::<_, Infallible>)) + }, + BodyStream::Buffered(mut receiver) => { // Accumulate bytes received over IPC into a vector. let mut body = vec![]; loop { - match receiver.recv() { - Ok(BodyChunk::Chunk(mut bytes)) => { + match receiver.recv().await { + Some(BodyChunk::Chunk(mut bytes)) => { body.append(&mut bytes); }, - Ok(BodyChunk::Done) => break, - Err(_) => warn!("Failed to read all chunks from request body."), + Some(BodyChunk::Done) => break, + None => warn!("Failed to read all chunks from request body."), } } body.into() }, }; - HyperRequest::builder() .method(method) .uri(encoded_url) @@ -709,12 +694,11 @@ async fn obtain_response( debug!("Not notifying devtools (no request_id)"); None }; - Ok((Decoder::detect(res), msg)) + future::ready(Ok((Decoder::detect(res), msg))) }) .map_err(move |e| { NetworkError::from_hyper_error(&e, connection_certs_clone.remove(host_clone)) }) - .compat() // convert from Future01 to Future03 .await } } @@ -1850,7 +1834,7 @@ async fn http_network_fetch( send_response_to_devtools( &sender, request_id.unwrap(), - meta_headers.map(Serde::into_inner), + meta_headers.map(|hdrs| Serde::into_inner(hdrs)), meta_status, pipeline_id, ); @@ -1866,19 +1850,22 @@ async fn http_network_fetch( HANDLE.lock().unwrap().as_ref().unwrap().spawn( res.into_body() - .map_err(|_| ()) - .fold(res_body, move |res_body, chunk| { + .map_err(|e| { + warn!("Error streaming response body: {:?}", e); + () + }) + .try_fold(res_body, move |res_body, chunk| { if cancellation_listener.lock().unwrap().cancelled() { *res_body.lock().unwrap() = ResponseBody::Done(vec![]); let _ = done_sender.send(Data::Cancelled); - return tokio::prelude::future::failed(()); + return future::ready(Err(())); } if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { - let bytes = chunk.into_bytes(); + let bytes = chunk; body.extend_from_slice(&*bytes); let _ = done_sender.send(Data::Payload(bytes.to_vec())); } - tokio::prelude::future::ok(res_body) + future::ready(Ok(res_body)) }) .and_then(move |res_body| { debug!("successfully finished response for {:?}", url1); @@ -1893,10 +1880,10 @@ async fn http_network_fetch( .unwrap() .set_attribute(ResourceAttribute::ResponseEnd); let _ = done_sender2.send(Data::Done); - tokio::prelude::future::ok(()) + future::ready(Ok(())) }) .map_err(move |_| { - warn!("finished response for {:?} with error", url2); + debug!("finished response for {:?}", url2); let mut body = res_body2.lock().unwrap(); let completed_body = match *body { ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index e59aca10fcf..4b858eaa48f 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -16,7 +16,7 @@ use crate::hsts::HstsList; use crate::http_cache::HttpCache; use crate::http_loader::{http_redirect_fetch, HttpState, HANDLE}; use crate::storage_thread::StorageThreadFactory; -use crate::websocket_loader::{self, HANDLE as WS_HANDLE}; +use crate::websocket_loader; use crossbeam_channel::Sender; use devtools_traits::DevtoolsControlMsg; use embedder_traits::resources::{self, Resource}; @@ -155,15 +155,12 @@ fn create_http_states( history_states: RwLock::new(HashMap::new()), http_cache: RwLock::new(http_cache), http_cache_state: Mutex::new(HashMap::new()), - client: create_http_client( - create_tls_config( - &certs, - ALPN_H2_H1, - extra_certs.clone(), - connection_certs.clone(), - ), - HANDLE.lock().unwrap().as_ref().unwrap().executor(), - ), + client: create_http_client(create_tls_config( + &certs, + ALPN_H2_H1, + extra_certs.clone(), + connection_certs.clone(), + )), extra_certs, connection_certs, }; @@ -178,15 +175,12 @@ fn create_http_states( history_states: RwLock::new(HashMap::new()), http_cache: RwLock::new(HttpCache::new()), http_cache_state: Mutex::new(HashMap::new()), - client: create_http_client( - create_tls_config( - &certs, - ALPN_H2_H1, - extra_certs.clone(), - connection_certs.clone(), - ), - HANDLE.lock().unwrap().as_ref().unwrap().executor(), - ), + client: create_http_client(create_tls_config( + &certs, + ALPN_H2_H1, + extra_certs.clone(), + connection_certs.clone(), + )), extra_certs, connection_certs, }; @@ -616,12 +610,6 @@ impl CoreResourceManager { // or a short timeout has been reached. self.thread_pool.exit(); - // Shut-down the async runtime used by fetch workers. - drop(HANDLE.lock().unwrap().take()); - - // Shut-down the async runtime used by websocket workers. - drop(WS_HANDLE.lock().unwrap().take()); - debug!("Exited CoreResourceManager"); } @@ -680,58 +668,49 @@ impl CoreResourceManager { _ => (FileTokenCheck::NotRequired, None), }; - HANDLE - .lock() - .unwrap() - .as_ref() - .unwrap() - .spawn_std(async move { - // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) - // todo load context / mimesniff in fetch - // todo referrer policy? - // todo service worker stuff - let context = FetchContext { - state: http_state, - user_agent: ua, - devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))), - filemanager: Arc::new(Mutex::new(filemanager)), - file_token, - cancellation_listener: Arc::new(Mutex::new(CancellationListener::new( - cancel_chan, - ))), - timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( - request.timing_type(), - ))), - }; - - match res_init_ { - Some(res_init) => { - let response = Response::from_init(res_init, timing_type); - http_redirect_fetch( - &mut request, - &mut CorsCache::new(), - response, - true, - &mut sender, - &mut None, - &context, - ) - .await; - }, - None => { - fetch(&mut request, &mut sender, &context).await; - }, - }; - - // Remove token after fetch. - if let Some(id) = blob_url_file_id.as_ref() { - context - .filemanager - .lock() - .unwrap() - .invalidate_token(&context.file_token, id); - } - }); + HANDLE.lock().unwrap().as_ref().unwrap().spawn(async move { + // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) + // todo load context / mimesniff in fetch + // todo referrer policy? + // todo service worker stuff + let context = FetchContext { + state: http_state, + user_agent: ua, + devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))), + filemanager: Arc::new(Mutex::new(filemanager)), + file_token, + cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))), + timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))), + }; + + match res_init_ { + Some(res_init) => { + let response = Response::from_init(res_init, timing_type); + http_redirect_fetch( + &mut request, + &mut CorsCache::new(), + response, + true, + &mut sender, + &mut None, + &context, + ) + .await; + }, + None => { + fetch(&mut request, &mut sender, &context).await; + }, + }; + + // Remove token after fetch. + if let Some(id) = blob_url_file_id.as_ref() { + context + .filemanager + .lock() + .unwrap() + .invalidate_token(&context.file_token, id); + } + }); } fn websocket_connect( diff --git a/components/net/tests/fetch.rs b/components/net/tests/fetch.rs index 9416a81e8de..4c0149b8f1a 100644 --- a/components/net/tests/fetch.rs +++ b/components/net/tests/fetch.rs @@ -20,7 +20,7 @@ use headers::{AccessControlAllowMethods, AccessControlMaxAge, HeaderMapExt}; use headers::{CacheControl, ContentLength, ContentType, Expires, LastModified, Pragma, UserAgent}; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; use http::{Method, StatusCode}; -use hyper::body::Body; +use hyper::Body; use hyper::{Request as HyperRequest, Response as HyperResponse}; use mime::{self, Mime}; use msg::constellation_msg::TEST_PIPELINE_ID; diff --git a/components/net/tests/http_cache.rs b/components/net/tests/http_cache.rs index 8dd09b650da..30a4052928a 100644 --- a/components/net/tests/http_cache.rs +++ b/components/net/tests/http_cache.rs @@ -10,7 +10,7 @@ use net_traits::request::{Origin, Referrer, Request}; use net_traits::response::{HttpsState, Response, ResponseBody}; use net_traits::{ResourceFetchTiming, ResourceTimingType}; use servo_url::ServoUrl; -use tokio2::sync::mpsc::unbounded_channel as unbounded; +use tokio::sync::mpsc::unbounded_channel as unbounded; #[test] fn test_refreshing_resource_sets_done_chan_the_appropriate_value() { diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs index e9de2cb983f..e84b0ec3fc5 100644 --- a/components/net/tests/http_loader.rs +++ b/components/net/tests/http_loader.rs @@ -15,7 +15,6 @@ use devtools_traits::HttpResponse as DevtoolsHttpResponse; use devtools_traits::{ChromeToDevtoolsControlMsg, DevtoolsControlMsg, NetworkEvent}; use flate2::write::{DeflateEncoder, GzEncoder}; use flate2::Compression; -use futures::{self, Future, Stream}; use headers::authorization::Basic; use headers::{ Authorization, ContentLength, Date, HeaderMapExt, Host, StrictTransportSecurity, UserAgent, @@ -23,7 +22,7 @@ use headers::{ use http::header::{self, HeaderMap, HeaderValue}; use http::uri::Authority; use http::{Method, StatusCode}; -use hyper::body::Body; +use hyper::Body; use hyper::{Request as HyperRequest, Response as HyperResponse}; use ipc_channel::ipc; use ipc_channel::router::ROUTER; @@ -51,13 +50,6 @@ fn mock_origin() -> ImmutableOrigin { ServoUrl::parse("http://servo.org").unwrap().origin() } -fn read_response(req: HyperRequest<Body>) -> impl Future<Item = String, Error = ()> { - req.into_body() - .concat2() - .and_then(|body| futures::future::ok(str::from_utf8(&body).unwrap().to_owned())) - .map_err(|_| ()) -} - fn assert_cookie_for_domain( cookie_jar: &RwLock<CookieStorage>, domain: &str, @@ -521,28 +513,18 @@ fn test_load_should_decode_the_response_as_gzip_when_response_headers_have_conte #[test] fn test_load_doesnt_send_request_body_on_any_redirect() { + use hyper::body::HttpBody; + let post_handler = move |request: HyperRequest<Body>, response: &mut HyperResponse<Body>| { assert_eq!(request.method(), Method::GET); - read_response(request) - .and_then(|data| { - assert_eq!(data, ""); - futures::future::ok(()) - }) - .poll() - .unwrap(); + assert_eq!(request.size_hint().exact(), Some(0)); *response.body_mut() = b"Yay!".to_vec().into(); }; let (post_server, post_url) = make_server(post_handler); let post_redirect_url = post_url.clone(); let pre_handler = move |request: HyperRequest<Body>, response: &mut HyperResponse<Body>| { - read_response(request) - .and_then(|data| { - assert_eq!(data, "Body on POST"); - futures::future::ok(()) - }) - .poll() - .unwrap(); + assert_eq!(request.size_hint().exact(), Some(13)); response.headers_mut().insert( header::LOCATION, HeaderValue::from_str(&post_redirect_url.to_string()).unwrap(), diff --git a/components/net/tests/main.rs b/components/net/tests/main.rs index f814980f4aa..46f3eb1bb05 100644 --- a/components/net/tests/main.rs +++ b/components/net/tests/main.rs @@ -21,14 +21,17 @@ mod mime_classifier; mod resource_thread; mod subresource_integrity; +use core::convert::Infallible; +use core::pin::Pin; use crossbeam_channel::{unbounded, Sender}; use devtools_traits::DevtoolsControlMsg; use embedder_traits::resources::{self, Resource}; use embedder_traits::{EmbedderProxy, EventLoopWaker}; -use futures::{Future, Stream}; +use futures::future::ready; +use futures::StreamExt; use hyper::server::conn::Http; use hyper::server::Server as HyperServer; -use hyper::service::service_fn_ok; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request as HyperRequest, Response as HyperResponse}; use net::connector::{create_tls_config, ConnectionCerts, ExtraCerts, ALPN_H2_H1}; use net::fetch::cors_cache::CorsCache; @@ -40,20 +43,27 @@ use net_traits::filemanager_thread::FileTokenCheck; use net_traits::request::Request; use net_traits::response::Response; use net_traits::{FetchTaskTarget, ResourceFetchTiming, ResourceTimingType}; -use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; +use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod}; use servo_arc::Arc as ServoArc; use servo_url::ServoUrl; use std::net::TcpListener as StdTcpListener; use std::path::PathBuf; use std::sync::{Arc, Mutex, Weak}; use tokio::net::TcpListener; -use tokio::reactor::Handle; -use tokio::runtime::Runtime; -use tokio_openssl::SslAcceptorExt; +use tokio::net::TcpStream; +use tokio::runtime::{Builder, Runtime}; +use tokio_openssl::SslStream; +use tokio_stream::wrappers::TcpListenerStream; use tokio_test::block_on; lazy_static! { - pub static ref HANDLE: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap()); + pub static ref HANDLE: Mutex<Runtime> = Mutex::new( + Builder::new_multi_thread() + .enable_io() + .worker_threads(10) + .build() + .unwrap() + ); } const DEFAULT_USER_AGENT: &'static str = "Such Browser. Very Layout. Wow."; @@ -134,33 +144,34 @@ fn fetch(request: &mut Request, dc: Option<Sender<DevtoolsControlMsg>>) -> Respo fn fetch_with_context(request: &mut Request, mut context: &mut FetchContext) -> Response { let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - - block_on(methods::fetch(request, &mut target, &mut context)); - - receiver.recv().unwrap() + block_on(async move { + methods::fetch(request, &mut target, &mut context).await; + receiver.recv().unwrap() + }) } fn fetch_with_cors_cache(request: &mut Request, cache: &mut CorsCache) -> Response { let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - - block_on(methods::fetch_with_cors_cache( - request, - cache, - &mut target, - &mut new_fetch_context(None, None, None), - )); - - receiver.recv().unwrap() + block_on(async move { + methods::fetch_with_cors_cache( + request, + cache, + &mut target, + &mut new_fetch_context(None, None, None), + ) + .await; + receiver.recv().unwrap() + }) } pub(crate) struct Server { - pub close_channel: futures::sync::oneshot::Sender<()>, + pub close_channel: tokio::sync::oneshot::Sender<()>, } impl Server { fn close(self) { - self.close_channel.send(()).unwrap(); + self.close_channel.send(()).expect("err closing server:"); } } @@ -172,19 +183,26 @@ where let listener = StdTcpListener::bind("0.0.0.0:0").unwrap(); let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); let url = ServoUrl::parse(&url_string).unwrap(); - let (tx, rx) = futures::sync::oneshot::channel::<()>(); - let server = HyperServer::from_tcp(listener) - .unwrap() - .serve(move || { - let handler = handler.clone(); - service_fn_ok(move |req: HyperRequest<Body>| { - let mut response = HyperResponse::new(Vec::<u8>::new().into()); - handler(req, &mut response); - response + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let server = async move { + HyperServer::from_tcp(listener) + .unwrap() + .serve(make_service_fn(move |_| { + let handler = handler.clone(); + ready(Ok::<_, Infallible>(service_fn( + move |req: HyperRequest<Body>| { + let mut response = HyperResponse::new(Vec::<u8>::new().into()); + handler(req, &mut response); + ready(Ok::<_, Infallible>(response)) + }, + ))) + })) + .with_graceful_shutdown(async move { + rx.await.ok(); }) - }) - .with_graceful_shutdown(rx) - .map_err(|_| ()); + .await + .expect("Could not start server"); + }; HANDLE.lock().unwrap().spawn(server); let server = Server { close_channel: tx }; @@ -197,43 +215,64 @@ where { let handler = Arc::new(handler); let listener = StdTcpListener::bind("[::0]:0").unwrap(); - let listener = TcpListener::from_std(listener, &Handle::default()).unwrap(); + let listener = HANDLE + .lock() + .unwrap() + .block_on(async move { TcpListener::from_std(listener).unwrap() }); + let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); + let mut listener = TcpListenerStream::new(listener); + let url = ServoUrl::parse(&url_string).unwrap(); + let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let server = listener.incoming().map_err(|_| ()).for_each(move |sock| { - let mut tls_server_config = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap(); - tls_server_config - .set_certificate_file(&cert_path, SslFiletype::PEM) - .unwrap(); - tls_server_config - .set_private_key_file(&key_path, SslFiletype::PEM) - .unwrap(); - - let handler = handler.clone(); - tls_server_config - .build() - .accept_async(sock) - .map_err(|_| ()) - .and_then(move |ssl| { - Http::new() - .serve_connection( - ssl, - service_fn_ok(move |req: HyperRequest<Body>| { - let mut response = HyperResponse::new(Vec::<u8>::new().into()); - handler(req, &mut response); - response - }), - ) - .map_err(|_| ()) - }) - }); + let server = async move { + loop { + let stream = tokio::select! { + stream = listener.next() => stream, + _ = &mut rx => break + }; + + let stream = match stream { + Some(stream) => stream.expect("Could not accept stream: "), + _ => break, + }; + + let stream = stream.into_std().unwrap(); + stream + .set_read_timeout(Some(std::time::Duration::new(5, 0))) + .unwrap(); + let stream = TcpStream::from_std(stream).unwrap(); + + let mut tls_server_config = + SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap(); + tls_server_config + .set_certificate_file(&cert_path, SslFiletype::PEM) + .unwrap(); + tls_server_config + .set_private_key_file(&key_path, SslFiletype::PEM) + .unwrap(); + + let tls_server_config = tls_server_config.build(); + let ssl = Ssl::new(tls_server_config.context()).unwrap(); + let mut stream = SslStream::new(ssl, stream).unwrap(); - let (tx, rx) = futures::sync::oneshot::channel::<()>(); - let server = server - .select(rx.map_err(|_| ())) - .map(|_| ()) - .map_err(|_| ()); + let _ = Pin::new(&mut stream).accept().await; + + let handler = handler.clone(); + + let _ = Http::new() + .serve_connection( + stream, + service_fn(move |req: HyperRequest<Body>| { + let mut response = HyperResponse::new(Body::empty()); + handler(req, &mut response); + ready(Ok::<_, Infallible>(response)) + }), + ) + .await; + } + }; HANDLE.lock().unwrap().spawn(server); diff --git a/components/net/tests/mime_classifier.rs b/components/net/tests/mime_classifier.rs index 3317b355d71..0fac4a027e2 100644 --- a/components/net/tests/mime_classifier.rs +++ b/components/net/tests/mime_classifier.rs @@ -43,7 +43,7 @@ fn test_sniff_mp4_matcher_long() { let matcher = Mp4Matcher; let mut data: [u8; 260] = [0; 260]; - &data[..11].clone_from_slice(&[ + let _ = &data[..11].clone_from_slice(&[ 0x00, 0x00, 0x01, 0x04, 0x66, 0x74, 0x79, 0x70, 0x6D, 0x70, 0x34, ]); diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs index 2ad8c00a436..a73b48e1f41 100644 --- a/components/net/websocket_loader.rs +++ b/components/net/websocket_loader.rs @@ -19,9 +19,9 @@ use crate::http_loader::HttpState; use async_tungstenite::tokio::{client_async_tls_with_connector_and_config, ConnectStream}; use async_tungstenite::WebSocketStream; use embedder_traits::resources::{self, Resource}; -use futures03::future::TryFutureExt; -use futures03::sink::SinkExt; -use futures03::stream::StreamExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; use http::header::{HeaderMap, HeaderName, HeaderValue}; use ipc_channel::ipc::{IpcReceiver, IpcSender}; use ipc_channel::router::ROUTER; diff --git a/components/net_traits/Cargo.toml b/components/net_traits/Cargo.toml index 0e663b91bfd..76fb2113fb7 100644 --- a/components/net_traits/Cargo.toml +++ b/components/net_traits/Cargo.toml @@ -16,10 +16,10 @@ doctest = false content-security-policy = { version = "0.4.0", features = ["serde"] } cookie = "0.11" embedder_traits = { path = "../embedder_traits" } -headers = "0.2" -http = "0.1" -hyper = "0.12" -hyper_serde = "0.11" +headers = "0.3" +http = "0.2" +hyper = "0.14" +hyper_serde = "0.12" ipc-channel = "0.14" lazy_static = "1" log = "0.4" diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index de7871e8348..ac6ef9e3ad3 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -21,9 +21,9 @@ use crate::response::{HttpsState, Response, ResponseInit}; use crate::storage_thread::StorageThreadMsg; use cookie::Cookie; use headers::{ContentType, HeaderMapExt, ReferrerPolicy as ReferrerPolicyHeader}; +use http::StatusCode; use http::{Error as HttpError, HeaderMap}; use hyper::Error as HyperError; -use hyper::StatusCode; use hyper_serde::Serde; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use ipc_channel::router::ROUTER; @@ -759,7 +759,7 @@ pub enum NetworkError { impl NetworkError { pub fn from_hyper_error(error: &HyperError, cert_bytes: Option<Vec<u8>>) -> Self { let s = error.to_string(); - if s.contains("the handshake failed") { + if s.to_lowercase().contains("ssl") { NetworkError::SslValidation(s, cert_bytes.unwrap_or_default()) } else { NetworkError::Internal(s) diff --git a/components/net_traits/request.rs b/components/net_traits/request.rs index cfdd8edf399..443f4c57dab 100644 --- a/components/net_traits/request.rs +++ b/components/net_traits/request.rs @@ -8,7 +8,7 @@ use crate::ResourceTimingType; use content_security_policy::{self as csp, CspList}; use http::header::{HeaderName, AUTHORIZATION}; use http::HeaderMap; -use hyper::Method; +use http::Method; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use mime::Mime; use msg::constellation_msg::PipelineId; diff --git a/components/script/Cargo.toml b/components/script/Cargo.toml index 1c9464db740..539dffc906e 100644 --- a/components/script/Cargo.toml +++ b/components/script/Cargo.toml @@ -54,11 +54,10 @@ enum-iterator = "0.3" euclid = "0.20" fnv = "1.0" fxhash = "0.2" -headers = "0.2" +headers = "0.3" html5ever = "0.25" -http = "0.1" -hyper = "0.12" -hyper_serde = "0.11" +http = "0.2" +hyper_serde = "0.12" image = "0.23" indexmap = { version = "1.0.2", features = ["std"] } ipc-channel = "0.14" @@ -113,7 +112,7 @@ unicode-segmentation = "1.1.0" url = "2.0" utf-8 = "0.7" uuid = { version = "0.8", features = ["v4", "serde"] } -webdriver = "0.40" +webdriver = "0.44" webgpu = { path = "../webgpu" } webrender_api = { git = "https://github.com/servo/webrender" } webxr-api = { git = "https://github.com/servo/webxr", features = ["ipc"] } diff --git a/components/script/dom/bindings/trace.rs b/components/script/dom/bindings/trace.rs index 02b72fc3c2d..efba7c718d1 100644 --- a/components/script/dom/bindings/trace.rs +++ b/components/script/dom/bindings/trace.rs @@ -73,8 +73,8 @@ use euclid::Length as EuclidLength; use html5ever::buffer_queue::BufferQueue; use html5ever::{LocalName, Namespace, Prefix, QualName}; use http::header::HeaderMap; -use hyper::Method; -use hyper::StatusCode; +use http::Method; +use http::StatusCode; use indexmap::IndexMap; use ipc_channel::ipc::{IpcReceiver, IpcSender}; use js::glue::{CallObjectTracer, CallScriptTracer, CallStringTracer, CallValueTracer}; diff --git a/components/script/dom/htmlformelement.rs b/components/script/dom/htmlformelement.rs index d06fd85a426..80adec9718a 100644 --- a/components/script/dom/htmlformelement.rs +++ b/components/script/dom/htmlformelement.rs @@ -61,7 +61,7 @@ use dom_struct::dom_struct; use encoding_rs::{Encoding, UTF_8}; use headers::{ContentType, HeaderMapExt}; use html5ever::{LocalName, Prefix}; -use hyper::Method; +use http::Method; use mime::{self, Mime}; use net_traits::http_percent_encode; use net_traits::request::Referrer; diff --git a/components/script/dom/response.rs b/components/script/dom/response.rs index 85d20a62d06..ef183340a97 100644 --- a/components/script/dom/response.rs +++ b/components/script/dom/response.rs @@ -24,7 +24,7 @@ use crate::script_runtime::JSContext as SafeJSContext; use crate::script_runtime::StreamConsumer; use dom_struct::dom_struct; use http::header::HeaderMap as HyperHeaders; -use hyper::StatusCode; +use http::StatusCode; use hyper_serde::Serde; use js::jsapi::JSObject; use servo_url::ServoUrl; diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index b6adac383b7..1c6db3a5953 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -45,7 +45,7 @@ use headers::{ContentLength, ContentType, HeaderMapExt}; use html5ever::serialize; use html5ever::serialize::SerializeOpts; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; -use hyper::Method; +use http::Method; use hyper_serde::Serde; use ipc_channel::ipc; use ipc_channel::router::ROUTER; diff --git a/components/script_traits/Cargo.toml b/components/script_traits/Cargo.toml index 14a709cea20..266d80d3317 100644 --- a/components/script_traits/Cargo.toml +++ b/components/script_traits/Cargo.toml @@ -20,9 +20,9 @@ devtools_traits = { path = "../devtools_traits" } embedder_traits = { path = "../embedder_traits" } euclid = "0.20" gfx_traits = { path = "../gfx_traits" } -http = "0.1" -hyper = "0.12" -hyper_serde = "0.11" +headers = "0.3" +http = "0.2" +hyper_serde = "0.12" ipc-channel = "0.14" keyboard-types = "0.5" libc = "0.2" @@ -41,7 +41,7 @@ smallvec = "0.6" style_traits = { path = "../style_traits", features = ["servo"] } time = "0.1.41" uuid = { version = "0.8", features = ["v4"] } -webdriver = "0.40" +webdriver = "0.44" webgpu = { path = "../webgpu" } webrender_api = { git = "https://github.com/servo/webrender" } webxr-api = { git = "https://github.com/servo/webxr", features = ["ipc"] } diff --git a/components/script_traits/lib.rs b/components/script_traits/lib.rs index be926307bd9..4d8ba59c0ea 100644 --- a/components/script_traits/lib.rs +++ b/components/script_traits/lib.rs @@ -34,7 +34,7 @@ use embedder_traits::EventLoopWaker; use euclid::{default::Point2D, Length, Rect, Scale, Size2D, UnknownUnit, Vector2D}; use gfx_traits::Epoch; use http::HeaderMap; -use hyper::Method; +use http::Method; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use ipc_channel::Error as IpcError; use keyboard_types::webdriver::Event as WebDriverInputEvent; diff --git a/components/webdriver_server/Cargo.toml b/components/webdriver_server/Cargo.toml index b568d1cfb09..d5b7957afe5 100644 --- a/components/webdriver_server/Cargo.toml +++ b/components/webdriver_server/Cargo.toml @@ -16,7 +16,8 @@ compositing = { path = "../compositing" } cookie = "0.11" crossbeam-channel = "0.4" euclid = "0.20" -hyper = "0.12" +headers = "0.3" +http = "0.2" image = "0.23" ipc-channel = "0.14" keyboard-types = "0.5" @@ -31,4 +32,4 @@ servo_config = { path = "../config" } servo_url = { path = "../url" } style_traits = { path = "../style_traits" } uuid = { version = "0.8", features = ["v4"] } -webdriver = "0.40" +webdriver = "0.44" diff --git a/components/webdriver_server/capabilities.rs b/components/webdriver_server/capabilities.rs index 5e9b877c84c..50b0be26acb 100644 --- a/components/webdriver_server/capabilities.rs +++ b/components/webdriver_server/capabilities.rs @@ -4,7 +4,7 @@ use serde_json::{Map, Value}; use webdriver::capabilities::{BrowserCapabilities, Capabilities}; -use webdriver::error::WebDriverResult; +use webdriver::error::{WebDriverError, WebDriverResult}; pub struct ServoCapabilities { pub browser_name: String, @@ -71,9 +71,16 @@ impl BrowserCapabilities for ServoCapabilities { Ok(self.accept_custom) } - fn validate_custom(&self, _: &str, _: &Value) -> WebDriverResult<()> { + fn validate_custom(&mut self, _: &str, _: &Value) -> WebDriverResult<()> { Ok(()) } + + fn web_socket_url( + &mut self, + _: &serde_json::Map<std::string::String, Value>, + ) -> Result<bool, WebDriverError> { + todo!() + } } fn get_platform_name() -> Option<String> { diff --git a/components/webdriver_server/lib.rs b/components/webdriver_server/lib.rs index cd54d1a8c8f..e4be9215a84 100644 --- a/components/webdriver_server/lib.rs +++ b/components/webdriver_server/lib.rs @@ -22,7 +22,7 @@ use capabilities::ServoCapabilities; use compositing::ConstellationMsg; use crossbeam_channel::{after, unbounded, Receiver, Sender}; use euclid::{Rect, Size2D}; -use hyper::Method; +use http::method::Method; use image::{DynamicImage, ImageFormat, RgbImage}; use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::router::ROUTER; @@ -70,7 +70,7 @@ use webdriver::httpapi::WebDriverExtensionRoute; use webdriver::response::{CookieResponse, CookiesResponse}; use webdriver::response::{ElementRectResponse, NewSessionResponse, ValueResponse}; use webdriver::response::{TimeoutsResponse, WebDriverResponse, WindowRectResponse}; -use webdriver::server::{self, Session, WebDriverHandler}; +use webdriver::server::{self, Session, SessionTeardownKind, WebDriverHandler}; fn extension_routes() -> Vec<(Method, &'static str, ServoExtensionRoute)> { return vec![ @@ -103,6 +103,7 @@ fn cookie_msg_to_cookie(cookie: cookie::Cookie) -> Cookie { .map(|time| Date(time.to_timespec().sec as u64)), secure: cookie.secure().unwrap_or(false), http_only: cookie.http_only().unwrap_or(false), + same_site: cookie.same_site().map(|s| s.to_string()), } } @@ -112,7 +113,12 @@ pub fn start_server(port: u16, constellation_chan: Sender<ConstellationMsg>) { .name("WebDriverHttpServer".to_owned()) .spawn(move || { let address = SocketAddrV4::new("0.0.0.0".parse().unwrap(), port); - match server::start(SocketAddr::V4(address), handler, extension_routes()) { + match server::start( + "localhost".to_owned(), + SocketAddr::V4(address), + handler, + extension_routes(), + ) { Ok(listening) => info!("WebDriver server listening on {}", listening.socket), Err(_) => panic!("Unable to start WebDriver HTTPD server"), } @@ -1780,7 +1786,7 @@ impl WebDriverHandler<ServoExtensionRoute> for Handler { } } - fn delete_session(&mut self, _session: &Option<Session>) { + fn teardown_session(&mut self, _session: SessionTeardownKind) { self.session = None; } } |