diff options
Diffstat (limited to 'components/script')
-rw-r--r-- | components/script/dom/bindings/structuredclone.rs | 8 | ||||
-rw-r--r-- | components/script/dom/globalscope.rs | 203 | ||||
-rw-r--r-- | components/script/dom/messageport.rs | 85 | ||||
-rw-r--r-- | components/script/dom/readablestream.rs | 275 | ||||
-rw-r--r-- | components/script/dom/underlyingsourcecontainer.rs | 51 | ||||
-rw-r--r-- | components/script/dom/writablestream.rs | 146 | ||||
-rw-r--r-- | components/script/dom/writablestreamdefaultcontroller.rs | 152 |
7 files changed, 864 insertions, 56 deletions
diff --git a/components/script/dom/bindings/structuredclone.rs b/components/script/dom/bindings/structuredclone.rs index fb701406762..e53318516a9 100644 --- a/components/script/dom/bindings/structuredclone.rs +++ b/components/script/dom/bindings/structuredclone.rs @@ -41,6 +41,7 @@ use crate::dom::dompoint::DOMPoint; use crate::dom::dompointreadonly::DOMPointReadOnly; use crate::dom::globalscope::GlobalScope; use crate::dom::messageport::MessagePort; +use crate::dom::readablestream::ReadableStream; use crate::realms::{AlreadyInRealm, InRealm, enter_realm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; @@ -57,6 +58,7 @@ pub(super) enum StructuredCloneTags { Principals = 0xFFFF8003, DomPointReadOnly = 0xFFFF8004, DomPoint = 0xFFFF8005, + ReadableStream = 0xFFFF8006, Max = 0xFFFFFFFF, } @@ -74,6 +76,7 @@ impl From<TransferrableInterface> for StructuredCloneTags { fn from(v: TransferrableInterface) -> Self { match v { TransferrableInterface::MessagePort => StructuredCloneTags::MessagePort, + TransferrableInterface::ReadableStream => StructuredCloneTags::ReadableStream, } } } @@ -250,6 +253,7 @@ fn receiver_for_type( ) -> fn(&GlobalScope, &mut StructuredDataReader, u64, RawMutableHandleObject) -> Result<(), ()> { match val { TransferrableInterface::MessagePort => receive_object::<MessagePort>, + TransferrableInterface::ReadableStream => receive_object::<ReadableStream>, } } @@ -375,6 +379,7 @@ type TransferOperation = unsafe fn( fn transfer_for_type(val: TransferrableInterface) -> TransferOperation { match val { TransferrableInterface::MessagePort => try_transfer::<MessagePort>, + TransferrableInterface::ReadableStream => try_transfer::<ReadableStream>, } } @@ -421,6 +426,7 @@ unsafe fn can_transfer_for_type( } match transferable { TransferrableInterface::MessagePort => can_transfer::<MessagePort>(obj, cx), + TransferrableInterface::ReadableStream => can_transfer::<ReadableStream>(obj, cx), } } @@ -507,6 +513,7 @@ pub(crate) struct StructuredDataReader { pub(crate) blob_impls: Option<HashMap<BlobId, BlobImpl>>, /// A map of serialized points. pub(crate) points: Option<HashMap<DomPointId, DomPoint>>, + pub(crate) readable_streams: Option<Vec<DomRoot<ReadableStream>>>, } /// A data holder for transferred and serialized objects. @@ -597,6 +604,7 @@ pub(crate) fn read( blob_impls: data.blobs.take(), points: data.points.take(), errors: DOMErrorRecord { message: None }, + readable_streams: None, }; let sc_reader_ptr = &mut sc_reader as *mut _; unsafe { diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs index b488daf24e4..e56f4693e35 100644 --- a/components/script/dom/globalscope.rs +++ b/components/script/dom/globalscope.rs @@ -121,7 +121,7 @@ use crate::dom::paintworkletglobalscope::PaintWorkletGlobalScope; use crate::dom::performance::Performance; use crate::dom::performanceobserver::VALID_ENTRY_TYPES; use crate::dom::promise::Promise; -use crate::dom::readablestream::ReadableStream; +use crate::dom::readablestream::{CrossRealmTransformReadable, ReadableStream}; use crate::dom::serviceworker::ServiceWorker; use crate::dom::serviceworkerregistration::ServiceWorkerRegistration; use crate::dom::trustedtypepolicyfactory::TrustedTypePolicyFactory; @@ -133,6 +133,7 @@ use crate::dom::webgpu::identityhub::IdentityHub; use crate::dom::window::Window; use crate::dom::workerglobalscope::WorkerGlobalScope; use crate::dom::workletglobalscope::WorkletGlobalScope; +use crate::dom::writablestream::CrossRealmTransformWritable; use crate::messaging::{CommonScriptMsg, ScriptEventLoopReceiver, ScriptEventLoopSender}; use crate::microtask::{Microtask, MicrotaskQueue, UserMicrotask}; use crate::network_listener::{NetworkListener, PreInvoke}; @@ -456,6 +457,13 @@ pub(crate) struct ManagedMessagePort { pending: bool, /// Has the port been closed? If closed, it can be dropped and later GC'ed. closed: bool, + /// Note: it may seem strange to use a pair of options, versus for example an enum. + /// But it looks like tranform streams will require both of those in their transfer. + /// This will be resolved when we reach that point of the implementation. + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + cross_realm_transform_readable: Option<CrossRealmTransformReadable>, + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + cross_realm_transform_writable: Option<CrossRealmTransformWritable>, } /// State representing whether this global is currently managing broadcast channels. @@ -940,6 +948,11 @@ impl GlobalScope { *self.broadcast_channel_state.borrow_mut() = BroadcastChannelState::UnManaged; } + /// <https://html.spec.whatwg.org/multipage/#disentangle> + pub(crate) fn disentangle_port(&self, _port: &MessagePort) { + // TODO: #36465 + } + /// <https://html.spec.whatwg.org/multipage/#entangle> pub(crate) fn entangle_ports(&self, port1: MessagePortId, port2: MessagePortId) { if let MessagePortState::Managed(_id, message_ports) = @@ -1007,10 +1020,10 @@ impl GlobalScope { /// <https://html.spec.whatwg.org/multipage/#dom-messageport-start> pub(crate) fn start_message_port(&self, port_id: &MessagePortId) { - if let MessagePortState::Managed(_id, message_ports) = + let message_buffer = if let MessagePortState::Managed(_id, message_ports) = &mut *self.message_port_state.borrow_mut() { - let message_buffer = match message_ports.get_mut(port_id) { + match message_ports.get_mut(port_id) { None => panic!("start_message_port called on a unknown port."), Some(managed_port) => { if let Some(port_impl) = managed_port.port_impl.as_mut() { @@ -1019,21 +1032,14 @@ impl GlobalScope { panic!("managed-port has no port-impl."); } }, - }; - if let Some(message_buffer) = message_buffer { - for task in message_buffer { - let port_id = *port_id; - let this = Trusted::new(self); - self.task_manager().port_message_queue().queue( - task!(process_pending_port_messages: move || { - let target_global = this.root(); - target_global.route_task_to_port(port_id, task, CanGc::note()); - }), - ); - } } } else { - warn!("start_message_port called on a global not managing any ports.") + return warn!("start_message_port called on a global not managing any ports."); + }; + if let Some(message_buffer) = message_buffer { + for task in message_buffer { + self.route_task_to_port(*port_id, task, CanGc::note()); + } } } @@ -1208,13 +1214,64 @@ impl GlobalScope { } } - /// Route the task to be handled by the relevant port. + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + /// The "Add a handler for port’s message event with the following steps:" + /// and "Add a handler for port’s messageerror event with the following steps:" part. + pub(crate) fn note_cross_realm_transform_readable( + &self, + cross_realm_transform_readable: &CrossRealmTransformReadable, + port_id: &MessagePortId, + ) { + let MessagePortState::Managed(_id, message_ports) = + &mut *self.message_port_state.borrow_mut() + else { + unreachable!( + "Cross realm transform readable must be called on a global managing ports" + ); + }; + + let Some(managed_port) = message_ports.get_mut(port_id) else { + unreachable!("Cross realm transform readable must match a managed port"); + }; + + managed_port.cross_realm_transform_readable = Some(cross_realm_transform_readable.clone()); + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + /// The "Add a handler for port’s message event with the following steps:" + /// and "Add a handler for port’s messageerror event with the following steps:" part. + pub(crate) fn note_cross_realm_transform_writable( + &self, + cross_realm_transform_writable: &CrossRealmTransformWritable, + port_id: &MessagePortId, + ) { + let MessagePortState::Managed(_id, message_ports) = + &mut *self.message_port_state.borrow_mut() + else { + unreachable!( + "Cross realm transform writable must be called on a global managing ports" + ); + }; + + let Some(managed_port) = message_ports.get_mut(port_id) else { + unreachable!("Cross realm transform writable must match a managed port"); + }; + + managed_port.cross_realm_transform_writable = Some(cross_realm_transform_writable.clone()); + } + + /// Custom routing logic, followed by the task steps of + /// <https://html.spec.whatwg.org/multipage/#message-port-post-message-steps> pub(crate) fn route_task_to_port( &self, port_id: MessagePortId, task: PortMessageTask, can_gc: CanGc, ) { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut cross_realm_transform_readable = None); + rooted!(in(*cx) let mut cross_realm_transform_writable = None); + let should_dispatch = if let MessagePortState::Managed(_id, message_ports) = &mut *self.message_port_state.borrow_mut() { @@ -1228,9 +1285,14 @@ impl GlobalScope { // If the port is not enabled yet, or if is awaiting the completion of it's transfer, // the task will be buffered and dispatched upon enablement or completion of the transfer. if let Some(port_impl) = managed_port.port_impl.as_mut() { - port_impl.handle_incoming(task).map(|to_dispatch| { + let to_dispatch = port_impl.handle_incoming(task).map(|to_dispatch| { (DomRoot::from_ref(&*managed_port.dom_port), to_dispatch) - }) + }); + cross_realm_transform_readable + .set(managed_port.cross_realm_transform_readable.clone()); + cross_realm_transform_writable + .set(managed_port.cross_realm_transform_writable.clone()); + to_dispatch } else { panic!("managed-port has no port-impl."); } @@ -1240,24 +1302,93 @@ impl GlobalScope { self.re_route_port_task(port_id, task); return; }; + + // Add a task that runs the following steps to the port message queue of targetPort: + // Note: we are in the task, and running the relevant steps. + + // Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself. if let Some((dom_port, PortMessageTask { origin, data })) = should_dispatch { - // Substep 3-4 - rooted!(in(*GlobalScope::get_cx()) let mut message_clone = UndefinedValue()); + // Let messageEventTarget be finalTargetPort's message event target. + let message_event_target = dom_port.upcast(); + + // Let targetRealm be finalTargetPort's relevant realm. + // Done via the routing logic here and in the constellation: `self` is the target realm. + + // Let messageClone be deserializeRecord.[[Deserialized]]. + // Re-ordered because we need to pass it to `structuredclone::read`. + rooted!(in(*cx) let mut message_clone = UndefinedValue()); + + // Note: if this port is used to transfer a stream, we handle the events in Rust. + let has_cross_realm_tansform = cross_realm_transform_readable.is_some() || + cross_realm_transform_writable.is_some(); + + let realm = enter_realm(self); + let comp = InRealm::Entered(&realm); + + // Note: this is necessary, on top of entering the realm above, + // for the call to `GlobalScope::incumbent`, + // in `MessagePort::post_message_impl` to succeed. + let _aes = AutoEntryScript::new(self); + + // Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm). + // Let newPorts be a new frozen array + // consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]], + // if any, maintaining their relative order. + // Note: both done in `structuredclone::read`. if let Ok(ports) = structuredclone::read(self, data, message_clone.handle_mut()) { - // Substep 6 - // Dispatch the event, using the dom message-port. - MessageEvent::dispatch_jsval( - dom_port.upcast(), - self, - message_clone.handle(), - Some(&origin.ascii_serialization()), - None, - ports, - can_gc, - ); + // Add a handler for port’s message event with the following steps: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + if let Some(transform) = cross_realm_transform_readable.as_ref() { + transform.handle_message( + cx, + self, + &dom_port, + message_clone.handle(), + comp, + can_gc, + ); + } + + // Add a handler for port’s message event with the following steps: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + if let Some(transform) = cross_realm_transform_writable.as_ref() { + transform.handle_message(cx, self, message_clone.handle(), comp, can_gc); + } + + if !has_cross_realm_tansform { + // Fire an event named message at messageEventTarget, + // using MessageEvent, + // with the data attribute initialized to messageClone + // and the ports attribute initialized to newPorts. + MessageEvent::dispatch_jsval( + message_event_target, + self, + message_clone.handle(), + Some(&origin.ascii_serialization()), + None, + ports, + can_gc, + ); + } } else { - // Step 4, fire messageerror event. - MessageEvent::dispatch_error(dom_port.upcast(), self, can_gc); + // Add a handler for port’s messageerror event with the following steps: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + if let Some(transform) = cross_realm_transform_readable.as_ref() { + transform.handle_error(cx, self, &dom_port, comp, can_gc); + } + + // Add a handler for port’s messageerror event with the following steps: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + if let Some(transform) = cross_realm_transform_writable.as_ref() { + transform.handle_error(cx, self, &dom_port, comp, can_gc); + } + + if !has_cross_realm_tansform { + // If this throws an exception, catch it, + // fire an event named messageerror at messageEventTarget, + // using MessageEvent, and then return. + MessageEvent::dispatch_error(message_event_target, self, can_gc); + } } } } @@ -1449,6 +1580,8 @@ impl GlobalScope { dom_port: Dom::from_ref(dom_port), pending: true, closed: false, + cross_realm_transform_readable: None, + cross_realm_transform_writable: None, }, ); @@ -1471,6 +1604,8 @@ impl GlobalScope { dom_port: Dom::from_ref(dom_port), pending: false, closed: false, + cross_realm_transform_readable: None, + cross_realm_transform_writable: None, }, ); let _ = self.script_to_constellation_chan().send( diff --git a/components/script/dom/messageport.rs b/components/script/dom/messageport.rs index d299ab8156f..3e25602b365 100644 --- a/components/script/dom/messageport.rs +++ b/components/script/dom/messageport.rs @@ -5,12 +5,14 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::num::NonZeroU32; +use std::ptr; use std::rc::Rc; use base::id::{MessagePortId, MessagePortIndex, PipelineNamespaceId}; use constellation_traits::{MessagePortImpl, PortMessageTask}; use dom_struct::dom_struct; -use js::jsapi::{Heap, JSObject}; +use js::jsapi::{Heap, JS_NewObject, JSObject}; +use js::jsval::UndefinedValue; use js::rust::{CustomAutoRooter, CustomAutoRooterGuard, HandleValue}; use crate::dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull; @@ -25,8 +27,10 @@ use crate::dom::bindings::root::DomRoot; use crate::dom::bindings::structuredclone::{self, StructuredData, StructuredDataReader}; use crate::dom::bindings::trace::RootedTraceableBox; use crate::dom::bindings::transferable::{ExtractComponents, IdFromComponents, Transferable}; +use crate::dom::bindings::utils::set_dictionary_property; use crate::dom::eventtarget::EventTarget; use crate::dom::globalscope::GlobalScope; +use crate::js::conversions::ToJSValConvertible; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; #[dom_struct] @@ -57,7 +61,7 @@ impl MessagePort { } /// Create a new port for an incoming transfer-received one. - fn new_transferred( + pub(crate) fn new_transferred( owner: &GlobalScope, transferred_port: MessagePortId, entangled_port: Option<MessagePortId>, @@ -156,6 +160,83 @@ impl MessagePort { .post_messageport_msg(*self.message_port_id(), task); Ok(()) } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-crossrealmtransformsenderror> + pub(crate) fn cross_realm_transform_send_error(&self, error: HandleValue, can_gc: CanGc) { + // Perform PackAndPostMessage(port, "error", error), + // discarding the result. + let _ = self.pack_and_post_message("error", error, can_gc); + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessagehandlingerror> + #[allow(unsafe_code)] + pub(crate) fn pack_and_post_message_handling_error( + &self, + type_: &str, + value: HandleValue, + can_gc: CanGc, + ) -> ErrorResult { + // Let result be PackAndPostMessage(port, type, value). + let result = self.pack_and_post_message(type_, value, can_gc); + + // If result is an abrupt completion, + if result.is_err() { + // Perform ! CrossRealmTransformSendError(port, result.[[Value]]). + // Note: we send UndefinedValue across, + // because somehow sending an error results in another error. + // The Error::DataClone, which is the only one that is sent across, + // will be created upon receipt. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + self.cross_realm_transform_send_error(rooted_error.handle(), can_gc); + } + + result + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessage> + #[allow(unsafe_code)] + pub(crate) fn pack_and_post_message( + &self, + type_: &str, + value: HandleValue, + _can_gc: CanGc, + ) -> ErrorResult { + let cx = GlobalScope::get_cx(); + + // Let message be OrdinaryObjectCreate(null). + rooted!(in(*cx) let mut message = unsafe { JS_NewObject(*cx, ptr::null()) }); + rooted!(in(*cx) let mut type_string = UndefinedValue()); + unsafe { + type_.to_jsval(*cx, type_string.handle_mut()); + } + + // Perform ! CreateDataProperty(message, "type", type). + unsafe { + set_dictionary_property(*cx, message.handle(), "type", type_string.handle()) + .expect("Setting the message type should not fail."); + } + + // Perform ! CreateDataProperty(message, "value", value). + unsafe { + set_dictionary_property(*cx, message.handle(), "value", value) + .expect("Setting the message value should not fail."); + } + + // Let targetPort be the port with which port is entangled, if any; otherwise let it be null. + // Done in `global.post_messageport_msg`. + + // Let options be «[ "transfer" → « » ]». + let mut rooted = CustomAutoRooter::new(vec![]); + let transfer = CustomAutoRooterGuard::new(*cx, &mut rooted); + + // Run the message port post message steps providing targetPort, message, and options. + rooted!(in(*cx) let mut message_val = UndefinedValue()); + unsafe { + message.to_jsval(*cx, message_val.handle_mut()); + } + self.post_message_impl(cx, message_val.handle(), transfer) + } } impl Transferable for MessagePort { diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index b5048a4644d..933a14ae317 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -6,6 +6,7 @@ use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::ptr::{self}; use std::rc::Rc; +use std::collections::HashMap; use dom_struct::dom_struct; use js::conversions::ToJSValConvertible; @@ -21,6 +22,10 @@ use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStra use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, StreamPipeOptions }; +use script_bindings::str::DOMString; + +use crate::dom::domexception::{DOMErrorName, DOMException}; +use script_bindings::conversions::StringificationBehavior; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods; use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; @@ -45,10 +50,16 @@ use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm; use crate::dom::types::DefaultTeeUnderlyingSource; use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter; +use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods; +use crate::dom::messageport::MessagePort; use crate::js::conversions::FromJSValConvertible; use crate::realms::{enter_realm, InRealm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use base::id::MessagePortId; +use constellation_traits::MessagePortImpl; +use crate::dom::bindings::transferable::Transferable; +use crate::dom::bindings::structuredclone::{StructuredData, StructuredDataReader}; use super::bindings::buffer_source::HeapBufferSource; use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions; @@ -1769,6 +1780,49 @@ impl ReadableStream { // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize). controller.setup(global, stream, can_gc) } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + fn setup_cross_realm_transform_readable( + &self, + cx: SafeJSContext, + port: &MessagePort, + can_gc: CanGc, + ) { + let port_id = port.message_port_id(); + let global = self.global(); + + // Perform ! InitializeReadableStream(stream). + // Done in `new_inherited`. + + // Let sizeAlgorithm be an algorithm that returns 1. + let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc); + + // Note: other algorithms defined in the underlying source container. + + // Let controller be a new ReadableStreamDefaultController. + let controller = ReadableStreamDefaultController::new( + &self.global(), + UnderlyingSourceType::Transfer(Dom::from_ref(port)), + 0., + size_algorithm, + can_gc, + ); + + // Add a handler for port’s message event with the following steps: + // Add a handler for port’s messageerror event with the following steps: + rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable { + controller: Dom::from_ref(&controller), + }); + global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id); + + // Enable port’s port message queue. + port.Start(); + + // Perform ! SetUpReadableStreamDefaultController + controller + .setup(DomRoot::from_ref(self), can_gc) + .expect("Setting up controller for transfer cannot fail."); + } } impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream { @@ -1942,6 +1996,145 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream { } #[allow(unsafe_code)] +/// The initial steps for the message handler for both readable and writable cross realm transforms. +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> +pub(crate) unsafe fn get_type_and_value_from_message( + cx: SafeJSContext, + data: SafeHandleValue, + value: SafeMutableHandleValue, + can_gc: CanGc, +) -> DOMString { + // Let data be the data of the message. + // Note: we are passed the data as argument, + // which originates in the return value of `structuredclone::read`. + + // Assert: data is an Object. + assert!(data.is_object()); + rooted!(in(*cx) let data_object = data.to_object()); + + // Let type be ! Get(data, "type"). + rooted!(in(*cx) let mut type_ = UndefinedValue()); + get_dictionary_property( + *cx, + data_object.handle(), + "type", + type_.handle_mut(), + can_gc, + ) + .expect("Getting the type should not fail."); + + // Let value be ! Get(data, "value"). + get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc) + .expect("Getting the value should not fail."); + + // Assert: type is a String. + let result = unsafe { + DOMString::from_jsval(*cx, type_.handle(), StringificationBehavior::Empty) + .expect("The type of the message should be a string") + }; + let ConversionResult::Success(type_string) = result else { + unreachable!("The type of the message should be a string"); + }; + + type_string +} + +impl js::gc::Rootable for CrossRealmTransformReadable {} + +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> +/// A wrapper to handle `message` and `messageerror` events +/// for the port used by the transfered stream. +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +pub(crate) struct CrossRealmTransformReadable { + /// The controller used in the algorithm. + controller: Dom<ReadableStreamDefaultController>, +} + +impl CrossRealmTransformReadable { + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable> + /// Add a handler for port’s message event with the following steps: + #[allow(unsafe_code)] + pub(crate) fn handle_message( + &self, + cx: SafeJSContext, + global: &GlobalScope, + port: &MessagePort, + message: SafeHandleValue, + _realm: InRealm, + can_gc: CanGc, + ) { + rooted!(in(*cx) let mut value = UndefinedValue()); + let type_string = + unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) }; + + // If type is "chunk", + if type_string == "chunk" { + // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value). + self.controller + .enqueue(cx, value.handle(), can_gc) + .expect("Enqueing a chunk should not fail."); + } + + // Otherwise, if type is "close", + if type_string == "close" { + // Perform ! ReadableStreamDefaultControllerClose(controller). + self.controller.close(can_gc); + + // Disentangle port. + global.disentangle_port(port); + } + + // Otherwise, if type is "error", + if type_string == "error" { + if value.is_undefined() { + // Note: for DataClone errors, we send UndefinedValue across, + // because somehow sending the error results in another error. + // The error is then created here. + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + Error::DataClone(None).to_jsval(cx, global, rooted_error.handle_mut(), can_gc); + + // Perform ! ReadableStreamDefaultControllerError(controller, value). + self.controller.error(rooted_error.handle(), can_gc); + } else { + // Perform ! ReadableStreamDefaultControllerError(controller, value). + self.controller.error(value.handle(), can_gc); + } + + // Disentangle port. + global.disentangle_port(port); + } + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + /// Add a handler for port’s messageerror event with the following steps: + #[allow(unsafe_code)] + pub(crate) fn handle_error( + &self, + cx: SafeJSContext, + global: &GlobalScope, + port: &MessagePort, + _realm: InRealm, + can_gc: CanGc, + ) { + // Let error be a new "DataCloneError" DOMException. + let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc); + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) }; + + // Perform ! CrossRealmTransformSendError(port, error). + port.cross_realm_transform_send_error(rooted_error.handle(), can_gc); + + // Perform ! ReadableStreamDefaultControllerError(controller, error). + self.controller.error(rooted_error.handle(), can_gc); + + // Disentangle port. + global.disentangle_port(port); + } +} + +#[allow(unsafe_code)] /// Get the `done` property of an object that a read promise resolved to. pub(crate) fn get_read_promise_done( cx: SafeJSContext, @@ -1994,3 +2187,85 @@ pub(crate) fn get_read_promise_bytes( } } } + +/// <https://streams.spec.whatwg.org/#rs-transfer> +impl Transferable for ReadableStream { + type Id = MessagePortId; + type Data = MessagePortImpl; + + /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0> + fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> { + // If ! IsReadableStreamLocked(value) is true, throw a "DataCloneError" DOMException. + if self.is_locked() { + return Err(()); + } + + let global = self.global(); + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); + let cx = GlobalScope::get_cx(); + let can_gc = CanGc::note(); + + // Let port1 be a new MessagePort in the current Realm. + let port_1 = MessagePort::new(&global, can_gc); + global.track_message_port(&port_1, None); + + // Let port2 be a new MessagePort in the current Realm. + let port_2 = MessagePort::new(&global, can_gc); + global.track_message_port(&port_2, None); + + // Entangle port1 and port2. + global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id()); + + // Let writable be a new WritableStream in the current Realm. + let writable = WritableStream::new_with_proto(&global, None, can_gc); + + // Perform ! SetUpCrossRealmTransformWritable(writable, port1). + writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc); + + // Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false). + let promise = self.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc); + + // Set promise.[[PromiseIsHandled]] to true. + promise.set_promise_is_handled(); + + // Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »). + port_2.transfer() + } + + /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0> + fn transfer_receive( + owner: &GlobalScope, + id: MessagePortId, + port_impl: MessagePortImpl, + ) -> Result<DomRoot<Self>, ()> { + let cx = GlobalScope::get_cx(); + let can_gc = CanGc::note(); + + // Their transfer-receiving steps, given dataHolder and value, are: + // Note: dataHolder is used in `structuredclone.rs`, and value is created here. + let value = ReadableStream::new_with_proto(owner, None, can_gc); + + // Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm). + // Done with the `Deserialize` derive of `MessagePortImpl`. + + // Let port be deserializedRecord.[[Deserialized]]. + let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?; + + // Perform ! SetUpCrossRealmTransformReadable(value, port). + value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc); + Ok(value) + } + + /// Note: we are relying on the port transfer, so the data returned here are related to the port. + fn serialized_storage(data: StructuredData<'_>) -> &mut Option<HashMap<Self::Id, Self::Data>> { + match data { + StructuredData::Reader(r) => &mut r.port_impls, + StructuredData::Writer(w) => &mut w.ports, + } + } + + fn deserialized_storage(reader: &mut StructuredDataReader) -> &mut Option<Vec<DomRoot<Self>>> { + &mut reader.readable_streams + } +} diff --git a/components/script/dom/underlyingsourcecontainer.rs b/components/script/dom/underlyingsourcecontainer.rs index e219bbb7b8a..cf396825d4f 100644 --- a/components/script/dom/underlyingsourcecontainer.rs +++ b/components/script/dom/underlyingsourcecontainer.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use dom_struct::dom_struct; use js::jsapi::{Heap, IsPromiseObject, JSObject}; -use js::jsval::JSVal; +use js::jsval::{JSVal, UndefinedValue}; use js::rust::{Handle as SafeHandle, HandleObject, HandleValue as SafeHandleValue, IntoHandle}; use crate::dom::bindings::callback::ExceptionHandling; @@ -18,6 +18,7 @@ use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_w use crate::dom::bindings::root::{Dom, DomRoot}; use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource; use crate::dom::globalscope::GlobalScope; +use crate::dom::messageport::MessagePort; use crate::dom::promise::Promise; use crate::script_runtime::CanGc; @@ -40,6 +41,8 @@ pub(crate) enum UnderlyingSourceType { Js(JsUnderlyingSource, Heap<*mut JSObject>), /// Tee Tee(Dom<DefaultTeeUnderlyingSource>), + /// Transfer, with the port used in some of the algorithms. + Transfer(Dom<MessagePort>), } impl UnderlyingSourceType { @@ -49,7 +52,8 @@ impl UnderlyingSourceType { self, UnderlyingSourceType::Memory(_) | UnderlyingSourceType::Blob(_) | - UnderlyingSourceType::FetchResponse + UnderlyingSourceType::FetchResponse | + UnderlyingSourceType::Transfer(_) ) } @@ -128,6 +132,28 @@ impl UnderlyingSourceContainer { // Call the cancel algorithm for the appropriate branch. tee_underlyin_source.cancel_algorithm(reason, can_gc) }, + UnderlyingSourceType::Transfer(port) => { + // Let cancelAlgorithm be the following steps, taking a reason argument: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable + + // Let result be PackAndPostMessageHandlingError(port, "error", reason). + let result = port.pack_and_post_message_handling_error("error", reason, can_gc); + + // Disentangle port. + self.global().disentangle_port(port); + + let promise = Promise::new(&self.global(), can_gc); + + // If result is an abrupt completion, + if let Err(error) = result { + // Return a promise rejected with result.[[Value]]. + promise.reject_error(error, can_gc); + } else { + // Otherwise, return a promise resolved with undefined. + promise.resolve_native(&(), can_gc); + } + Some(Ok(promise)) + }, _ => None, } } @@ -158,6 +184,22 @@ impl UnderlyingSourceContainer { // Call the pull algorithm for the appropriate branch. Some(Ok(tee_underlyin_source.pull_algorithm(can_gc))) }, + UnderlyingSourceType::Transfer(port) => { + // Let pullAlgorithm be the following steps: + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable + + let cx = GlobalScope::get_cx(); + + // Perform ! PackAndPostMessage(port, "pull", undefined). + rooted!(in(*cx) let mut value = UndefinedValue()); + port.pack_and_post_message("pull", value.handle(), can_gc) + .expect("Sending pull should not fail."); + + // Return a promise resolved with undefined. + let promise = Promise::new(&self.global(), can_gc); + promise.resolve_native(&(), can_gc); + Some(Ok(promise)) + }, // Note: other source type have no pull steps for now. _ => None, } @@ -217,6 +259,11 @@ impl UnderlyingSourceContainer { // Let startAlgorithm be an algorithm that returns undefined. None }, + UnderlyingSourceType::Transfer(_) => { + // Let startAlgorithm be an algorithm that returns undefined. + // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable + None + }, _ => None, } } diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs index bbc5d6316b6..c8a91cd7475 100644 --- a/components/script/dom/writablestream.rs +++ b/components/script/dom/writablestream.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::cell::Cell; +use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::mem; use std::ptr::{self}; @@ -15,6 +15,7 @@ use js::rust::{ HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, MutableHandleValue as SafeMutableHandleValue, }; +use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods; use crate::dom::bindings::cell::DomRefCell; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; @@ -22,16 +23,20 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSi use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods; use crate::dom::bindings::conversions::ConversionResult; use crate::dom::bindings::error::{Error, Fallible}; -use crate::dom::bindings::reflector::{Reflector, reflect_dom_object_with_proto}; +use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; +use crate::dom::domexception::{DOMErrorName, DOMException}; use crate::dom::globalscope::GlobalScope; +use crate::dom::messageport::MessagePort; use crate::dom::promise::Promise; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::readablestream::get_type_and_value_from_message; use crate::dom::writablestreamdefaultcontroller::{ UnderlyingSinkType, WritableStreamDefaultController, }; use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter; +use crate::js::conversions::ToJSValConvertible; use crate::realms::{InRealm, enter_realm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; @@ -175,7 +180,7 @@ impl WritableStream { } } - fn new_with_proto( + pub(crate) fn new_with_proto( global: &GlobalScope, proto: Option<SafeHandleObject>, can_gc: CanGc, @@ -834,6 +839,58 @@ impl WritableStream { // Set stream.[[backpressure]] to backpressure. self.set_backpressure(backpressure); } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + pub(crate) fn setup_cross_realm_transform_writable( + &self, + cx: SafeJSContext, + port: &MessagePort, + can_gc: CanGc, + ) { + let port_id = port.message_port_id(); + let global = self.global(); + + // Perform ! InitializeWritableStream(stream). + // Done in `new_inherited`. + + // Let sizeAlgorithm be an algorithm that returns 1. + // Re-ordered because of the need to pass it to `new`. + let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc); + + // Note: other algorithms defined in the controller at call site. + + // Let backpressurePromise be a new promise. + let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc)))); + + // Let controller be a new WritableStreamDefaultController. + let controller = WritableStreamDefaultController::new( + &global, + UnderlyingSinkType::Transfer { + backpressure_promise: backpressure_promise.clone(), + port: Dom::from_ref(port), + }, + &UnderlyingSink::empty(), + 1.0, + size_algorithm, + can_gc, + ); + + // Add a handler for port’s message event with the following steps: + // Add a handler for port’s messageerror event with the following steps: + rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable { + controller: Dom::from_ref(&controller), + backpressure_promise: backpressure_promise.clone(), + }); + global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id); + + // Enable port’s port message queue. + port.Start(); + + // Perform ! SetUpWritableStreamDefaultController + controller + .setup(cx, &global, self, &None, can_gc) + .expect("Setup for transfer cannot fail"); + } } impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream { @@ -967,3 +1024,86 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream { self.aquire_default_writer(cx, &global, can_gc) } } + +impl js::gc::Rootable for CrossRealmTransformWritable {} + +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> +/// A wrapper to handle `message` and `messageerror` events +/// for the port used by the transfered stream. +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +pub(crate) struct CrossRealmTransformWritable { + /// The controller used in the algorithm. + controller: Dom<WritableStreamDefaultController>, + + /// The `backpressurePromise` used in the algorithm. + #[ignore_malloc_size_of = "Rc is hard"] + backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, +} + +impl CrossRealmTransformWritable { + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + /// Add a handler for port’s message event with the following steps: + #[allow(unsafe_code)] + pub(crate) fn handle_message( + &self, + cx: SafeJSContext, + global: &GlobalScope, + message: SafeHandleValue, + _realm: InRealm, + can_gc: CanGc, + ) { + rooted!(in(*cx) let mut value = UndefinedValue()); + let type_string = + unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) }; + + // If type is "pull", + // Done below as the steps are the same for both types. + + // Otherwise, if type is "error", + if type_string == "error" { + // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value). + self.controller + .error_if_needed(cx, value.handle(), global, can_gc); + } + + let backpressure_promise = self.backpressure_promise.borrow_mut().take(); + + // Note: the below steps are for both "pull" and "error" types. + // If backpressurePromise is not undefined, + if let Some(promise) = backpressure_promise { + // Resolve backpressurePromise with undefined. + promise.resolve_native(&(), can_gc); + + // Set backpressurePromise to undefined. + // Done above with `take`. + } + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + /// Add a handler for port’s messageerror event with the following steps: + #[allow(unsafe_code)] + pub(crate) fn handle_error( + &self, + cx: SafeJSContext, + global: &GlobalScope, + port: &MessagePort, + _realm: InRealm, + can_gc: CanGc, + ) { + // Let error be a new "DataCloneError" DOMException. + let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc); + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) }; + + // Perform ! CrossRealmTransformSendError(port, error). + port.cross_realm_transform_send_error(rooted_error.handle(), can_gc); + + // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error). + self.controller + .error_if_needed(cx, rooted_error.handle(), global, can_gc); + + // Disentangle port. + global.disentangle_port(port); + } +} diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs index 691e85408e9..0037672cbdf 100644 --- a/components/script/dom/writablestreamdefaultcontroller.rs +++ b/components/script/dom/writablestreamdefaultcontroller.rs @@ -19,9 +19,10 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{ }; use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods; use crate::dom::bindings::error::{Error, ErrorToJsval}; -use crate::dom::bindings::reflector::{Reflector, reflect_dom_object}; +use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::globalscope::GlobalScope; +use crate::dom::messageport::MessagePort; use crate::dom::promise::Promise; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize}; @@ -135,6 +136,57 @@ impl Callback for StartAlgorithmRejectionHandler { } } +impl js::gc::Rootable for TransferBackPressurePromiseReaction {} + +/// Reacting to backpressurePromise as part of the `writeAlgorithm` of +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> +#[derive(JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct TransferBackPressurePromiseReaction { + /// The result of reacting to backpressurePromise. + #[ignore_malloc_size_of = "Rc is hard"] + result_promise: Rc<Promise>, + + /// The backpressurePromise. + #[ignore_malloc_size_of = "Rc is hard"] + backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, + + /// The chunk received by the `writeAlgorithm`. + #[ignore_malloc_size_of = "mozjs"] + chunk: Box<Heap<JSVal>>, + + /// The port used in the algorithm. + port: Dom<MessagePort>, +} + +impl Callback for TransferBackPressurePromiseReaction { + /// Reacting to backpressurePromise with the following fulfillment steps: + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) { + let global = self.result_promise.global(); + // Set backpressurePromise to a new promise. + *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc)); + + // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk). + rooted!(in(*cx) let mut chunk = UndefinedValue()); + chunk.set(self.chunk.get()); + let result = + self.port + .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc); + + // Disentangle port. + global.disentangle_port(&self.port); + + // If result is an abrupt completion, + if let Err(error) = result { + // Return a promise rejected with result.[[Value]]. + self.result_promise.reject_error(error, can_gc); + } else { + // Otherwise, return a promise resolved with undefined. + self.result_promise.resolve_native(&(), can_gc); + } + } +} + impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {} /// The fulfillment handler for @@ -215,14 +267,16 @@ impl Callback for WriteAlgorithmRejectionHandler { } /// The type of sink algorithms we are using. -#[allow(dead_code)] -#[derive(JSTraceable, MallocSizeOf, PartialEq)] +#[derive(JSTraceable, PartialEq)] pub enum UnderlyingSinkType { /// Algorithms are provided by Js callbacks. Js, /// Algorithms supporting streams transfer are implemented in Rust. - /// TODO: implement transfer. - Transfer, + /// The promise and port used in those algorithms are stored here. + Transfer { + backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, + port: Dom<MessagePort>, + }, } /// <https://streams.spec.whatwg.org/#ws-default-controller-class> @@ -230,8 +284,7 @@ pub enum UnderlyingSinkType { pub struct WritableStreamDefaultController { reflector_: Reflector, - /// The type of underlying sink used. Besides the default JS one, - /// there will be others for stream transfer, and for transform stream. + #[ignore_malloc_size_of = "Rc is hard"] underlying_sink_type: UnderlyingSinkType, /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm> @@ -409,6 +462,11 @@ impl WritableStreamDefaultController { Promise::new_resolved(global, cx, result.get(), can_gc) } } else { + // Note: we are either here because the Js algorithm is none, + // or because we are suppporting a stream transfer as + // part of #abstract-opdef-setupcrossrealmtransformwritable + // and the logic is the same for both. + // Let startAlgorithm be an algorithm that returns undefined. Promise::new_resolved(global, cx, (), can_gc) }; @@ -480,9 +538,26 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. - Promise::new_resolved(global, cx, (), can_gc) + UnderlyingSinkType::Transfer { ref port, .. } => { + // The steps from the `abortAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + // Let result be PackAndPostMessageHandlingError(port, "error", reason). + let result = port.pack_and_post_message_handling_error("error", reason, can_gc); + + // Disentangle port. + global.disentangle_port(port); + + let promise = Promise::new(global, can_gc); + + // If result is an abrupt completion, return a promise rejected with result.[[Value]] + if let Err(error) = result { + promise.reject_error(error, can_gc); + } else { + // Otherwise, return a promise resolved with undefined. + promise.reject_native(&(), can_gc); + } + promise }, }; @@ -521,9 +596,45 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. - Promise::new_resolved(global, cx, (), can_gc) + UnderlyingSinkType::Transfer { + ref backpressure_promise, + ref port, + .. + } => { + // The steps from the `writeAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + { + // If backpressurePromise is undefined, + // set backpressurePromise to a promise resolved with undefined. + let mut backpressure_promise = backpressure_promise.borrow_mut(); + if backpressure_promise.is_none() { + *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc)); + } + } + + // Return the result of reacting to backpressurePromise with the following fulfillment steps: + let result_promise = Promise::new(global, can_gc); + rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction { + port: port.clone(), + backpressure_promise: backpressure_promise.clone(), + chunk: Heap::boxed(chunk.get()), + result_promise: result_promise.clone(), + })); + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + None, + can_gc, + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + backpressure_promise + .borrow() + .as_ref() + .expect("Promise must be some by now.") + .append_native_handler(&handler, comp, can_gc); + result_promise }, } } @@ -551,8 +662,19 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. + UnderlyingSinkType::Transfer { ref port, .. } => { + // The steps from the `closeAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + // Perform ! PackAndPostMessage(port, "close", undefined). + rooted!(in(*cx) let mut value = UndefinedValue()); + port.pack_and_post_message("close", value.handle(), can_gc) + .expect("Sending close should not fail."); + + // Disentangle port. + global.disentangle_port(port); + + // Return a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }, } |