diff options
Diffstat (limited to 'components/script/dom/websocket.rs')
-rw-r--r-- | components/script/dom/websocket.rs | 156 |
1 files changed, 55 insertions, 101 deletions
diff --git a/components/script/dom/websocket.rs b/components/script/dom/websocket.rs index 79bdd0fdfd6..125ab9065ee 100644 --- a/components/script/dom/websocket.rs +++ b/components/script/dom/websocket.rs @@ -21,32 +21,25 @@ use dom::closeevent::CloseEvent; use dom::event::{Event, EventBubbles, EventCancelable}; use dom::eventtarget::EventTarget; use dom::messageevent::MessageEvent; -use hyper::header::Host; +use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use js::jsapi::{JSAutoCompartment, JSAutoRequest, RootedValue}; use js::jsapi::{JS_GetArrayBufferData, JS_NewArrayBuffer}; use js::jsval::UndefinedValue; use libc::{uint32_t, uint8_t}; +use net_traits::ControlMsg::WebsocketConnect; +use net_traits::MessageData; use net_traits::hosts::replace_hosts; +use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent}; use ref_slice::ref_slice; use script_task::ScriptTaskEventCategory::WebSocketEvent; use script_task::{CommonScriptMsg, Runnable}; use std::borrow::ToOwned; use std::cell::Cell; use std::ptr; -use std::sync::{Arc, Mutex}; +use std::thread; use util::str::DOMString; -use util::task::spawn_named; -use websocket::client::receiver::Receiver; use websocket::client::request::Url; -use websocket::client::sender::Sender; -use websocket::header::Origin; -use websocket::message::Type; -use websocket::result::WebSocketResult; -use websocket::stream::WebSocketStream; -use websocket::ws::receiver::Receiver as WSReceiver; -use websocket::ws::sender::Sender as Sender_Object; use websocket::ws::util::url::parse_url; -use websocket::{Client, Message}; #[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] enum WebSocketRequestState { @@ -56,14 +49,6 @@ enum WebSocketRequestState { Closed = 3, } -no_jsmanaged_fields!(Sender<WebSocketStream>); - -#[derive(HeapSizeOf)] -enum MessageData { - Text(String), - Binary(Vec<u8>), -} - // list of blacklist ports according to // http://mxr.mozilla.org/mozilla-central/source/netwerk/base/nsIOService.cpp#87 const BLOCKED_PORTS_LIST: &'static [u16] = &[ @@ -154,7 +139,7 @@ pub struct WebSocket { buffered_amount: Cell<u32>, clearing_buffer: Cell<bool>, //Flag to tell if there is a running task to clear buffered_amount #[ignore_heap_size_of = "Defined in std"] - sender: DOMRefCell<Option<Arc<Mutex<Sender<WebSocketStream>>>>>, + sender: DOMRefCell<Option<IpcSender<WebSocketDomAction>>>, failed: Cell<bool>, //Flag to tell if websocket was closed due to failure full: Cell<bool>, //Flag to tell if websocket queue is full clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail) @@ -163,29 +148,6 @@ pub struct WebSocket { binary_type: Cell<BinaryType>, } -/// *Establish a WebSocket Connection* as defined in RFC 6455. -fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool), - origin: String) - -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> { - // URL that we actually fetch from the network, after applying the replacements - // specified in the hosts file. - - let host = Host { - hostname: resource_url.serialize_host().unwrap(), - port: resource_url.port_or_default() - }; - - let mut request = try!(Client::connect(net_url)); - request.headers.set(Origin(origin)); - request.headers.set(host); - - let response = try!(request.send()); - try!(response.validate()); - - Ok(response.begin().split()) -} - - impl WebSocket { fn new_inherited(global: GlobalRef, url: Url) -> WebSocket { WebSocket { @@ -217,8 +179,9 @@ impl WebSocket { -> Fallible<Root<WebSocket>> { // Step 1. let resource_url = try!(Url::parse(&url).map_err(|_| Error::Syntax)); - let net_url = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax)); - + // Although we do this replace and parse operation again in the resource task, + // we try here to be able to immediately throw a syntax error on failure. + let _ = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax)); // Step 2: Disallow https -> ws connections. // Step 3: Potentially block access to some ports. @@ -257,62 +220,58 @@ impl WebSocket { let address = Trusted::new(global.get_cx(), ws.r(), global.networking_task_source()); let origin = global.get_url().serialize(); + + let connect_data = WebSocketConnectData { + resource_url: resource_url.clone(), + origin: origin, + }; + + // Create the interface for communication with the resource task + let (dom_action_sender, resource_action_receiver): + (IpcSender<WebSocketDomAction>, + IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap(); + let (resource_event_sender, dom_event_receiver): + (IpcSender<WebSocketNetworkEvent>, + IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap(); + + let connect = WebSocketCommunicate { + event_sender: resource_event_sender, + action_receiver: resource_action_receiver, + }; + + let resource_task = global.resource_task(); + let _ = resource_task.send(WebsocketConnect(connect, connect_data)); + + *ws.sender.borrow_mut() = Some(dom_action_sender); + + let moved_address = address.clone(); let sender = global.networking_task_source(); - spawn_named(format!("WebSocket connection to {}", ws.Url()), move || { - // Step 8: Protocols. - - // Step 9. - let channel = establish_a_websocket_connection(&resource_url, net_url, origin); - let (ws_sender, mut receiver) = match channel { - Ok(channel) => channel, - Err(e) => { - debug!("Failed to establish a WebSocket connection: {:?}", e); - let task = box CloseTask { - addr: address, - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap(); - return; - } - }; - let ws_sender = Arc::new(Mutex::new(ws_sender)); - let open_task = box ConnectionEstablishedTask { - addr: address.clone(), - sender: ws_sender.clone(), - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap(); - - for message in receiver.incoming_messages() { - let message: Message = match message { - Ok(m) => m, - Err(_) => break, - }; - let message = match message.opcode { - Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()), - Type::Binary => MessageData::Binary(message.payload.into_owned()), - Type::Ping => { - let pong = Message::pong(message.payload); - ws_sender.lock().unwrap().send_message(&pong).unwrap(); - continue; + thread::spawn(move || { + while let Ok(event) = dom_event_receiver.recv() { + match event { + WebSocketNetworkEvent::ConnectionEstablished => { + let open_task = box ConnectionEstablishedTask { + addr: moved_address.clone(), + }; + sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap(); }, - Type::Pong => continue, - Type::Close => { - ws_sender.lock().unwrap().send_message(&message).unwrap(); + WebSocketNetworkEvent::MessageReceived(message) => { + let message_task = box MessageReceivedTask { + address: moved_address.clone(), + message: message, + }; + sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap(); + }, + WebSocketNetworkEvent::Close => { let task = box CloseTask { - addr: address, + addr: moved_address.clone(), }; sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap(); - break; }, - }; - let message_task = box MessageReceivedTask { - address: address.clone(), - message: message, - }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap(); + } } }); - // Step 7. Ok(ws) } @@ -408,7 +367,7 @@ impl WebSocketMethods for WebSocket { if send_data { let mut other_sender = self.sender.borrow_mut(); let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.lock().unwrap().send_message(&Message::text(data.0)); + let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0))); } Ok(()) @@ -427,7 +386,7 @@ impl WebSocketMethods for WebSocket { if send_data { let mut other_sender = self.sender.borrow_mut(); let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.lock().unwrap().send_message(&Message::binary(data.clone_bytes())); + let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(data.clone_bytes()))); } Ok(()) @@ -443,11 +402,10 @@ impl WebSocketMethods for WebSocket { if let Some(sender) = sender.as_mut() { let code: u16 = this.code.get(); let reason = this.reason.borrow().clone(); - let _ = sender.lock().unwrap().send_message(&Message::close_because(code, reason)); + let _ = sender.send(WebSocketDomAction::Close(code, reason)); } } - if let Some(code) = code { //Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications if code != close_code::NORMAL && (code < 3000 || code > 4999) { @@ -490,15 +448,11 @@ impl WebSocketMethods for WebSocket { /// Task queued when *the WebSocket connection is established*. struct ConnectionEstablishedTask { addr: Trusted<WebSocket>, - sender: Arc<Mutex<Sender<WebSocketStream>>>, } impl Runnable for ConnectionEstablishedTask { fn handler(self: Box<Self>) { let ws = self.addr.root(); - - *ws.sender.borrow_mut() = Some(self.sender); - // Step 1: Protocols. // Step 2. |