diff options
Diffstat (limited to 'components/net/tests/main.rs')
-rw-r--r-- | components/net/tests/main.rs | 173 |
1 files changed, 106 insertions, 67 deletions
diff --git a/components/net/tests/main.rs b/components/net/tests/main.rs index f814980f4aa..46f3eb1bb05 100644 --- a/components/net/tests/main.rs +++ b/components/net/tests/main.rs @@ -21,14 +21,17 @@ mod mime_classifier; mod resource_thread; mod subresource_integrity; +use core::convert::Infallible; +use core::pin::Pin; use crossbeam_channel::{unbounded, Sender}; use devtools_traits::DevtoolsControlMsg; use embedder_traits::resources::{self, Resource}; use embedder_traits::{EmbedderProxy, EventLoopWaker}; -use futures::{Future, Stream}; +use futures::future::ready; +use futures::StreamExt; use hyper::server::conn::Http; use hyper::server::Server as HyperServer; -use hyper::service::service_fn_ok; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request as HyperRequest, Response as HyperResponse}; use net::connector::{create_tls_config, ConnectionCerts, ExtraCerts, ALPN_H2_H1}; use net::fetch::cors_cache::CorsCache; @@ -40,20 +43,27 @@ use net_traits::filemanager_thread::FileTokenCheck; use net_traits::request::Request; use net_traits::response::Response; use net_traits::{FetchTaskTarget, ResourceFetchTiming, ResourceTimingType}; -use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; +use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod}; use servo_arc::Arc as ServoArc; use servo_url::ServoUrl; use std::net::TcpListener as StdTcpListener; use std::path::PathBuf; use std::sync::{Arc, Mutex, Weak}; use tokio::net::TcpListener; -use tokio::reactor::Handle; -use tokio::runtime::Runtime; -use tokio_openssl::SslAcceptorExt; +use tokio::net::TcpStream; +use tokio::runtime::{Builder, Runtime}; +use tokio_openssl::SslStream; +use tokio_stream::wrappers::TcpListenerStream; use tokio_test::block_on; lazy_static! { - pub static ref HANDLE: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap()); + pub static ref HANDLE: Mutex<Runtime> = Mutex::new( + Builder::new_multi_thread() + .enable_io() + .worker_threads(10) + .build() + .unwrap() + ); } const DEFAULT_USER_AGENT: &'static str = "Such Browser. Very Layout. Wow."; @@ -134,33 +144,34 @@ fn fetch(request: &mut Request, dc: Option<Sender<DevtoolsControlMsg>>) -> Respo fn fetch_with_context(request: &mut Request, mut context: &mut FetchContext) -> Response { let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - - block_on(methods::fetch(request, &mut target, &mut context)); - - receiver.recv().unwrap() + block_on(async move { + methods::fetch(request, &mut target, &mut context).await; + receiver.recv().unwrap() + }) } fn fetch_with_cors_cache(request: &mut Request, cache: &mut CorsCache) -> Response { let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - - block_on(methods::fetch_with_cors_cache( - request, - cache, - &mut target, - &mut new_fetch_context(None, None, None), - )); - - receiver.recv().unwrap() + block_on(async move { + methods::fetch_with_cors_cache( + request, + cache, + &mut target, + &mut new_fetch_context(None, None, None), + ) + .await; + receiver.recv().unwrap() + }) } pub(crate) struct Server { - pub close_channel: futures::sync::oneshot::Sender<()>, + pub close_channel: tokio::sync::oneshot::Sender<()>, } impl Server { fn close(self) { - self.close_channel.send(()).unwrap(); + self.close_channel.send(()).expect("err closing server:"); } } @@ -172,19 +183,26 @@ where let listener = StdTcpListener::bind("0.0.0.0:0").unwrap(); let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); let url = ServoUrl::parse(&url_string).unwrap(); - let (tx, rx) = futures::sync::oneshot::channel::<()>(); - let server = HyperServer::from_tcp(listener) - .unwrap() - .serve(move || { - let handler = handler.clone(); - service_fn_ok(move |req: HyperRequest<Body>| { - let mut response = HyperResponse::new(Vec::<u8>::new().into()); - handler(req, &mut response); - response + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let server = async move { + HyperServer::from_tcp(listener) + .unwrap() + .serve(make_service_fn(move |_| { + let handler = handler.clone(); + ready(Ok::<_, Infallible>(service_fn( + move |req: HyperRequest<Body>| { + let mut response = HyperResponse::new(Vec::<u8>::new().into()); + handler(req, &mut response); + ready(Ok::<_, Infallible>(response)) + }, + ))) + })) + .with_graceful_shutdown(async move { + rx.await.ok(); }) - }) - .with_graceful_shutdown(rx) - .map_err(|_| ()); + .await + .expect("Could not start server"); + }; HANDLE.lock().unwrap().spawn(server); let server = Server { close_channel: tx }; @@ -197,43 +215,64 @@ where { let handler = Arc::new(handler); let listener = StdTcpListener::bind("[::0]:0").unwrap(); - let listener = TcpListener::from_std(listener, &Handle::default()).unwrap(); + let listener = HANDLE + .lock() + .unwrap() + .block_on(async move { TcpListener::from_std(listener).unwrap() }); + let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); + let mut listener = TcpListenerStream::new(listener); + let url = ServoUrl::parse(&url_string).unwrap(); + let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let server = listener.incoming().map_err(|_| ()).for_each(move |sock| { - let mut tls_server_config = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap(); - tls_server_config - .set_certificate_file(&cert_path, SslFiletype::PEM) - .unwrap(); - tls_server_config - .set_private_key_file(&key_path, SslFiletype::PEM) - .unwrap(); - - let handler = handler.clone(); - tls_server_config - .build() - .accept_async(sock) - .map_err(|_| ()) - .and_then(move |ssl| { - Http::new() - .serve_connection( - ssl, - service_fn_ok(move |req: HyperRequest<Body>| { - let mut response = HyperResponse::new(Vec::<u8>::new().into()); - handler(req, &mut response); - response - }), - ) - .map_err(|_| ()) - }) - }); + let server = async move { + loop { + let stream = tokio::select! { + stream = listener.next() => stream, + _ = &mut rx => break + }; + + let stream = match stream { + Some(stream) => stream.expect("Could not accept stream: "), + _ => break, + }; + + let stream = stream.into_std().unwrap(); + stream + .set_read_timeout(Some(std::time::Duration::new(5, 0))) + .unwrap(); + let stream = TcpStream::from_std(stream).unwrap(); + + let mut tls_server_config = + SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap(); + tls_server_config + .set_certificate_file(&cert_path, SslFiletype::PEM) + .unwrap(); + tls_server_config + .set_private_key_file(&key_path, SslFiletype::PEM) + .unwrap(); + + let tls_server_config = tls_server_config.build(); + let ssl = Ssl::new(tls_server_config.context()).unwrap(); + let mut stream = SslStream::new(ssl, stream).unwrap(); - let (tx, rx) = futures::sync::oneshot::channel::<()>(); - let server = server - .select(rx.map_err(|_| ())) - .map(|_| ()) - .map_err(|_| ()); + let _ = Pin::new(&mut stream).accept().await; + + let handler = handler.clone(); + + let _ = Http::new() + .serve_connection( + stream, + service_fn(move |req: HyperRequest<Body>| { + let mut response = HyperResponse::new(Body::empty()); + handler(req, &mut response); + ready(Ok::<_, Infallible>(response)) + }), + ) + .await; + } + }; HANDLE.lock().unwrap().spawn(server); |