aboutsummaryrefslogtreecommitdiffstats
path: root/components/net/websocket_loader.rs
diff options
context:
space:
mode:
authorNova Fallen <nfallen@seas.upenn.edu>2015-12-02 23:54:24 -0500
committerNova Fallen <nfallen@seas.upenn.edu>2015-12-08 02:04:40 -0500
commite8c8277f34f35ee23f70568c4f725a1ffcc0f66b (patch)
treeda9d122b957072283a926b27cae8e61b04a379e5 /components/net/websocket_loader.rs
parent9f9d3570fc27d08e2afff2b90444956c3c5a66a4 (diff)
downloadservo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.tar.gz
servo-e8c8277f34f35ee23f70568c4f725a1ffcc0f66b.zip
move websocket creation to resource task
Diffstat (limited to 'components/net/websocket_loader.rs')
-rw-r--r--components/net/websocket_loader.rs125
1 files changed, 125 insertions, 0 deletions
diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs
new file mode 100644
index 00000000000..2ae54dc8eb2
--- /dev/null
+++ b/components/net/websocket_loader.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use hyper::header::Host;
+use net_traits::MessageData;
+use net_traits::hosts::replace_hosts;
+use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use util::task::spawn_named;
+use websocket::client::receiver::Receiver;
+use websocket::client::request::Url;
+use websocket::client::sender::Sender;
+use websocket::header::Origin;
+use websocket::message::Type;
+use websocket::result::WebSocketResult;
+use websocket::stream::WebSocketStream;
+use websocket::ws::receiver::Receiver as WSReceiver;
+use websocket::ws::sender::Sender as Sender_Object;
+use websocket::ws::util::url::parse_url;
+use websocket::{Client, Message};
+
+/// *Establish a WebSocket Connection* as defined in RFC 6455.
+fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool),
+ origin: String)
+ -> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> {
+
+ let host = Host {
+ hostname: resource_url.serialize_host().unwrap(),
+ port: resource_url.port_or_default()
+ };
+
+ let mut request = try!(Client::connect(net_url));
+ request.headers.set(Origin(origin));
+ request.headers.set(host);
+
+ let response = try!(request.send());
+ try!(response.validate());
+
+ Ok(response.begin().split())
+}
+
+pub fn init(connect: WebSocketCommunicate, connect_data: WebSocketConnectData) {
+ spawn_named(format!("WebSocket connection to {}", connect_data.resource_url), move || {
+ // Step 8: Protocols.
+
+ // Step 9.
+
+ // URL that we actually fetch from the network, after applying the replacements
+ // specified in the hosts file.
+ let net_url_result = parse_url(&replace_hosts(&connect_data.resource_url));
+ let net_url = match net_url_result {
+ Ok(net_url) => net_url,
+ Err(e) => {
+ debug!("Failed to establish a WebSocket connection: {:?}", e);
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
+ return;
+ }
+ };
+ let channel = establish_a_websocket_connection(&connect_data.resource_url,
+ net_url,
+ connect_data.origin);
+ let (ws_sender, mut receiver) = match channel {
+ Ok(channel) => {
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished);
+ channel
+ },
+ Err(e) => {
+ debug!("Failed to establish a WebSocket connection: {:?}", e);
+ let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
+ return;
+ }
+
+ };
+
+ let ws_sender = Arc::new(Mutex::new(ws_sender));
+
+ let ws_sender_incoming = ws_sender.clone();
+ let resource_event_sender = connect.event_sender;
+ thread::spawn(move || {
+ for message in receiver.incoming_messages() {
+ let message: Message = match message {
+ Ok(m) => m,
+ Err(_) => break,
+ };
+ let message = match message.opcode {
+ Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()),
+ Type::Binary => MessageData::Binary(message.payload.into_owned()),
+ Type::Ping => {
+ let pong = Message::pong(message.payload);
+ ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap();
+ continue;
+ },
+ Type::Pong => continue,
+ Type::Close => {
+ ws_sender_incoming.lock().unwrap().send_message(&message).unwrap();
+ let _ = resource_event_sender.send(WebSocketNetworkEvent::Close);
+ break;
+ },
+ };
+ let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message));
+ }
+ });
+
+ let ws_sender_outgoing = ws_sender.clone();
+ let resource_action_receiver = connect.action_receiver;
+ thread::spawn(move || {
+ while let Ok(dom_action) = resource_action_receiver.recv() {
+ match dom_action {
+ WebSocketDomAction::SendMessage(MessageData::Text(data)) => {
+ ws_sender_outgoing.lock().unwrap().send_message(&Message::text(data)).unwrap();
+ },
+ WebSocketDomAction::SendMessage(MessageData::Binary(data)) => {
+ ws_sender_outgoing.lock().unwrap().send_message(&Message::binary(data)).unwrap();
+ },
+ WebSocketDomAction::Close(code, reason) => {
+ ws_sender_outgoing.lock().unwrap()
+ .send_message(&Message::close_because(code, reason)).unwrap();
+ },
+ }
+ }
+ });
+ });
+}