aboutsummaryrefslogtreecommitdiffstats
path: root/components/net/websocket_loader.rs
blob: 056650778456feadf4262ad3e1a009be754768b5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use hyper::header::Host;
use net_traits::MessageData;
use net_traits::hosts::replace_hosts;
use net_traits::unwrap_websocket_protocol;
use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
use std::ascii::AsciiExt;
use std::sync::{Arc, Mutex};
use std::thread;
use util::task::spawn_named;
use websocket::client::receiver::Receiver;
use websocket::client::request::Url;
use websocket::client::sender::Sender;
use websocket::header::{Headers, Origin, WebSocketProtocol};
use websocket::message::Type;
use websocket::result::{WebSocketError, WebSocketResult};
use websocket::stream::WebSocketStream;
use websocket::ws::receiver::Receiver as WSReceiver;
use websocket::ws::sender::Sender as Sender_Object;
use websocket::ws::util::url::parse_url;
use websocket::{Client, Message};

/// Runs the *Establish a WebSocket Connection* procedure from RFC 6455.
///
/// `resource_url` supplies the host name and port placed in the `Host` header, while
/// `net_url` is the address the client actually connects to. `origin` is sent as the
/// `Origin` header and, when `protocols` is non-empty, it is sent as the
/// `WebSocketProtocol` header.
///
/// On success returns the response headers together with the two halves of the split
/// client. Connection, send and validation errors are propagated via `try!`; a
/// `WebSocketError::ProtocolError` is returned when the response names a protocol that
/// is not (ASCII case-insensitively) in a non-empty `protocols` list.
///
/// Panics if `resource_url` has no host to serialize.
fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool),
                                    origin: String, protocols: Vec<String>)
    -> WebSocketResult<(Headers, Sender<WebSocketStream>, Receiver<WebSocketStream>)> {
    // The Host header is derived from the resource URL, not from `net_url`.
    let hostname = resource_url.serialize_host().unwrap();
    let port = resource_url.port_or_default();
    let host_header = Host { hostname: hostname, port: port };

    let mut handshake = try!(Client::connect(net_url));
    handshake.headers.set(Origin(origin));
    handshake.headers.set(host_header);
    if !protocols.is_empty() {
        handshake.headers.set(WebSocketProtocol(protocols.clone()));
    }

    let response = try!(handshake.send());
    try!(response.validate());

    {
        // Scoped so the borrow of `response` ends before `begin()` consumes it below.
        let accepted = match unwrap_websocket_protocol(response.protocol()) {
            // With no client-supplied list, any protocol named by the response passes.
            Some(selected) => {
                protocols.is_empty() || protocols.iter().any(|p| p.eq_ignore_ascii_case(selected))
            },
            None => true,
        };
        if !accepted {
            return Err(WebSocketError::ProtocolError("Protocol in Use not in client-supplied protocol list"));
        }
    }

    let response_headers = response.headers.clone();
    let (sender, receiver) = response.begin().split();
    Ok((response_headers, sender, receiver))
}

pub fn init(connect: WebSocketCommunicate, connect_data: WebSocketConnectData) {
    spawn_named(format!("WebSocket connection to {}", connect_data.resource_url), move || {
        // Step 8: Protocols.

        // Step 9.

        // URL that we actually fetch from the network, after applying the replacements
        // specified in the hosts file.
        let net_url_result = parse_url(&replace_hosts(&connect_data.resource_url));
        let net_url = match net_url_result {
            Ok(net_url) => net_url,
            Err(e) => {
                debug!("Failed to establish a WebSocket connection: {:?}", e);
                let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
                return;
            }
        };
        let channel = establish_a_websocket_connection(&connect_data.resource_url,
                                                       net_url,
                                                       connect_data.origin,
                                                       connect_data.protocols.clone());
        let (_, ws_sender, mut receiver) = match channel {
            Ok(channel) => {
                let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished(channel.0.clone(),
                                                                                               connect_data.protocols));
                channel
            },
            Err(e) => {
                debug!("Failed to establish a WebSocket connection: {:?}", e);
                let _ = connect.event_sender.send(WebSocketNetworkEvent::Close);
                return;
            }

        };

        let ws_sender = Arc::new(Mutex::new(ws_sender));

        let ws_sender_incoming = ws_sender.clone();
        let resource_event_sender = connect.event_sender;
        thread::spawn(move || {
            for message in receiver.incoming_messages() {
                let message: Message = match message {
                    Ok(m) => m,
                    Err(_) => break,
                };
                let message = match message.opcode {
                    Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()),
                    Type::Binary => MessageData::Binary(message.payload.into_owned()),
                    Type::Ping => {
                        let pong = Message::pong(message.payload);
                        ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap();
                        continue;
                    },
                    Type::Pong => continue,
                    Type::Close => {
                        ws_sender_incoming.lock().unwrap().send_message(&message).unwrap();
                        let _ = resource_event_sender.send(WebSocketNetworkEvent::Close);
                        break;
                    },
                };
                let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message));
            }
        });

        let ws_sender_outgoing = ws_sender.clone();
        let resource_action_receiver = connect.action_receiver;
        thread::spawn(move || {
            while let Ok(dom_action) = resource_action_receiver.recv() {
                match dom_action {
                    WebSocketDomAction::SendMessage(MessageData::Text(data)) => {
                        ws_sender_outgoing.lock().unwrap().send_message(&Message::text(data)).unwrap();
                    },
                    WebSocketDomAction::SendMessage(MessageData::Binary(data)) => {
                        ws_sender_outgoing.lock().unwrap().send_message(&Message::binary(data)).unwrap();
                    },
                    WebSocketDomAction::Close(code, reason) => {
                        ws_sender_outgoing.lock().unwrap()
                        .send_message(&Message::close_because(code, reason)).unwrap();
                    },
                }
            }
        });
    });
}