/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use cookie::Cookie; use fetch::methods::{should_be_blocked_due_to_bad_port, should_be_blocked_due_to_nosniff}; use hosts::replace_host; use http_loader::{HttpState, is_redirect_status, set_default_accept}; use http_loader::{set_default_accept_language, set_request_cookies}; use hyper::buffer::BufReader; use hyper::header::{CacheControl, CacheDirective, Connection, ConnectionOption}; use hyper::header::{Headers, Host, SetCookie, Pragma, Protocol, ProtocolName, Upgrade}; use hyper::http::h1::{LINE_ENDING, parse_response}; use hyper::method::Method; use hyper::net::HttpStream; use hyper::status::StatusCode; use hyper::version::HttpVersion; use ipc_channel::ipc::{IpcReceiver, IpcSender}; use net_traits::{CookieSource, MessageData, NetworkError}; use net_traits::{WebSocketDomAction, WebSocketNetworkEvent}; use net_traits::request::{Destination, RequestInit, RequestMode}; use servo_url::ServoUrl; use std::io::{self, Write}; use std::net::TcpStream; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use url::Position; use websocket::{Message, Receiver as WSReceiver, Sender as WSSender}; use websocket::header::{Origin, WebSocketAccept, WebSocketKey, WebSocketProtocol, WebSocketVersion}; use websocket::message::Type as MessageType; use websocket::receiver::Receiver; use websocket::sender::Sender; pub fn init( req_init: RequestInit, resource_event_sender: IpcSender, dom_action_receiver: IpcReceiver, http_state: Arc ) { thread::Builder::new().name(format!("WebSocket connection to {}", req_init.url)).spawn(move || { let channel = establish_a_websocket_connection(req_init, &http_state); let (ws_sender, mut receiver) = match channel { Ok((protocol_in_use, sender, receiver)) => { let _ = resource_event_sender.send(WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use }); (sender, receiver) }, Err(e) => { debug!("Failed to establish a WebSocket connection: {:?}", e); let _ = resource_event_sender.send(WebSocketNetworkEvent::Fail); return; } }; let initiated_close = Arc::new(AtomicBool::new(false)); let ws_sender = Arc::new(Mutex::new(ws_sender)); let initiated_close_incoming = initiated_close.clone(); let ws_sender_incoming = ws_sender.clone(); thread::spawn(move || { for message in receiver.incoming_messages() { let message: Message = match message { Ok(m) => m, Err(e) => { debug!("Error receiving incoming WebSocket message: {:?}", e); let _ = resource_event_sender.send(WebSocketNetworkEvent::Fail); break; } }; let message = match message.opcode { MessageType::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()), MessageType::Binary => MessageData::Binary(message.payload.into_owned()), MessageType::Ping => { let pong = Message::pong(message.payload); ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap(); continue; }, MessageType::Pong => continue, MessageType::Close => { if !initiated_close_incoming.fetch_or(true, Ordering::SeqCst) { ws_sender_incoming.lock().unwrap().send_message(&message).unwrap(); } let code = message.cd_status_code; let reason = String::from_utf8_lossy(&message.payload).into_owned(); let _ = resource_event_sender.send(WebSocketNetworkEvent::Close(code, reason)); break; }, }; let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message)); } }); while let Ok(dom_action) = dom_action_receiver.recv() { match dom_action { WebSocketDomAction::SendMessage(MessageData::Text(data)) => { ws_sender.lock().unwrap().send_message(&Message::text(data)).unwrap(); }, WebSocketDomAction::SendMessage(MessageData::Binary(data)) => { ws_sender.lock().unwrap().send_message(&Message::binary(data)).unwrap(); }, WebSocketDomAction::Close(code, reason) => { if !initiated_close.fetch_or(true, Ordering::SeqCst) { let message = match code { Some(code) => Message::close_because(code, reason.unwrap_or("".to_owned())), None => Message::close() }; ws_sender.lock().unwrap().send_message(&message).unwrap(); } }, } } }).expect("Thread spawning failed"); } type Stream = HttpStream; // https://fetch.spec.whatwg.org/#concept-websocket-connection-obtain fn obtain_a_websocket_connection(url: &ServoUrl) -> Result { // Step 1. let host = url.host_str().unwrap(); // Step 2. let port = url.port_or_known_default().unwrap(); // Step 3. // We did not replace the scheme by "http" or "https" in step 1 of // establish_a_websocket_connection. let secure = match url.scheme() { "ws" => false, "wss" => true, _ => panic!("URL's scheme should be ws or wss"), }; if secure { return Err(NetworkError::Internal("WSS is disabled for now.".into())); } // Steps 4-5. let host = replace_host(host); let tcp_stream = TcpStream::connect((&*host, port)).map_err(|e| { NetworkError::Internal(format!("Could not connect to host: {}", e)) })?; Ok(HttpStream(tcp_stream)) } // https://fetch.spec.whatwg.org/#concept-websocket-establish fn establish_a_websocket_connection( req_init: RequestInit, http_state: &HttpState ) -> Result<(Option, Sender, Receiver), NetworkError> { let protocols = match req_init.mode { RequestMode::WebSocket { protocols } => protocols.clone(), _ => panic!("Received a RequestInit with a non-websocket mode in websocket_loader"), }; // Steps 1 is not really applicable here, given we don't exactly go // through the same infrastructure as the Fetch spec. // Step 2, slimmed down because we don't go through the whole Fetch infra. let mut headers = Headers::new(); // Step 3. headers.set(Upgrade(vec![Protocol::new(ProtocolName::WebSocket, None)])); // Step 4. headers.set(Connection(vec![ConnectionOption::ConnectionHeader("upgrade".into())])); // Step 5. let key_value = WebSocketKey::new(); // Step 6. headers.set(key_value); // Step 7. headers.set(WebSocketVersion::WebSocket13); // Step 8. if !protocols.is_empty() { headers.set(WebSocketProtocol(protocols.clone())); } // Steps 9-10. // TODO: handle permessage-deflate extension. // Step 11 and network error check from step 12. let response = fetch(req_init.url, req_init.origin.ascii_serialization(), headers, http_state)?; // Step 12, the status code check. if response.status != StatusCode::SwitchingProtocols { return Err(NetworkError::Internal("Response's status should be 101.".into())); } // Step 13. if !protocols.is_empty() { if response.headers.get::().map_or(true, |protocols| protocols.is_empty()) { return Err(NetworkError::Internal( "Response's Sec-WebSocket-Protocol header is missing, malformed or empty.".into())); } } // Step 14.2. let upgrade_header = response.headers.get::().ok_or_else(|| { NetworkError::Internal("Response should have an Upgrade header.".into()) })?; if upgrade_header.len() != 1 { return Err(NetworkError::Internal("Response's Upgrade header should have only one value.".into())); } if upgrade_header[0].name != ProtocolName::WebSocket { return Err(NetworkError::Internal("Response's Upgrade header value should be \"websocket\".".into())); } // Step 14.3. let connection_header = response.headers.get::().ok_or_else(|| { NetworkError::Internal("Response should have a Connection header.".into()) })?; let connection_includes_upgrade = connection_header.iter().any(|option| { match *option { ConnectionOption::ConnectionHeader(ref option) => *option == "upgrade", _ => false, } }); if !connection_includes_upgrade { return Err(NetworkError::Internal("Response's Connection header value should include \"upgrade\".".into())); } // Step 14.4. let accept_header = response.headers.get::().ok_or_else(|| { NetworkError::Internal("Response should have a Sec-Websocket-Accept header.".into()) })?; if *accept_header != WebSocketAccept::new(&key_value) { return Err(NetworkError::Internal( "Response's Sec-WebSocket-Accept header value did not match the sent key.".into())); } // Step 14.5. // TODO: handle permessage-deflate extension. // We don't support any extension, so we fail at the mere presence of // a Sec-WebSocket-Extensions header. if response.headers.get_raw("Sec-WebSocket-Extensions").is_some() { return Err(NetworkError::Internal( "Response's Sec-WebSocket-Extensions header value included unsupported extensions.".into())); } // Step 14.6. let protocol_in_use = if let Some(response_protocols) = response.headers.get::() { for replied in &**response_protocols { if !protocols.iter().any(|requested| requested.eq_ignore_ascii_case(replied)) { return Err(NetworkError::Internal( "Response's Sec-WebSocket-Protocols contain values that were not requested.".into())); } } response_protocols.first().cloned() } else { None }; let sender = Sender::new(response.writer, true); let receiver = Receiver::new(response.reader, false); Ok((protocol_in_use, sender, receiver)) } struct Response { status: StatusCode, headers: Headers, reader: BufReader, writer: Stream, } // https://fetch.spec.whatwg.org/#concept-fetch fn fetch(url: ServoUrl, origin: String, mut headers: Headers, http_state: &HttpState) -> Result { // Step 1. // TODO: handle request's window. // Step 2. // TODO: handle request's origin. // Step 3. set_default_accept(Destination::None, &mut headers); // Step 4. set_default_accept_language(&mut headers); // Step 5. // TODO: handle request's priority. // Step 6. // Not applicable: not a navigation request. // Step 7. // We know this is a subresource request. { // Step 7.1. // Not applicable: client hints list is empty. // Steps 7.2-3. // TODO: handle fetch groups. } // Step 8. main_fetch(url, origin, headers, http_state) } // https://fetch.spec.whatwg.org/#concept-main-fetch fn main_fetch(url: ServoUrl, origin: String, mut headers: Headers, http_state: &HttpState) -> Result { // Step 1. let mut response = None; // Step 2. // Not applicable: request’s local-URLs-only flag is unset. // Step 3. // TODO: handle content security policy violations. // Step 4. // TODO: handle upgrade to a potentially secure URL. // Step 5. if should_be_blocked_due_to_bad_port(&url) { response = Some(Err(NetworkError::Internal("Request should be blocked due to bad port.".into()))); } // TODO: handle blocking as mixed content. // TODO: handle blocking by content security policy. // Steps 6-8. // TODO: handle request's referrer policy. // Step 9. // Not applicable: request's current URL's scheme is not "ftp". // Step 10. // TODO: handle known HSTS host domain. // Step 11. // Not applicable: request's synchronous flag is set. // Step 12. let mut response = response.unwrap_or_else(|| { // We must run the first sequence of substeps, given request's mode // is "websocket". // Step 12.1. // Not applicable: the response is never exposed to the Web so it // doesn't need to be filtered at all. // Step 12.2. scheme_fetch(&url, origin, &mut headers, http_state) }); // Step 13. // Not applicable: recursive flag is unset. // Step 14. // Not applicable: the response is never exposed to the Web so it doesn't // need to be filtered at all. // Steps 15-16. // Not applicable: no need to maintain an internal response. // Step 17. if response.is_ok() { // TODO: handle blocking as mixed content. // TODO: handle blocking by content security policy. // Not applicable: blocking due to MIME type matters only for scripts. if should_be_blocked_due_to_nosniff(Destination::None, &headers) { response = Err(NetworkError::Internal("Request should be blocked due to nosniff.".into())); } } // Step 18. // Not applicable: we don't care about the body at all. // Step 19. // Not applicable: request's integrity metadata is the empty string. // Step 20. // TODO: wait for response's body here, maybe? response } // https://fetch.spec.whatwg.org/#concept-scheme-fetch fn scheme_fetch(url: &ServoUrl, origin: String, headers: &mut Headers, http_state: &HttpState) -> Result { // In the case of a WebSocket request, HTTP fetch is always used. http_fetch(url, origin, headers, http_state) } // https://fetch.spec.whatwg.org/#concept-http-fetch fn http_fetch(url: &ServoUrl, origin: String, headers: &mut Headers, http_state: &HttpState) -> Result { // Step 1. // Not applicable: with step 3 being useless here, this one is too. // Step 2. // Not applicable: we don't need to maintain an internal response. // Step 3. // Not applicable: request's service-workers mode is "none". // Step 4. // There cannot be a response yet at this point. let mut response = { // Step 4.1. // Not applicable: CORS-preflight flag is unset. // Step 4.2. // Not applicable: request's redirect mode is "error". // Step 4.3. let response = http_network_or_cache_fetch(url, origin, headers, http_state); // Step 4.4. // Not applicable: CORS flag is unset. response }; // Step 5. if response.as_ref().ok().map_or(false, |response| is_redirect_status(response.status)) { // Step 5.1. // Not applicable: the connection does not use HTTP/2. // Steps 5.2-4. // Not applicable: matters only if request's redirect mode is not "error". // Step 5.5. // Request's redirect mode is "error". response = Err(NetworkError::Internal("Response should not be a redirection.".into())); } // Step 6. response } // https://fetch.spec.whatwg.org/#concept-http-network-or-cache-fetch fn http_network_or_cache_fetch(url: &ServoUrl, origin: String, headers: &mut Headers, http_state: &HttpState) -> Result { // Steps 1-3. // Not applicable: we don't even have a request yet, and there is no body // in a WebSocket request. // Step 4. // Not applicable: credentials flag is always set // because credentials mode is "include." // Steps 5-9. // Not applicable: there is no body in a WebSocket request. // Step 10. // TODO: handle header Referer. // Step 11. // Request's mode is "websocket". headers.set(Origin(origin)); // Step 12. // TODO: handle header User-Agent. // Steps 13-14. // Not applicable: request's cache mode is "no-store". // Step 15. { // Step 15.1. // We know there is no Pragma header yet. headers.set(Pragma::NoCache); // Step 15.2. // We know there is no Cache-Control header yet. headers.set(CacheControl(vec![CacheDirective::NoCache])); } // Step 16. // TODO: handle Accept-Encoding. // Not applicable: Connection header is already present. // TODO: handle DNT. headers.set(Host { hostname: url.host_str().unwrap().to_owned(), port: url.port(), }); // Step 17. // Credentials flag is set. { // Step 17.1. // TODO: handle user agent configured to block cookies. set_request_cookies(&url, headers, &http_state.cookie_jar); // Steps 17.2-6. // Not applicable: request has no Authorization header. } // Step 18. // TODO: proxy-authentication entry. // Step 19. // Not applicable: with step 21 being useless, this one is too. // Step 20. // Not applicable: revalidatingFlag is only useful if step 21 is. // Step 21. // Not applicable: cache mode is "no-store". // Step 22. // There is no response yet. let response = { // Step 22.1. // Not applicable: cache mode is "no-store". // Step 22.2. let forward_response = http_network_fetch(url, headers, http_state); // Step 22.3. // Not applicable: request's method is not unsafe. // Step 22.4. // Not applicable: revalidatingFlag is unset. // Step 22.5. // There is no response yet and the response should not be cached. forward_response }; // Step 23. // TODO: handle 401 status when request's window is not "no-window". // Step 24. // TODO: handle 407 status when request's window is not "no-window". // Step 25. // Not applicable: authentication-fetch flag is unset. // Step 26. response } // https://fetch.spec.whatwg.org/#concept-http-network-fetch fn http_network_fetch(url: &ServoUrl, headers: &Headers, http_state: &HttpState) -> Result { // Step 1. // Not applicable: credentials flag is set. // Steps 2-3. // Request's mode is "websocket". let connection = obtain_a_websocket_connection(url)?; // Step 4. // Not applicable: request’s body is null. // Step 5. let response = make_request(connection, url, headers)?; // Steps 6-12. // Not applicable: correct WebSocket responses don't have a body. // Step 13. // TODO: handle response's CSP list. // Step 14. // Not applicable: request's cache mode is "no-store". // Step 15. if let Some(cookies) = response.headers.get::() { let mut jar = http_state.cookie_jar.write().unwrap(); for cookie in &**cookies { if let Some(cookie) = Cookie::from_cookie_string(cookie.clone(), url, CookieSource::HTTP) { jar.push(cookie, url, CookieSource::HTTP); } } } // Step 16. // Not applicable: correct WebSocket responses don't have a body. // Step 17. Ok(response) } fn make_request(mut stream: Stream, url: &ServoUrl, headers: &Headers) -> Result { write_request(&mut stream, url, headers).map_err(|e| { NetworkError::Internal(format!("Request could not be sent: {}", e)) })?; // FIXME: Stream isn't supposed to be cloned. let writer = stream.clone(); // FIXME: BufReader from hyper isn't supposed to be used. let mut reader = BufReader::new(stream); let head = parse_response(&mut reader).map_err(|e| { NetworkError::Internal(format!("Response could not be read: {}", e)) })?; // This isn't in the spec, but this is the correct thing to do for WebSocket requests. if head.version != HttpVersion::Http11 { return Err(NetworkError::Internal("Response's HTTP version should be HTTP/1.1.".into())); } // FIXME: StatusCode::from_u16 isn't supposed to be used. let status = StatusCode::from_u16(head.subject.0); Ok(Response { status: status, headers: head.headers, reader: reader, writer: writer, }) } fn write_request(stream: &mut Stream, url: &ServoUrl, headers: &Headers) -> io::Result<()> { // Write "GET /foo/bar HTTP/1.1\r\n". let method = Method::Get; let request_uri = &url.as_url()[Position::BeforePath..Position::AfterQuery]; let version = HttpVersion::Http11; write!(stream, "{} {} {}{}", method, request_uri, version, LINE_ENDING)?; // Write the headers. write!(stream, "{}{}", headers, LINE_ENDING) }