aboutsummaryrefslogtreecommitdiffstats
path: root/components
diff options
context:
space:
mode:
authorNova Fallen <nfallen@seas.upenn.edu>2015-12-02 23:54:24 -0500
committerNova Fallen <nfallen@seas.upenn.edu>2015-12-08 02:04:40 -0500
commite8c8277f34f35ee23f70568c4f725a1ffcc0f66b (patch)
treeda9d122b957072283a926b27cae8e61b04a379e5 /components
parent9f9d3570fc27d08e2afff2b90444956c3c5a66a4 (diff)
downloadservo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.tar.gz
servo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.zip
move websocket creation to resource task
Diffstat (limited to 'components')
-rw-r--r--components/net/Cargo.toml1
-rw-r--r--components/net/lib.rs2
-rw-r--r--components/net/resource_task.rs10
-rw-r--r--components/net/websocket_loader.rs125
-rw-r--r--components/net_traits/Cargo.toml1
-rw-r--r--components/net_traits/lib.rs34
-rw-r--r--components/script/dom/websocket.rs156
-rw-r--r--components/servo/Cargo.lock2
8 files changed, 230 insertions, 101 deletions
diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml
index 7029d9ef0e1..8e87cee1361 100644
--- a/components/net/Cargo.toml
+++ b/components/net/Cargo.toml
@@ -40,3 +40,4 @@ flate2 = "0.2.0"
uuid = "0.1.16"
euclid = {version = "0.4", features = ["plugins"]}
url = "0.5"
+websocket = "0.14.0"
diff --git a/components/net/lib.rs b/components/net/lib.rs
index 00ee808809e..b445727ef41 100644
--- a/components/net/lib.rs
+++ b/components/net/lib.rs
@@ -28,6 +28,7 @@ extern crate time;
extern crate url;
extern crate util;
extern crate uuid;
+extern crate websocket;
pub mod about_loader;
pub mod cookie;
@@ -41,6 +42,7 @@ pub mod mime_classifier;
pub mod pub_domains;
pub mod resource_task;
pub mod storage_task;
+pub mod websocket_loader;
/// An implementation of the [Fetch spec](https://fetch.spec.whatwg.org/)
pub mod fetch {
diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs
index d5e0e273b5e..4599e8dd1d5 100644
--- a/components/net/resource_task.rs
+++ b/components/net/resource_task.rs
@@ -20,6 +20,7 @@ use mime_classifier::{ApacheBugFlag, MIMEClassifier, NoSniffFlag};
use net_traits::ProgressMsg::Done;
use net_traits::{AsyncResponseTarget, Metadata, ProgressMsg, ResourceTask, ResponseAction};
use net_traits::{ControlMsg, CookieSource, LoadConsumer, LoadData, LoadResponse, ResourceId};
+use net_traits::{WebSocketCommunicate, WebSocketConnectData};
use std::borrow::ToOwned;
use std::boxed::FnBox;
use std::cell::Cell;
@@ -29,6 +30,7 @@ use std::sync::{Arc, RwLock};
use url::Url;
use util::opts;
use util::task::spawn_named;
+use websocket_loader;
pub enum ProgressSender {
Channel(IpcSender<ProgressMsg>),
@@ -174,6 +176,8 @@ impl ResourceChannelManager {
match self.from_client.recv().unwrap() {
ControlMsg::Load(load_data, consumer, id_sender) =>
self.resource_manager.load(load_data, consumer, id_sender, control_sender.clone()),
+ ControlMsg::WebsocketConnect(connect, connect_data) =>
+ self.resource_manager.websocket_connect(connect, connect_data),
ControlMsg::SetCookiesForUrl(request, cookie_list, source) =>
self.resource_manager.set_cookies_for_url(request, cookie_list, source),
ControlMsg::GetCookiesForUrl(url, consumer, source) => {
@@ -350,4 +354,10 @@ impl ResourceManager {
self.mime_classifier.clone(),
cancel_listener));
}
+
+ fn websocket_connect(&self,
+ connect: WebSocketCommunicate,
+ connect_data: WebSocketConnectData) {
+ websocket_loader::init(connect, connect_data);
+ }
}
diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs
new file mode 100644
index 00000000000..2ae54dc8eb2
--- /dev/null
+++ b/components/net/websocket_loader.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use hyper::header::Host;
+use net_traits::MessageData;
+use net_traits::hosts::replace_hosts;
+use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use util::task::spawn_named;
+use websocket::client::receiver::Receiver;
+use websocket::client::request::Url;
+use websocket::client::sender::Sender;
+use websocket::header::Origin;
+use websocket::message::Type;
+use websocket::result::WebSocketResult;
+use websocket::stream::WebSocketStream;
+use websocket::ws::receiver::Receiver as WSReceiver;
+use websocket::ws::sender::Sender as Sender_Object;
+use websocket::ws::util::url::parse_url;
+use websocket::{Client, Message};
+
+/// *Establish a WebSocket Connection* as defined in RFC 6455.
+fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool),
+ origin: String)
+ -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> {
+
+ let host = Host {
+ hostname: resource_url.serialize_host().unwrap(),
+ port: resource_url.port_or_default()
+ };
+
+ let mut request = try!(Client::connect(net_url));
+ request.headers.set(Origin(origin));
+ request.headers.set(host);
+
+ let response = try!(request.send());
+ try!(response.validate());
+
+ Ok(response.begin().split())
+}
+
+pub fn init(connect: WebSocketCommunicate, connect_data: WebSocketConnectData) {
+ spawn_named(format!("WebSocket connection to {}", connect_data.resource_url), move || {
+ // Step 8: Protocols.
+
+ // Step 9.
+
+ // URL that we actually fetch from the network, after applying the replacements
+ // specified in the hosts file.
+ let net_url_result = parse_url(&replace_hosts(&connect_data.resource_url));
+ let net_url = match net_url_result {
+ Ok(net_url) => net_url,
+ Err(e) => {
+ debug!("Failed to establish a WebSocket connection: {:?}", e);
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
+ return;
+ }
+ };
+ let channel = establish_a_websocket_connection(&connect_data.resource_url,
+ net_url,
+ connect_data.origin);
+ let (ws_sender, mut receiver) = match channel {
+ Ok(channel) => {
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished);
+ channel
+ },
+ Err(e) => {
+ debug!("Failed to establish a WebSocket connection: {:?}", e);
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
+ return;
+ }
+
+ };
+
+ let ws_sender = Arc::new(Mutex::new(ws_sender));
+
+ let ws_sender_incoming = ws_sender.clone();
+ let resource_event_sender = connect.event_sender;
+ thread::spawn(move || {
+ for message in receiver.incoming_messages() {
+ let message: Message = match message {
+ Ok(m) => m,
+ Err(_) => break,
+ };
+ let message = match message.opcode {
+ Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()),
+ Type::Binary => MessageData::Binary(message.payload.into_owned()),
+ Type::Ping => {
+ let pong = Message::pong(message.payload);
+ ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap();
+ continue;
+ },
+ Type::Pong => continue,
+ Type::Close => {
+ ws_sender_incoming.lock().unwrap().send_message(&message).unwrap();
+ let _ = resource_event_sender.send(WebSocketNetworkEvent::Close);
+ break;
+ },
+ };
+ let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message));
+ }
+ });
+
+ let ws_sender_outgoing = ws_sender.clone();
+ let resource_action_receiver = connect.action_receiver;
+ thread::spawn(move || {
+ while let Ok(dom_action) = resource_action_receiver.recv() {
+ match dom_action {
+ WebSocketDomAction::SendMessage(MessageData::Text(data)) => {
+ ws_sender_outgoing.lock().unwrap().send_message(&Message::text(data)).unwrap();
+ },
+ WebSocketDomAction::SendMessage(MessageData::Binary(data)) => {
+ ws_sender_outgoing.lock().unwrap().send_message(&Message::binary(data)).unwrap();
+ },
+ WebSocketDomAction::Close(code, reason) => {
+ ws_sender_outgoing.lock().unwrap()
+ .send_message(&Message::close_because(code, reason)).unwrap();
+ },
+ }
+ }
+ });
+ });
+}
diff --git a/components/net_traits/Cargo.toml b/components/net_traits/Cargo.toml
index 0822ff7054d..296ca750b57 100644
--- a/components/net_traits/Cargo.toml
+++ b/components/net_traits/Cargo.toml
@@ -30,3 +30,4 @@ image = "0.5.0"
serde = "0.6"
serde_macros = "0.6"
url = "0.5"
+websocket = "0.14.0"
diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs
index 267163fad53..5da54d7c241 100644
--- a/components/net_traits/lib.rs
+++ b/components/net_traits/lib.rs
@@ -23,6 +23,7 @@ extern crate serde;
extern crate stb_image;
extern crate url;
extern crate util;
+extern crate websocket;
use hyper::header::{ContentType, Headers};
use hyper::http::RawStatus;
@@ -225,10 +226,43 @@ pub enum IncludeSubdomains {
NotIncluded
}
+#[derive(HeapSizeOf, Deserialize, Serialize)]
+pub enum MessageData {
+ Text(String),
+ Binary(Vec<u8>),
+}
+
+#[derive(Deserialize, Serialize)]
+pub enum WebSocketDomAction {
+ SendMessage(MessageData),
+ Close(u16, String),
+}
+
+#[derive(Deserialize, Serialize)]
+pub enum WebSocketNetworkEvent {
+ ConnectionEstablished,
+ MessageReceived(MessageData),
+ Close,
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct WebSocketCommunicate {
+ pub event_sender: IpcSender<WebSocketNetworkEvent>,
+ pub action_receiver: IpcReceiver<WebSocketDomAction>,
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct WebSocketConnectData {
+ pub resource_url: Url,
+ pub origin: String,
+}
+
#[derive(Deserialize, Serialize)]
pub enum ControlMsg {
/// Request the data associated with a particular URL
Load(LoadData, LoadConsumer, Option<IpcSender<ResourceId>>),
+ /// Try to make a websocket connection to a URL.
+ WebsocketConnect(WebSocketCommunicate, WebSocketConnectData),
/// Store a set of cookies for a given originating URL
SetCookiesForUrl(Url, String, CookieSource),
/// Retrieve the stored cookies for a given URL
diff --git a/components/script/dom/websocket.rs b/components/script/dom/websocket.rs
index 79bdd0fdfd6..125ab9065ee 100644
--- a/components/script/dom/websocket.rs
+++ b/components/script/dom/websocket.rs
@@ -21,32 +21,25 @@ use dom::closeevent::CloseEvent;
use dom::event::{Event, EventBubbles, EventCancelable};
use dom::eventtarget::EventTarget;
use dom::messageevent::MessageEvent;
-use hyper::header::Host;
+use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use js::jsapi::{JSAutoCompartment, JSAutoRequest, RootedValue};
use js::jsapi::{JS_GetArrayBufferData, JS_NewArrayBuffer};
use js::jsval::UndefinedValue;
use libc::{uint32_t, uint8_t};
+use net_traits::ControlMsg::WebsocketConnect;
+use net_traits::MessageData;
use net_traits::hosts::replace_hosts;
+use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
use ref_slice::ref_slice;
use script_task::ScriptTaskEventCategory::WebSocketEvent;
use script_task::{CommonScriptMsg, Runnable};
use std::borrow::ToOwned;
use std::cell::Cell;
use std::ptr;
-use std::sync::{Arc, Mutex};
+use std::thread;
use util::str::DOMString;
-use util::task::spawn_named;
-use websocket::client::receiver::Receiver;
use websocket::client::request::Url;
-use websocket::client::sender::Sender;
-use websocket::header::Origin;
-use websocket::message::Type;
-use websocket::result::WebSocketResult;
-use websocket::stream::WebSocketStream;
-use websocket::ws::receiver::Receiver as WSReceiver;
-use websocket::ws::sender::Sender as Sender_Object;
use websocket::ws::util::url::parse_url;
-use websocket::{Client, Message};
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
enum WebSocketRequestState {
@@ -56,14 +49,6 @@ enum WebSocketRequestState {
Closed = 3,
}
-no_jsmanaged_fields!(Sender<WebSocketStream>);
-
-#[derive(HeapSizeOf)]
-enum MessageData {
- Text(String),
- Binary(Vec<u8>),
-}
-
// list of blacklist ports according to
// http://mxr.mozilla.org/mozilla-central/source/netwerk/base/nsIOService.cpp#87
const BLOCKED_PORTS_LIST: &'static [u16] = &[
@@ -154,7 +139,7 @@ pub struct WebSocket {
buffered_amount: Cell<u32>,
clearing_buffer: Cell<bool>, //Flag to tell if there is a running task to clear buffered_amount
#[ignore_heap_size_of = "Defined in std"]
- sender: DOMRefCell<Option<Arc<Mutex<Sender<WebSocketStream>>>>>,
+ sender: DOMRefCell<Option<IpcSender<WebSocketDomAction>>>,
failed: Cell<bool>, //Flag to tell if websocket was closed due to failure
full: Cell<bool>, //Flag to tell if websocket queue is full
clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail)
@@ -163,29 +148,6 @@ pub struct WebSocket {
binary_type: Cell<BinaryType>,
}
-/// *Establish a WebSocket Connection* as defined in RFC 6455.
-fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool),
- origin: String)
- -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> {
- // URL that we actually fetch from the network, after applying the replacements
- // specified in the hosts file.
-
- let host = Host {
- hostname: resource_url.serialize_host().unwrap(),
- port: resource_url.port_or_default()
- };
-
- let mut request = try!(Client::connect(net_url));
- request.headers.set(Origin(origin));
- request.headers.set(host);
-
- let response = try!(request.send());
- try!(response.validate());
-
- Ok(response.begin().split())
-}
-
-
impl WebSocket {
fn new_inherited(global: GlobalRef, url: Url) -> WebSocket {
WebSocket {
@@ -217,8 +179,9 @@ impl WebSocket {
-> Fallible<Root<WebSocket>> {
// Step 1.
let resource_url = try!(Url::parse(&url).map_err(|_| Error::Syntax));
- let net_url = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax));
-
+ // Although we do this replace and parse operation again in the resource task,
+ // we try here to be able to immediately throw a syntax error on failure.
+ let _ = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax));
// Step 2: Disallow https -> ws connections.
// Step 3: Potentially block access to some ports.
@@ -257,62 +220,58 @@ impl WebSocket {
let address = Trusted::new(global.get_cx(), ws.r(), global.networking_task_source());
let origin = global.get_url().serialize();
+
+ let connect_data = WebSocketConnectData {
+ resource_url: resource_url.clone(),
+ origin: origin,
+ };
+
+ // Create the interface for communication with the resource task
+ let (dom_action_sender, resource_action_receiver):
+ (IpcSender<WebSocketDomAction>,
+ IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap();
+ let (resource_event_sender, dom_event_receiver):
+ (IpcSender<WebSocketNetworkEvent>,
+ IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap();
+
+ let connect = WebSocketCommunicate {
+ event_sender: resource_event_sender,
+ action_receiver: resource_action_receiver,
+ };
+
+ let resource_task = global.resource_task();
+ let _ = resource_task.send(WebsocketConnect(connect, connect_data));
+
+ *ws.sender.borrow_mut() = Some(dom_action_sender);
+
+ let moved_address = address.clone();
let sender = global.networking_task_source();
- spawn_named(format!("WebSocket connection to {}", ws.Url()), move || {
- // Step 8: Protocols.
-
- // Step 9.
- let channel = establish_a_websocket_connection(&resource_url, net_url, origin);
- let (ws_sender, mut receiver) = match channel {
- Ok(channel) => channel,
- Err(e) => {
- debug!("Failed to establish a WebSocket connection: {:?}", e);
- let task = box CloseTask {
- addr: address,
- };
- sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap();
- return;
- }
- };
- let ws_sender = Arc::new(Mutex::new(ws_sender));
- let open_task = box ConnectionEstablishedTask {
- addr: address.clone(),
- sender: ws_sender.clone(),
- };
- sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap();
-
- for message in receiver.incoming_messages() {
- let message: Message = match message {
- Ok(m) => m,
- Err(_) => break,
- };
- let message = match message.opcode {
- Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()),
- Type::Binary => MessageData::Binary(message.payload.into_owned()),
- Type::Ping => {
- let pong = Message::pong(message.payload);
- ws_sender.lock().unwrap().send_message(&pong).unwrap();
- continue;
+ thread::spawn(move || {
+ while let Ok(event) = dom_event_receiver.recv() {
+ match event {
+ WebSocketNetworkEvent::ConnectionEstablished => {
+ let open_task = box ConnectionEstablishedTask {
+ addr: moved_address.clone(),
+ };
+ sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap();
},
- Type::Pong => continue,
- Type::Close => {
- ws_sender.lock().unwrap().send_message(&message).unwrap();
+ WebSocketNetworkEvent::MessageReceived(message) => {
+ let message_task = box MessageReceivedTask {
+ address: moved_address.clone(),
+ message: message,
+ };
+ sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap();
+ },
+ WebSocketNetworkEvent::Close => {
let task = box CloseTask {
- addr: address,
+ addr: moved_address.clone(),
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap();
- break;
},
- };
- let message_task = box MessageReceivedTask {
- address: address.clone(),
- message: message,
- };
- sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap();
+ }
}
});
-
// Step 7.
Ok(ws)
}
@@ -408,7 +367,7 @@ impl WebSocketMethods for WebSocket {
if send_data {
let mut other_sender = self.sender.borrow_mut();
let my_sender = other_sender.as_mut().unwrap();
- let _ = my_sender.lock().unwrap().send_message(&Message::text(data.0));
+ let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0)));
}
Ok(())
@@ -427,7 +386,7 @@ impl WebSocketMethods for WebSocket {
if send_data {
let mut other_sender = self.sender.borrow_mut();
let my_sender = other_sender.as_mut().unwrap();
- let _ = my_sender.lock().unwrap().send_message(&Message::binary(data.clone_bytes()));
+ let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(data.clone_bytes())));
}
Ok(())
@@ -443,11 +402,10 @@ impl WebSocketMethods for WebSocket {
if let Some(sender) = sender.as_mut() {
let code: u16 = this.code.get();
let reason = this.reason.borrow().clone();
- let _ = sender.lock().unwrap().send_message(&Message::close_because(code, reason));
+ let _ = sender.send(WebSocketDomAction::Close(code, reason));
}
}
-
if let Some(code) = code {
//Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications
if code != close_code::NORMAL && (code < 3000 || code > 4999) {
@@ -490,15 +448,11 @@ impl WebSocketMethods for WebSocket {
/// Task queued when *the WebSocket connection is established*.
struct ConnectionEstablishedTask {
addr: Trusted<WebSocket>,
- sender: Arc<Mutex<Sender<WebSocketStream>>>,
}
impl Runnable for ConnectionEstablishedTask {
fn handler(self: Box<Self>) {
let ws = self.addr.root();
-
- *ws.sender.borrow_mut() = Some(self.sender);
-
// Step 1: Protocols.
// Step 2.
diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock
index 51d117f6198..a76580c6122 100644
--- a/components/servo/Cargo.lock
+++ b/components/servo/Cargo.lock
@@ -1156,6 +1156,7 @@ dependencies = [
"url 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"util 0.0.1",
"uuid 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "websocket 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1204,6 +1205,7 @@ dependencies = [
"stb_image 0.2.0 (git+https://github.com/servo/rust-stb-image)",
"url 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"util 0.0.1",
+ "websocket 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]