diff options
Diffstat (limited to 'components/script/dom/websocket.rs')
-rw-r--r-- | components/script/dom/websocket.rs | 447 |
1 files changed, 253 insertions, 194 deletions
diff --git a/components/script/dom/websocket.rs b/components/script/dom/websocket.rs index ec69eb93346..a3c9848ed9b 100644 --- a/components/script/dom/websocket.rs +++ b/components/script/dom/websocket.rs @@ -1,48 +1,48 @@ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use dom::bindings::cell::DOMRefCell; -use dom::bindings::codegen::Bindings::BlobBinding::BlobMethods; -use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull; -use dom::bindings::codegen::Bindings::WebSocketBinding; -use dom::bindings::codegen::Bindings::WebSocketBinding::{BinaryType, WebSocketMethods}; -use dom::bindings::codegen::UnionTypes::StringOrStringSequence; -use dom::bindings::conversions::ToJSValConvertible; -use dom::bindings::error::{Error, ErrorResult, Fallible}; -use dom::bindings::inheritance::Castable; -use dom::bindings::js::Root; -use dom::bindings::refcounted::Trusted; -use dom::bindings::reflector::{DomObject, reflect_dom_object}; -use dom::bindings::str::{DOMString, USVString, is_token}; -use dom::blob::{Blob, BlobImpl}; -use dom::closeevent::CloseEvent; -use dom::event::{Event, EventBubbles, EventCancelable}; -use dom::eventtarget::EventTarget; -use dom::globalscope::GlobalScope; -use dom::messageevent::MessageEvent; -use dom::urlhelper::UrlHelper; + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use crate::dom::bindings::cell::DomRefCell; +use crate::dom::bindings::codegen::Bindings::BlobBinding::BlobMethods; +use crate::dom::bindings::codegen::Bindings::WebSocketBinding::{BinaryType, WebSocketMethods}; +use crate::dom::bindings::codegen::UnionTypes::StringOrStringSequence; +use crate::dom::bindings::conversions::ToJSValConvertible; +use crate::dom::bindings::error::{Error, ErrorResult, Fallible}; +use crate::dom::bindings::inheritance::Castable; +use crate::dom::bindings::refcounted::Trusted; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject}; +use crate::dom::bindings::root::DomRoot; +use crate::dom::bindings::str::{is_token, DOMString, USVString}; +use crate::dom::blob::Blob; +use crate::dom::closeevent::CloseEvent; +use crate::dom::event::{Event, EventBubbles, EventCancelable}; +use crate::dom::eventtarget::EventTarget; +use crate::dom::globalscope::GlobalScope; +use crate::dom::messageevent::MessageEvent; +use crate::script_runtime::CommonScriptMsg; +use crate::script_runtime::ScriptThreadEventCategory::WebSocketEvent; +use crate::task::{TaskCanceller, TaskOnce}; +use crate::task_source::websocket::WebsocketTaskSource; +use crate::task_source::TaskSource; use dom_struct::dom_struct; use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; -use js::jsapi::JSAutoCompartment; +use ipc_channel::router::ROUTER; +use js::jsapi::{JSAutoRealm, JSObject}; use js::jsval::UndefinedValue; -use js::typedarray::{ArrayBuffer, CreateWith}; -use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent}; -use net_traits::CoreResourceMsg::WebsocketConnect; +use js::rust::CustomAutoRooterGuard; +use js::typedarray::{ArrayBuffer, ArrayBufferView, CreateWith}; +use net_traits::request::{Referrer, RequestBuilder, RequestMode}; use net_traits::MessageData; -use script_runtime::CommonScriptMsg; -use script_runtime::ScriptThreadEventCategory::WebSocketEvent; -use script_thread::{Runnable, RunnableWrapper}; -use servo_url::ServoUrl; -use std::ascii::AsciiExt; +use net_traits::{CoreResourceMsg, FetchChannels}; +use net_traits::{WebSocketDomAction, WebSocketNetworkEvent}; +use profile_traits::ipc as ProfiledIpc; +use script_traits::serializable::BlobImpl; +use servo_url::{ImmutableOrigin, ServoUrl}; use std::borrow::ToOwned; use std::cell::Cell; use std::ptr; -use std::thread; -use task_source::TaskSource; -use task_source::networking::NetworkingTaskSource; -#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)] +#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)] enum WebSocketRequestState { Connecting = 0, Open = 1, @@ -68,30 +68,34 @@ mod close_code { pub const TLS_FAILED: u16 = 1015; } -pub fn close_the_websocket_connection(address: Trusted<WebSocket>, - task_source: &NetworkingTaskSource, - wrapper: &RunnableWrapper, - code: Option<u16>, - reason: String) { - let close_task = box CloseTask { +fn close_the_websocket_connection( + address: Trusted<WebSocket>, + task_source: &WebsocketTaskSource, + canceller: &TaskCanceller, + code: Option<u16>, + reason: String, +) { + let close_task = CloseTask { address: address, failed: false, code: code, reason: Some(reason), }; - task_source.queue_with_wrapper(close_task, &wrapper).unwrap(); + let _ = task_source.queue_with_canceller(close_task, &canceller); } -pub fn fail_the_websocket_connection(address: Trusted<WebSocket>, - task_source: &NetworkingTaskSource, - wrapper: &RunnableWrapper) { - let close_task = box CloseTask { +fn fail_the_websocket_connection( + address: Trusted<WebSocket>, + task_source: &WebsocketTaskSource, + canceller: &TaskCanceller, +) { + let close_task = CloseTask { address: address, failed: true, code: Some(close_code::ABNORMAL), reason: None, }; - task_source.queue_with_wrapper(close_task, &wrapper).unwrap(); + let _ = task_source.queue_with_canceller(close_task, &canceller); } #[dom_struct] @@ -101,36 +105,41 @@ pub struct WebSocket { ready_state: Cell<WebSocketRequestState>, buffered_amount: Cell<u64>, clearing_buffer: Cell<bool>, //Flag to tell if there is a running thread to clear buffered_amount - #[ignore_heap_size_of = "Defined in std"] - sender: DOMRefCell<Option<IpcSender<WebSocketDomAction>>>, + #[ignore_malloc_size_of = "Defined in std"] + sender: IpcSender<WebSocketDomAction>, binary_type: Cell<BinaryType>, - protocol: DOMRefCell<String>, //Subprotocol selected by server + protocol: DomRefCell<String>, //Subprotocol selected by server } impl WebSocket { - fn new_inherited(url: ServoUrl) -> WebSocket { + fn new_inherited(url: ServoUrl, sender: IpcSender<WebSocketDomAction>) -> WebSocket { WebSocket { eventtarget: EventTarget::new_inherited(), url: url, ready_state: Cell::new(WebSocketRequestState::Connecting), buffered_amount: Cell::new(0), clearing_buffer: Cell::new(false), - sender: DOMRefCell::new(None), + sender: sender, binary_type: Cell::new(BinaryType::Blob), - protocol: DOMRefCell::new("".to_owned()), + protocol: DomRefCell::new("".to_owned()), } } - fn new(global: &GlobalScope, url: ServoUrl) -> Root<WebSocket> { - reflect_dom_object(box WebSocket::new_inherited(url), - global, WebSocketBinding::Wrap) + fn new( + global: &GlobalScope, + url: ServoUrl, + sender: IpcSender<WebSocketDomAction>, + ) -> DomRoot<WebSocket> { + reflect_dom_object(Box::new(WebSocket::new_inherited(url, sender)), global) } - /// https://html.spec.whatwg.org/multipage/#dom-websocket - pub fn Constructor(global: &GlobalScope, - url: DOMString, - protocols: Option<StringOrStringSequence>) - -> Fallible<Root<WebSocket>> { + /// <https://html.spec.whatwg.org/multipage/#dom-websocket> + #[allow(non_snake_case)] + pub fn Constructor( + global: &GlobalScope, + url: DOMString, + protocols: Option<StringOrStringSequence>, + ) -> Fallible<DomRoot<WebSocket>> { // Steps 1-2. let url_record = ServoUrl::parse(&url).or(Err(Error::Syntax))?; @@ -146,13 +155,11 @@ impl WebSocket { } // Step 5. - let protocols = protocols.map_or(vec![], |p| { - match p { - StringOrStringSequence::String(string) => vec![string.into()], - StringOrStringSequence::StringSequence(seq) => { - seq.into_iter().map(String::from).collect() - }, - } + let protocols = protocols.map_or(vec![], |p| match p { + StringOrStringSequence::String(string) => vec![string.into()], + StringOrStringSequence::StringSequence(seq) => { + seq.into_iter().map(String::from).collect() + }, }); // Step 6. @@ -160,7 +167,10 @@ impl WebSocket { // https://tools.ietf.org/html/rfc6455#section-4.1 // Handshake requirements, step 10 - if protocols[i + 1..].iter().any(|p| p.eq_ignore_ascii_case(protocol)) { + if protocols[i + 1..] + .iter() + .any(|p| p.eq_ignore_ascii_case(protocol)) + { return Err(Error::Syntax); } @@ -170,63 +180,65 @@ impl WebSocket { } } - let ws = WebSocket::new(global, url_record.clone()); + // Create the interface for communication with the resource thread + let (dom_action_sender, resource_action_receiver): ( + IpcSender<WebSocketDomAction>, + IpcReceiver<WebSocketDomAction>, + ) = ipc::channel().unwrap(); + let (resource_event_sender, dom_event_receiver): ( + IpcSender<WebSocketNetworkEvent>, + ProfiledIpc::IpcReceiver<WebSocketNetworkEvent>, + ) = ProfiledIpc::channel(global.time_profiler_chan().clone()).unwrap(); + + let ws = WebSocket::new(global, url_record.clone(), dom_action_sender); let address = Trusted::new(&*ws); - let connect_data = WebSocketConnectData { - resource_url: url_record, - origin: UrlHelper::Origin(&global.get_url()).0, - protocols: protocols, - }; + // Step 8. + let request = RequestBuilder::new(url_record, Referrer::NoReferrer) + .origin(global.origin().immutable().clone()) + .mode(RequestMode::WebSocket { protocols }); - // Create the interface for communication with the resource thread - let (dom_action_sender, resource_action_receiver): - (IpcSender<WebSocketDomAction>, - IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap(); - let (resource_event_sender, dom_event_receiver): - (IpcSender<WebSocketNetworkEvent>, - IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap(); - - let connect = WebSocketCommunicate { + let channels = FetchChannels::WebSocket { event_sender: resource_event_sender, action_receiver: resource_action_receiver, }; - - // Step 8. - let _ = global.core_resource_thread().send(WebsocketConnect(connect, connect_data)); - - *ws.sender.borrow_mut() = Some(dom_action_sender); - - let task_source = global.networking_task_source(); - let wrapper = global.get_runnable_wrapper(); - thread::spawn(move || { - while let Ok(event) = dom_event_receiver.recv() { - match event { - WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use } => { - let open_thread = box ConnectionEstablishedTask { - address: address.clone(), - protocol_in_use, - }; - task_source.queue_with_wrapper(open_thread, &wrapper).unwrap(); - }, - WebSocketNetworkEvent::MessageReceived(message) => { - let message_thread = box MessageReceivedTask { - address: address.clone(), - message: message, - }; - task_source.queue_with_wrapper(message_thread, &wrapper).unwrap(); - }, - WebSocketNetworkEvent::Fail => { - fail_the_websocket_connection(address.clone(), - &task_source, &wrapper); - }, - WebSocketNetworkEvent::Close(code, reason) => { - close_the_websocket_connection(address.clone(), - &task_source, &wrapper, code, reason); - }, - } - } - }); + let _ = global + .core_resource_thread() + .send(CoreResourceMsg::Fetch(request, channels)); + + let task_source = global.websocket_task_source(); + let canceller = global.task_canceller(WebsocketTaskSource::NAME); + ROUTER.add_route( + dom_event_receiver.to_opaque(), + Box::new(move |message| match message.to().unwrap() { + WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use } => { + let open_thread = ConnectionEstablishedTask { + address: address.clone(), + protocol_in_use, + }; + let _ = task_source.queue_with_canceller(open_thread, &canceller); + }, + WebSocketNetworkEvent::MessageReceived(message) => { + let message_thread = MessageReceivedTask { + address: address.clone(), + message: message, + }; + let _ = task_source.queue_with_canceller(message_thread, &canceller); + }, + WebSocketNetworkEvent::Fail => { + fail_the_websocket_connection(address.clone(), &task_source, &canceller); + }, + WebSocketNetworkEvent::Close(code, reason) => { + close_the_websocket_connection( + address.clone(), + &task_source, + &canceller, + code, + reason, + ); + }, + }), + ); // Step 7. Ok(ws) @@ -246,7 +258,7 @@ impl WebSocket { match data_byte_len.checked_add(self.buffered_amount.get()) { None => panic!(), - Some(new_amount) => self.buffered_amount.set(new_amount) + Some(new_amount) => self.buffered_amount.set(new_amount), }; if return_after_buffer { @@ -256,18 +268,27 @@ impl WebSocket { if !self.clearing_buffer.get() && self.ready_state.get() == WebSocketRequestState::Open { self.clearing_buffer.set(true); - let task = box BufferedAmountTask { - address: address, - }; + let task = Box::new(BufferedAmountTask { address: address }); + let pipeline_id = self.global().pipeline_id(); self.global() .script_chan() - .send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)) + // TODO: Use a dedicated `websocket-task-source` task source instead. + .send(CommonScriptMsg::Task( + WebSocketEvent, + task, + Some(pipeline_id), + WebsocketTaskSource::NAME, + )) .unwrap(); } Ok(true) } + + pub fn origin(&self) -> ImmutableOrigin { + self.url.origin() + } } impl WebSocketMethods for WebSocket { @@ -310,18 +331,18 @@ impl WebSocketMethods for WebSocket { // https://html.spec.whatwg.org/multipage/#dom-websocket-protocol fn Protocol(&self) -> DOMString { - DOMString::from(self.protocol.borrow().clone()) + DOMString::from(self.protocol.borrow().clone()) } // https://html.spec.whatwg.org/multipage/#dom-websocket-send fn Send(&self, data: USVString) -> ErrorResult { let data_byte_len = data.0.as_bytes().len() as u64; - let send_data = try!(self.send_impl(data_byte_len)); + let send_data = self.send_impl(data_byte_len)?; if send_data { - let mut other_sender = self.sender.borrow_mut(); - let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0))); + let _ = self + .sender + .send(WebSocketDomAction::SendMessage(MessageData::Text(data.0))); } Ok(()) @@ -334,15 +355,43 @@ impl WebSocketMethods for WebSocket { If the buffer limit is reached in the first place, there are likely other major problems */ let data_byte_len = blob.Size(); - let send_data = try!(self.send_impl(data_byte_len)); + let send_data = self.send_impl(data_byte_len)?; if send_data { - let mut other_sender = self.sender.borrow_mut(); - let my_sender = other_sender.as_mut().unwrap(); let bytes = blob.get_bytes().unwrap_or(vec![]); - let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes))); + let _ = self + .sender + .send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes))); + } + + Ok(()) + } + + // https://html.spec.whatwg.org/multipage/#dom-websocket-send + fn Send__(&self, array: CustomAutoRooterGuard<ArrayBuffer>) -> ErrorResult { + let bytes = array.to_vec(); + let data_byte_len = bytes.len(); + let send_data = self.send_impl(data_byte_len as u64)?; + + if send_data { + let _ = self + .sender + .send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes))); } + Ok(()) + } + + // https://html.spec.whatwg.org/multipage/#dom-websocket-send + fn Send___(&self, array: CustomAutoRooterGuard<ArrayBufferView>) -> ErrorResult { + let bytes = array.to_vec(); + let data_byte_len = bytes.len(); + let send_data = self.send_impl(data_byte_len as u64)?; + if send_data { + let _ = self + .sender + .send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes))); + } Ok(()) } @@ -355,50 +404,54 @@ impl WebSocketMethods for WebSocket { } } if let Some(ref reason) = reason { - if reason.0.as_bytes().len() > 123 { //reason cannot be larger than 123 bytes + if reason.0.as_bytes().len() > 123 { + //reason cannot be larger than 123 bytes return Err(Error::Syntax); } } match self.ready_state.get() { - WebSocketRequestState::Closing | WebSocketRequestState::Closed => {} //Do nothing - WebSocketRequestState::Connecting => { //Connection is not yet established + WebSocketRequestState::Closing | WebSocketRequestState::Closed => {}, //Do nothing + WebSocketRequestState::Connecting => { + //Connection is not yet established /*By setting the state to closing, the open function - will abort connecting the websocket*/ + will abort connecting the websocket*/ self.ready_state.set(WebSocketRequestState::Closing); let address = Trusted::new(self); - let task_source = self.global().networking_task_source(); - fail_the_websocket_connection(address, &task_source, &self.global().get_runnable_wrapper()); - } + // TODO: use a dedicated task source, + // https://html.spec.whatwg.org/multipage/#websocket-task-source + // When making the switch, also update the task_canceller call. + let task_source = self.global().websocket_task_source(); + fail_the_websocket_connection( + address, + &task_source, + &self.global().task_canceller(WebsocketTaskSource::NAME), + ); + }, WebSocketRequestState::Open => { self.ready_state.set(WebSocketRequestState::Closing); // Kick off _Start the WebSocket Closing Handshake_ // https://tools.ietf.org/html/rfc6455#section-7.1.2 let reason = reason.map(|reason| reason.0); - let mut other_sender = self.sender.borrow_mut(); - let my_sender = other_sender.as_mut().unwrap(); - let _ = my_sender.send(WebSocketDomAction::Close(code, reason)); - } + let _ = self.sender.send(WebSocketDomAction::Close(code, reason)); + }, } Ok(()) //Return Ok } } - /// Task queued when *the WebSocket connection is established*. -/// https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established +/// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established> struct ConnectionEstablishedTask { address: Trusted<WebSocket>, protocol_in_use: Option<String>, } -impl Runnable for ConnectionEstablishedTask { - fn name(&self) -> &'static str { "ConnectionEstablishedTask" } - - /// https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established - fn handler(self: Box<Self>) { +impl TaskOnce for ConnectionEstablishedTask { + /// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established> + fn run_once(self) { let ws = self.address.root(); // Step 1. @@ -421,15 +474,13 @@ struct BufferedAmountTask { address: Trusted<WebSocket>, } -impl Runnable for BufferedAmountTask { +impl TaskOnce for BufferedAmountTask { // See https://html.spec.whatwg.org/multipage/#dom-websocket-bufferedamount // // To be compliant with standards, we need to reset bufferedAmount only when the event loop // reaches step 1. In our implementation, the bytes will already have been sent on a background // thread. - fn name(&self) -> &'static str { "BufferedAmountTask" } - - fn handler(self: Box<Self>) { + fn run_once(self) { let ws = self.address.root(); ws.buffered_amount.set(0); @@ -444,10 +495,8 @@ struct CloseTask { reason: Option<String>, } -impl Runnable for CloseTask { - fn name(&self) -> &'static str { "CloseTask" } - - fn handler(self: Box<Self>) { +impl TaskOnce for CloseTask { + fn run_once(self) { let ws = self.address.root(); if ws.ready_state.get() == WebSocketRequestState::Closed { @@ -470,13 +519,15 @@ impl Runnable for CloseTask { let clean_close = !self.failed; let code = self.code.unwrap_or(close_code::NO_STATUS); let reason = DOMString::from(self.reason.unwrap_or("".to_owned())); - let close_event = CloseEvent::new(&ws.global(), - atom!("close"), - EventBubbles::DoesNotBubble, - EventCancelable::NotCancelable, - clean_close, - code, - reason); + let close_event = CloseEvent::new( + &ws.global(), + atom!("close"), + EventBubbles::DoesNotBubble, + EventCancelable::NotCancelable, + clean_close, + code, + reason, + ); close_event.upcast::<Event>().fire(ws.upcast()); } } @@ -486,14 +537,15 @@ struct MessageReceivedTask { message: MessageData, } -impl Runnable for MessageReceivedTask { - fn name(&self) -> &'static str { "MessageReceivedTask" } - +impl TaskOnce for MessageReceivedTask { #[allow(unsafe_code)] - fn handler(self: Box<Self>) { + fn run_once(self) { let ws = self.address.root(); - debug!("MessageReceivedTask::handler({:p}): readyState={:?}", &*ws, - ws.ready_state.get()); + debug!( + "MessageReceivedTask::handler({:p}): readyState={:?}", + &*ws, + ws.ready_state.get() + ); // Step 1. if ws.ready_state.get() != WebSocketRequestState::Open { @@ -505,30 +557,37 @@ impl Runnable for MessageReceivedTask { // global.get_cx() returns a valid `JSContext` pointer, so this is safe. unsafe { let cx = global.get_cx(); - let _ac = JSAutoCompartment::new(cx, ws.reflector().get_jsobject().get()); - rooted!(in(cx) let mut message = UndefinedValue()); + let _ac = JSAutoRealm::new(*cx, ws.reflector().get_jsobject().get()); + rooted!(in(*cx) let mut message = UndefinedValue()); match self.message { - MessageData::Text(text) => text.to_jsval(cx, message.handle_mut()), - MessageData::Binary(data) => { - match ws.binary_type.get() { - BinaryType::Blob => { - let blob = Blob::new(&global, BlobImpl::new_from_bytes(data), "".to_owned()); - blob.to_jsval(cx, message.handle_mut()); - } - BinaryType::Arraybuffer => { - rooted!(in(cx) let mut array_buffer = ptr::null_mut()); - assert!(ArrayBuffer::create(cx, - CreateWith::Slice(&data), - array_buffer.handle_mut()) - .is_ok()); - - (*array_buffer).to_jsval(cx, message.handle_mut()); - } - - } + MessageData::Text(text) => text.to_jsval(*cx, message.handle_mut()), + MessageData::Binary(data) => match ws.binary_type.get() { + BinaryType::Blob => { + let blob = + Blob::new(&global, BlobImpl::new_from_bytes(data, "".to_owned())); + blob.to_jsval(*cx, message.handle_mut()); + }, + BinaryType::Arraybuffer => { + rooted!(in(*cx) let mut array_buffer = ptr::null_mut::<JSObject>()); + assert!(ArrayBuffer::create( + *cx, + CreateWith::Slice(&data), + array_buffer.handle_mut() + ) + .is_ok()); + + (*array_buffer).to_jsval(*cx, message.handle_mut()); + }, }, } - MessageEvent::dispatch_jsval(ws.upcast(), &global, message.handle()); + MessageEvent::dispatch_jsval( + ws.upcast(), + &global, + message.handle(), + Some(&ws.origin().ascii_serialization()), + None, + vec![], + ); } } } |