diff options
Diffstat (limited to 'components/net')
-rw-r--r-- | components/net/Cargo.toml | 1 | ||||
-rw-r--r-- | components/net/lib.rs | 2 | ||||
-rw-r--r-- | components/net/resource_task.rs | 10 | ||||
-rw-r--r-- | components/net/websocket_loader.rs | 125 |
4 files changed, 138 insertions, 0 deletions
diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml index 7029d9ef0e1..8e87cee1361 100644 --- a/components/net/Cargo.toml +++ b/components/net/Cargo.toml @@ -40,3 +40,4 @@ flate2 = "0.2.0" uuid = "0.1.16" euclid = {version = "0.4", features = ["plugins"]} url = "0.5" +websocket = "0.14.0" diff --git a/components/net/lib.rs b/components/net/lib.rs index 00ee808809e..b445727ef41 100644 --- a/components/net/lib.rs +++ b/components/net/lib.rs @@ -28,6 +28,7 @@ extern crate time; extern crate url; extern crate util; extern crate uuid; +extern crate websocket; pub mod about_loader; pub mod cookie; @@ -41,6 +42,7 @@ pub mod mime_classifier; pub mod pub_domains; pub mod resource_task; pub mod storage_task; +pub mod websocket_loader; /// An implementation of the [Fetch spec](https://fetch.spec.whatwg.org/) pub mod fetch { diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index d5e0e273b5e..4599e8dd1d5 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -20,6 +20,7 @@ use mime_classifier::{ApacheBugFlag, MIMEClassifier, NoSniffFlag}; use net_traits::ProgressMsg::Done; use net_traits::{AsyncResponseTarget, Metadata, ProgressMsg, ResourceTask, ResponseAction}; use net_traits::{ControlMsg, CookieSource, LoadConsumer, LoadData, LoadResponse, ResourceId}; +use net_traits::{WebSocketCommunicate, WebSocketConnectData}; use std::borrow::ToOwned; use std::boxed::FnBox; use std::cell::Cell; @@ -29,6 +30,7 @@ use std::sync::{Arc, RwLock}; use url::Url; use util::opts; use util::task::spawn_named; +use websocket_loader; pub enum ProgressSender { Channel(IpcSender<ProgressMsg>), @@ -174,6 +176,8 @@ impl ResourceChannelManager { match self.from_client.recv().unwrap() { ControlMsg::Load(load_data, consumer, id_sender) => self.resource_manager.load(load_data, consumer, id_sender, control_sender.clone()), + ControlMsg::WebsocketConnect(connect, connect_data) => + self.resource_manager.websocket_connect(connect, connect_data), ControlMsg::SetCookiesForUrl(request, cookie_list, source) => self.resource_manager.set_cookies_for_url(request, cookie_list, source), ControlMsg::GetCookiesForUrl(url, consumer, source) => { @@ -350,4 +354,10 @@ impl ResourceManager { self.mime_classifier.clone(), cancel_listener)); } + + fn websocket_connect(&self, + connect: WebSocketCommunicate, + connect_data: WebSocketConnectData) { + websocket_loader::init(connect, connect_data); + } } diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs new file mode 100644 index 00000000000..2ae54dc8eb2 --- /dev/null +++ b/components/net/websocket_loader.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use hyper::header::Host; +use net_traits::MessageData; +use net_traits::hosts::replace_hosts; +use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent}; +use std::sync::{Arc, Mutex}; +use std::thread; +use util::task::spawn_named; +use websocket::client::receiver::Receiver; +use websocket::client::request::Url; +use websocket::client::sender::Sender; +use websocket::header::Origin; +use websocket::message::Type; +use websocket::result::WebSocketResult; +use websocket::stream::WebSocketStream; +use websocket::ws::receiver::Receiver as WSReceiver; +use websocket::ws::sender::Sender as Sender_Object; +use websocket::ws::util::url::parse_url; +use websocket::{Client, Message}; + +/// *Establish a WebSocket Connection* as defined in RFC 6455. +fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool), + origin: String) + -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> { + + let host = Host { + hostname: resource_url.serialize_host().unwrap(), + port: resource_url.port_or_default() + }; + + let mut request = try!(Client::connect(net_url)); + request.headers.set(Origin(origin)); + request.headers.set(host); + + let response = try!(request.send()); + try!(response.validate()); + + Ok(response.begin().split()) +} + +pub fn init(connect: WebSocketCommunicate, connect_data: WebSocketConnectData) { + spawn_named(format!("WebSocket connection to {}", connect_data.resource_url), move || { + // Step 8: Protocols. + + // Step 9. + + // URL that we actually fetch from the network, after applying the replacements + // specified in the hosts file. + let net_url_result = parse_url(&replace_hosts(&connect_data.resource_url)); + let net_url = match net_url_result { + Ok(net_url) => net_url, + Err(e) => { + debug!("Failed to establish a WebSocket connection: {:?}", e); + let _ = connect.event_sender.send(WebSocketNetworkEvent::Close); + return; + } + }; + let channel = establish_a_websocket_connection(&connect_data.resource_url, + net_url, + connect_data.origin); + let (ws_sender, mut receiver) = match channel { + Ok(channel) => { + let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished); + channel + }, + Err(e) => { + debug!("Failed to establish a WebSocket connection: {:?}", e); + let _ = connect.event_sender.send(WebSocketNetworkEvent::Close); + return; + } + + }; + + let ws_sender = Arc::new(Mutex::new(ws_sender)); + + let ws_sender_incoming = ws_sender.clone(); + let resource_event_sender = connect.event_sender; + thread::spawn(move || { + for message in receiver.incoming_messages() { + let message: Message = match message { + Ok(m) => m, + Err(_) => break, + }; + let message = match message.opcode { + Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()), + Type::Binary => MessageData::Binary(message.payload.into_owned()), + Type::Ping => { + let pong = Message::pong(message.payload); + ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap(); + continue; + }, + Type::Pong => continue, + Type::Close => { + ws_sender_incoming.lock().unwrap().send_message(&message).unwrap(); + let _ = resource_event_sender.send(WebSocketNetworkEvent::Close); + break; + }, + }; + let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message)); + } + }); + + let ws_sender_outgoing = ws_sender.clone(); + let resource_action_receiver = connect.action_receiver; + thread::spawn(move || { + while let Ok(dom_action) = resource_action_receiver.recv() { + match dom_action { + WebSocketDomAction::SendMessage(MessageData::Text(data)) => { + ws_sender_outgoing.lock().unwrap().send_message(&Message::text(data)).unwrap(); + }, + WebSocketDomAction::SendMessage(MessageData::Binary(data)) => { + ws_sender_outgoing.lock().unwrap().send_message(&Message::binary(data)).unwrap(); + }, + WebSocketDomAction::Close(code, reason) => { + ws_sender_outgoing.lock().unwrap() + .send_message(&Message::close_because(code, reason)).unwrap(); + }, + } + } + }); + }); +} |