diff options
author | Nova Fallen <nfallen@seas.upenn.edu> | 2015-12-02 23:54:24 -0500 |
---|---|---|
committer | Nova Fallen <nfallen@seas.upenn.edu> | 2015-12-08 02:04:40 -0500 |
commit | e8c8277f34f35ee23f70568c4f725a1ffcc0f66b (patch) | |
tree | da9d122b957072283a926b27cae8e61b04a379e5 /components | |
parent | 9f9d3570fc27d08e2afff2b90444956c3c5a66a4 (diff) | |
download | servo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.tar.gz servo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.zip |
move websocket creation to resource task
Diffstat (limited to 'components')
-rw-r--r-- | components/net/Cargo.toml | 1 | ||||
-rw-r--r-- | components/net/lib.rs | 2 | ||||
-rw-r--r-- | components/net/resource_task.rs | 10 | ||||
-rw-r--r-- | components/net/websocket_loader.rs | 125 | ||||
-rw-r--r-- | components/net_traits/Cargo.toml | 1 | ||||
-rw-r--r-- | components/net_traits/lib.rs | 34 | ||||
-rw-r--r-- | components/script/dom/websocket.rs | 156 | ||||
-rw-r--r-- | components/servo/Cargo.lock | 2 |
8 files changed, 230 insertions, 101 deletions
diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml index 7029d9ef0e1..8e87cee1361 100644 --- a/components/net/Cargo.toml +++ b/components/net/Cargo.toml @@ -40,3 +40,4 @@ flate2 = "0.2.0" uuid = "0.1.16" euclid = {version = "0.4", features = ["plugins"]} url = "0.5" +websocket = "0.14.0" diff --git a/components/net/lib.rs b/components/net/lib.rs index 00ee808809e..b445727ef41 100644 --- a/components/net/lib.rs +++ b/components/net/lib.rs @@ -28,6 +28,7 @@ extern crate time; extern crate url; extern crate util; extern crate uuid; +extern crate websocket; pub mod about_loader; pub mod cookie; @@ -41,6 +42,7 @@ pub mod mime_classifier; pub mod pub_domains; pub mod resource_task; pub mod storage_task; +pub mod websocket_loader; /// An implementation of the [Fetch spec](https://fetch.spec.whatwg.org/) pub mod fetch { diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index d5e0e273b5e..4599e8dd1d5 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -20,6 +20,7 @@ use mime_classifier::{ApacheBugFlag, MIMEClassifier, NoSniffFlag}; use net_traits::ProgressMsg::Done; use net_traits::{AsyncResponseTarget, Metadata, ProgressMsg, ResourceTask, ResponseAction}; use net_traits::{ControlMsg, CookieSource, LoadConsumer, LoadData, LoadResponse, ResourceId}; +use net_traits::{WebSocketCommunicate, WebSocketConnectData}; use std::borrow::ToOwned; use std::boxed::FnBox; use std::cell::Cell; @@ -29,6 +30,7 @@ use std::sync::{Arc, RwLock}; use url::Url; use util::opts; use util::task::spawn_named; +use websocket_loader; pub enum ProgressSender { Channel(IpcSender<ProgressMsg>), @@ -174,6 +176,8 @@ impl ResourceChannelManager { match self.from_client.recv().unwrap() { ControlMsg::Load(load_data, consumer, id_sender) => self.resource_manager.load(load_data, consumer, id_sender, control_sender.clone()), + ControlMsg::WebsocketConnect(connect, connect_data) => + self.resource_manager.websocket_connect(connect, connect_data), ControlMsg::SetCookiesForUrl(request, cookie_list, source) => self.resource_manager.set_cookies_for_url(request, cookie_list, source), ControlMsg::GetCookiesForUrl(url, consumer, source) => { @@ -350,4 +354,10 @@ impl ResourceManager { self.mime_classifier.clone(), cancel_listener)); } + + fn websocket_connect(&self, + connect: WebSocketCommunicate, + connect_data: WebSocketConnectData) { + websocket_loader::init(connect, connect_data); + } } diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs new file mode 100644 index 00000000000..2ae54dc8eb2 --- /dev/null +++ b/components/net/websocket_loader.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use hyper::header::Host; +use net_traits::MessageData; +use net_traits::hosts::replace_hosts; +use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent}; +use std::sync::{Arc, Mutex}; +use std::thread; +use util::task::spawn_named; +use websocket::client::receiver::Receiver; +use websocket::client::request::Url; +use websocket::client::sender::Sender; +use websocket::header::Origin; +use websocket::message::Type; +use websocket::result::WebSocketResult; +use websocket::stream::WebSocketStream; +use websocket::ws::receiver::Receiver as WSReceiver; +use websocket::ws::sender::Sender as Sender_Object; +use websocket::ws::util::url::parse_url; +use websocket::{Client, Message}; + +/// *Establish a WebSocket Connection* as defined in RFC 6455. +fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool), + origin: String) + -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> { + + let host = Host { + hostname: resource_url.serialize_host().unwrap(), + port: resource_url.port_or_default() + }; + + let mut request = try!(Client::connect(net_url)); + request.headers.set(Origin(origin)); + request.headers.set(host); + + let response = try!(request.send()); + try!(response.validate()); + + Ok(response.begin().split()) +} + +pub fn init(connect: WebSocketCommunicate, connect_data: WebSocketConnectData) { + spawn_named(format!("WebSocket connection to {}", connect_data.resource_url), move || { + // Step 8: Protocols. + + // Step 9. + + // URL that we actually fetch from the network, after applying the replacements + // specified in the hosts file. + let net_url_result = parse_url(&replace_hosts(&connect_data.resource_url)); + let net_url = match net_url_result { + Ok(net_url) => net_url, + Err(e) => { + debug!("Failed to establish a WebSocket connection: {:?}", e); + let _ = connect.event_sender.send(WebSocketNetworkEvent::Close); + return; + } + }; + let channel = establish_a_websocket_connection(&connect_data.resource_url, + net_url, + connect_data.origin); + let (ws_sender, mut receiver) = match channel { + Ok(channel) => { + let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished); + channel + }, + Err(e) => { + debug!("Failed to establish a WebSocket connection: {:?}", e); + let _ = connect.event_sender.send(WebSocketNetworkEvent::Close); + return; + } + + }; + + let ws_sender = Arc::new(Mutex::new(ws_sender)); + + let ws_sender_incoming = ws_sender.clone(); + let resource_event_sender = connect.event_sender; + thread::spawn(move || { + for message in receiver.incoming_messages() { + let message: Message = match message { + Ok(m) => m, + Err(_) => break, + }; + let message = match message.opcode { + Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()), + Type::Binary => MessageData::Binary(message.payload.into_owned()), + Type::Ping => { + let pong = Message::pong(message.payload); + ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap(); + continue; + }, + Type::Pong => continue, + Type::Close => { + ws_sender_incoming.lock().unwrap().send_message(&message).unwrap(); + let _ = resource_event_sender.send(WebSocketNetworkEvent::Close); + break; + }, + }; + let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message)); + } + }); + + let ws_sender_outgoing = ws_sender.clone(); + let resource_action_receiver = connect.action_receiver; + thread::spawn(move || { + while let Ok(dom_action) = resource_action_receiver.recv() { + match dom_action { + WebSocketDomAction::SendMessage(MessageData::Text(data)) => { + ws_sender_outgoing.lock().unwrap().send_message(&Message::text(data)).unwrap(); + }, + WebSocketDomAction::SendMessage(MessageData::Binary(data)) => { + ws_sender_outgoing.lock().unwrap().send_message(&Message::binary(data)).unwrap(); + }, + WebSocketDomAction::Close(code, reason) => { + ws_sender_outgoing.lock().unwrap() + .send_message(&Message::close_because(code, reason)).unwrap(); + }, + } + } + }); + }); +} diff --git a/components/net_traits/Cargo.toml b/components/net_traits/Cargo.toml index 0822ff7054d..296ca750b57 100644 --- a/components/net_traits/Cargo.toml +++ b/components/net_traits/Cargo.toml @@ -30,3 +30,4 @@ image = "0.5.0" serde = "0.6" serde_macros = "0.6" url = "0.5" +websocket = "0.14.0" diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 267163fad53..5da54d7c241 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -23,6 +23,7 @@ extern crate serde; extern crate stb_image; extern crate url; extern crate util; +extern crate websocket; use hyper::header::{ContentType, Headers}; use hyper::http::RawStatus; @@ -225,10 +226,43 @@ pub enum IncludeSubdomains { NotIncluded } +#[derive(HeapSizeOf, Deserialize, Serialize)] +pub enum MessageData { + Text(String), + Binary(Vec<u8>), +} + +#[derive(Deserialize, Serialize)] +pub enum WebSocketDomAction { + SendMessage(MessageData), + Close(u16, String), +} + +#[derive(Deserialize, Serialize)] +pub enum WebSocketNetworkEvent { + ConnectionEstablished, + MessageReceived(MessageData), + Close, +} + +#[derive(Deserialize, Serialize)] +pub struct WebSocketCommunicate { + pub event_sender: IpcSender<WebSocketNetworkEvent>, + pub action_receiver: IpcReceiver<WebSocketDomAction>, +} + +#[derive(Deserialize, Serialize)] +pub struct WebSocketConnectData { + pub resource_url: Url, + pub origin: String, +} + #[derive(Deserialize, Serialize)] pub enum ControlMsg { /// Request the data associated with a particular URL Load(LoadData, LoadConsumer, Option<IpcSender<ResourceId>>), + /// Try to make a websocket connection to a URL. + WebsocketConnect(WebSocketCommunicate, WebSocketConnectData), /// Store a set of cookies for a given originating URL SetCookiesForUrl(Url, String, CookieSource), /// Retrieve the stored cookies for a given URL diff --git a/components/script/dom/websocket.rs b/components/script/dom/websocket.rs index 79bdd0fdfd6..125ab9065ee 100644 --- a/components/script/dom/websocket.rs +++ b/components/script/dom/websocket.rs @@ -21,32 +21,25 @@ use dom::closeevent::CloseEvent; use dom::event::{Event, EventBubbles, EventCancelable}; use dom::eventtarget::EventTarget; use dom::messageevent::MessageEvent; -use hyper::header::Host; +use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use js::jsapi::{JSAutoCompartment, JSAutoRequest, RootedValue}; use js::jsapi::{JS_GetArrayBufferData, JS_NewArrayBuffer}; use js::jsval::UndefinedValue; use libc::{uint32_t, uint8_t}; +use net_traits::ControlMsg::WebsocketConnect; +use net_traits::MessageData; use net_traits::hosts::replace_hosts; +use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent}; use ref_slice::ref_slice; use script_task::ScriptTaskEventCategory::WebSocketEvent; use script_task::{CommonScriptMsg, Runnable}; use std::borrow::ToOwned; use std::cell::Cell; use std::ptr; -use std::sync::{Arc, Mutex}; +use std::thread; use util::str::DOMString; -use util::task::spawn_named; -use websocket::client::receiver::Receiver; use websocket::client::request::Url; -use websocket::client::sender::Sender; -use websocket::header::Origin; -use websocket::message::Type; -use websocket::result::WebSocketResult; -use websocket::stream::WebSocketStream; -use websocket::ws::receiver::Receiver as WSReceiver; -use websocket::ws::sender::Sender as Sender_Object; use websocket::ws::util::url::parse_url; -use websocket::{Client, Message}; #[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] enum WebSocketRequestState { @@ -56,14 +49,6 @@ enum WebSocketRequestState { Closed = 3, } -no_jsmanaged_fields!(Sender<WebSocketStream>); - -#[derive(HeapSizeOf)] -enum MessageData { - Text(String), - Binary(Vec<u8>), -} - // list of blacklist ports according to // http://mxr.mozilla.org/mozilla-central/source/netwerk/base/nsIOService.cpp#87 const BLOCKED_PORTS_LIST: &'static [u16] = &[ @@ -154,7 +139,7 @@ pub struct WebSocket { buffered_amount: Cell<u32>, clearing_buffer: Cell<bool>, //Flag to tell if there is a running task to clear buffered_amount #[ignore_heap_size_of = "Defined in std"] - sender: DOMRefCell<Option<Arc<Mutex<Sender<WebSocketStream>>>>>, + sender: DOMRefCell<Option<IpcSender<WebSocketDomAction>>>, failed: Cell<bool>, //Flag to tell if websocket was closed due to failure full: Cell<bool>, //Flag to tell if websocket queue is full clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail) @@ -163,29 +148,6 @@ pub struct WebSocket { binary_type: Cell<BinaryType>, } -/// *Establish a WebSocket Connection* as defined in RFC 6455. -fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool), - origin: String) - -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> { - // URL that we actually fetch from the network, after applying the replacements - // specified in the hosts file. - - let host = Host { - hostname: resource_url.serialize_host().unwrap(), - port: resource_url.port_or_default() - }; - - let mut request = try!(Client::connect(net_url)); - request.headers.set(Origin(origin)); - request.headers.set(host); - - let response = try!(request.send()); - try!(response.validate()); - - Ok(response.begin().split()) -} - - impl WebSocket { fn new_inherited(global: GlobalRef, url: Url) -> WebSocket { WebSocket { @@ -217,8 +179,9 @@ impl WebSocket { -> Fallible<Root<WebSocket>> { // Step 1. let resource_url = try!(Url::parse(&url).map_err(|_| Error::Syntax)); - let net_url = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax)); - + // Although we do this replace and parse operation again in the resource task, + // we try here to be able to immediately throw a syntax error on failure. + let _ = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax)); // Step 2: Disallow https -> ws connections. // Step 3: Potentially block access to some ports. @@ -257,62 +220,58 @@ impl WebSocket { let address = Trusted::new(global.get_cx(), ws.r(), global.networking_task_source()); let origin = global.get_url().serialize(); + + let connect_data = WebSocketConnectData { + resource_url: resource_url.clone(), + origin: origin, + }; + + // Create the interface for communication with the resource task + let (dom_action_sender, resource_action_receiver): + (IpcSender<WebSocketDomAction>, + IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap(); + let (resource_event_sender, dom_event_receiver): + (IpcSender<WebSocketNetworkEvent>, + IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap(); + + let connect = WebSocketCommunicate { + event_sender: resource_event_sender, + action_receiver: resource_action_receiver, + }; + + let resource_task = global.resource_task(); + let _ = resource_task.send(WebsocketConnect(connect, connect_data)); + + *ws.sender.borrow_mut() = Some(dom_action_sender); + + let moved_address = address.clone(); let sender = global.networking_task_source(); - spawn_named(format!("WebSocket connection to {}", ws.Url()), move || { - // Step 8: Protocols. - - // Step 9. - let channel = establish_a_websocket_connection(&resource_url, net_url, origin); - let (ws_sender, mut receiver) = match channel { - Ok(channel) => channel, - Err(e) => { - debug!("Failed to establish a WebSocket connection: {:?}", e); - let task = box CloseTask { - addr: address, - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap(); - return; - } - }; - let ws_sender = Arc::new(Mutex::new(ws_sender)); - let open_task = box ConnectionEstablishedTask { - addr: address.clone(), - sender: ws_sender.clone(), - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap(); - - for message in receiver.incoming_messages() { - let message: Message = match message { - Ok(m) => m, - Err(_) => break, - }; - let message = match message.opcode { - Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()), - Type::Binary => MessageData::Binary(message.payload.into_owned()), - Type::Ping => { - let pong = Message::pong(message.payload); - ws_sender.lock().unwrap().send_message(&pong).unwrap(); - continue; + thread::spawn(move || { + while let Ok(event) = dom_event_receiver.recv() { + match event { + WebSocketNetworkEvent::ConnectionEstablished => { + let open_task = box ConnectionEstablishedTask { + addr: moved_address.clone(), + }; + sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap(); }, - Type::Pong => continue, - Type::Close => { - ws_sender.lock().unwrap().send_message(&message).unwrap(); + WebSocketNetworkEvent::MessageReceived(message) => { + let message_task = box MessageReceivedTask { + address: moved_address.clone(), + message: message, + }; + sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap(); + }, + WebSocketNetworkEvent::Close => { let task = box CloseTask { - addr: address, + addr: moved_address.clone(), }; sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap(); - break; }, - }; - let message_task = box MessageReceivedTask { - address: address.clone(), - message: message, - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap(); + } } }); - // Step 7. Ok(ws) } @@ -408,7 +367,7 @@ impl WebSocketMethods for WebSocket { if send_data { let mut other_sender = self.sender.borrow_mut(); let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.lock().unwrap().send_message(&Message::text(data.0)); + let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0))); } Ok(()) @@ -427,7 +386,7 @@ impl WebSocketMethods for WebSocket { if send_data { let mut other_sender = self.sender.borrow_mut(); let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.lock().unwrap().send_message(&Message::binary(data.clone_bytes())); + let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(data.clone_bytes()))); } Ok(()) @@ -443,11 +402,10 @@ impl WebSocketMethods for WebSocket { if let Some(sender) = sender.as_mut() { let code: u16 = this.code.get(); let reason = this.reason.borrow().clone(); - let _ = sender.lock().unwrap().send_message(&Message::close_because(code, reason)); + let _ = sender.send(WebSocketDomAction::Close(code, reason)); } } - if let Some(code) = code { //Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications if code != close_code::NORMAL && (code < 3000 || code > 4999) { @@ -490,15 +448,11 @@ impl WebSocketMethods for WebSocket { /// Task queued when *the WebSocket connection is established*. struct ConnectionEstablishedTask { addr: Trusted<WebSocket>, - sender: Arc<Mutex<Sender<WebSocketStream>>>, } impl Runnable for ConnectionEstablishedTask { fn handler(self: Box<Self>) { let ws = self.addr.root(); - - *ws.sender.borrow_mut() = Some(self.sender); - // Step 1: Protocols. // Step 2. diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock index 51d117f6198..a76580c6122 100644 --- a/components/servo/Cargo.lock +++ b/components/servo/Cargo.lock @@ -1156,6 +1156,7 @@ dependencies = [ "url 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "util 0.0.1", "uuid 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "websocket 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1204,6 +1205,7 @@ dependencies = [ "stb_image 0.2.0 (git+https://github.com/servo/rust-stb-image)", "url 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "util 0.0.1", + "websocket 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] |