aboutsummaryrefslogtreecommitdiffstats
path: root/components/script
diff options
context:
space:
mode:
Diffstat (limited to 'components/script')
-rw-r--r--components/script/dom/bindings/structuredclone.rs8
-rw-r--r--components/script/dom/globalscope.rs203
-rw-r--r--components/script/dom/messageport.rs85
-rw-r--r--components/script/dom/readablestream.rs275
-rw-r--r--components/script/dom/underlyingsourcecontainer.rs51
-rw-r--r--components/script/dom/writablestream.rs146
-rw-r--r--components/script/dom/writablestreamdefaultcontroller.rs152
7 files changed, 864 insertions, 56 deletions
diff --git a/components/script/dom/bindings/structuredclone.rs b/components/script/dom/bindings/structuredclone.rs
index fb701406762..e53318516a9 100644
--- a/components/script/dom/bindings/structuredclone.rs
+++ b/components/script/dom/bindings/structuredclone.rs
@@ -41,6 +41,7 @@ use crate::dom::dompoint::DOMPoint;
use crate::dom::dompointreadonly::DOMPointReadOnly;
use crate::dom::globalscope::GlobalScope;
use crate::dom::messageport::MessagePort;
+use crate::dom::readablestream::ReadableStream;
use crate::realms::{AlreadyInRealm, InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@@ -57,6 +58,7 @@ pub(super) enum StructuredCloneTags {
Principals = 0xFFFF8003,
DomPointReadOnly = 0xFFFF8004,
DomPoint = 0xFFFF8005,
+ ReadableStream = 0xFFFF8006,
Max = 0xFFFFFFFF,
}
@@ -74,6 +76,7 @@ impl From<TransferrableInterface> for StructuredCloneTags {
fn from(v: TransferrableInterface) -> Self {
match v {
TransferrableInterface::MessagePort => StructuredCloneTags::MessagePort,
+ TransferrableInterface::ReadableStream => StructuredCloneTags::ReadableStream,
}
}
}
@@ -250,6 +253,7 @@ fn receiver_for_type(
) -> fn(&GlobalScope, &mut StructuredDataReader, u64, RawMutableHandleObject) -> Result<(), ()> {
match val {
TransferrableInterface::MessagePort => receive_object::<MessagePort>,
+ TransferrableInterface::ReadableStream => receive_object::<ReadableStream>,
}
}
@@ -375,6 +379,7 @@ type TransferOperation = unsafe fn(
fn transfer_for_type(val: TransferrableInterface) -> TransferOperation {
match val {
TransferrableInterface::MessagePort => try_transfer::<MessagePort>,
+ TransferrableInterface::ReadableStream => try_transfer::<ReadableStream>,
}
}
@@ -421,6 +426,7 @@ unsafe fn can_transfer_for_type(
}
match transferable {
TransferrableInterface::MessagePort => can_transfer::<MessagePort>(obj, cx),
+ TransferrableInterface::ReadableStream => can_transfer::<ReadableStream>(obj, cx),
}
}
@@ -507,6 +513,7 @@ pub(crate) struct StructuredDataReader {
pub(crate) blob_impls: Option<HashMap<BlobId, BlobImpl>>,
/// A map of serialized points.
pub(crate) points: Option<HashMap<DomPointId, DomPoint>>,
+ pub(crate) readable_streams: Option<Vec<DomRoot<ReadableStream>>>,
}
/// A data holder for transferred and serialized objects.
@@ -597,6 +604,7 @@ pub(crate) fn read(
blob_impls: data.blobs.take(),
points: data.points.take(),
errors: DOMErrorRecord { message: None },
+ readable_streams: None,
};
let sc_reader_ptr = &mut sc_reader as *mut _;
unsafe {
diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs
index b488daf24e4..e56f4693e35 100644
--- a/components/script/dom/globalscope.rs
+++ b/components/script/dom/globalscope.rs
@@ -121,7 +121,7 @@ use crate::dom::paintworkletglobalscope::PaintWorkletGlobalScope;
use crate::dom::performance::Performance;
use crate::dom::performanceobserver::VALID_ENTRY_TYPES;
use crate::dom::promise::Promise;
-use crate::dom::readablestream::ReadableStream;
+use crate::dom::readablestream::{CrossRealmTransformReadable, ReadableStream};
use crate::dom::serviceworker::ServiceWorker;
use crate::dom::serviceworkerregistration::ServiceWorkerRegistration;
use crate::dom::trustedtypepolicyfactory::TrustedTypePolicyFactory;
@@ -133,6 +133,7 @@ use crate::dom::webgpu::identityhub::IdentityHub;
use crate::dom::window::Window;
use crate::dom::workerglobalscope::WorkerGlobalScope;
use crate::dom::workletglobalscope::WorkletGlobalScope;
+use crate::dom::writablestream::CrossRealmTransformWritable;
use crate::messaging::{CommonScriptMsg, ScriptEventLoopReceiver, ScriptEventLoopSender};
use crate::microtask::{Microtask, MicrotaskQueue, UserMicrotask};
use crate::network_listener::{NetworkListener, PreInvoke};
@@ -456,6 +457,13 @@ pub(crate) struct ManagedMessagePort {
pending: bool,
/// Has the port been closed? If closed, it can be dropped and later GC'ed.
closed: bool,
+ /// Note: it may seem strange to use a pair of options, versus for example an enum.
+ /// But it looks like tranform streams will require both of those in their transfer.
+ /// This will be resolved when we reach that point of the implementation.
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ cross_realm_transform_readable: Option<CrossRealmTransformReadable>,
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ cross_realm_transform_writable: Option<CrossRealmTransformWritable>,
}
/// State representing whether this global is currently managing broadcast channels.
@@ -940,6 +948,11 @@ impl GlobalScope {
*self.broadcast_channel_state.borrow_mut() = BroadcastChannelState::UnManaged;
}
+ /// <https://html.spec.whatwg.org/multipage/#disentangle>
+ pub(crate) fn disentangle_port(&self, _port: &MessagePort) {
+ // TODO: #36465
+ }
+
/// <https://html.spec.whatwg.org/multipage/#entangle>
pub(crate) fn entangle_ports(&self, port1: MessagePortId, port2: MessagePortId) {
if let MessagePortState::Managed(_id, message_ports) =
@@ -1007,10 +1020,10 @@ impl GlobalScope {
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-start>
pub(crate) fn start_message_port(&self, port_id: &MessagePortId) {
- if let MessagePortState::Managed(_id, message_ports) =
+ let message_buffer = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
- let message_buffer = match message_ports.get_mut(port_id) {
+ match message_ports.get_mut(port_id) {
None => panic!("start_message_port called on a unknown port."),
Some(managed_port) => {
if let Some(port_impl) = managed_port.port_impl.as_mut() {
@@ -1019,21 +1032,14 @@ impl GlobalScope {
panic!("managed-port has no port-impl.");
}
},
- };
- if let Some(message_buffer) = message_buffer {
- for task in message_buffer {
- let port_id = *port_id;
- let this = Trusted::new(self);
- self.task_manager().port_message_queue().queue(
- task!(process_pending_port_messages: move || {
- let target_global = this.root();
- target_global.route_task_to_port(port_id, task, CanGc::note());
- }),
- );
- }
}
} else {
- warn!("start_message_port called on a global not managing any ports.")
+ return warn!("start_message_port called on a global not managing any ports.");
+ };
+ if let Some(message_buffer) = message_buffer {
+ for task in message_buffer {
+ self.route_task_to_port(*port_id, task, CanGc::note());
+ }
}
}
@@ -1208,13 +1214,64 @@ impl GlobalScope {
}
}
- /// Route the task to be handled by the relevant port.
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ /// The "Add a handler for port’s message event with the following steps:"
+ /// and "Add a handler for port’s messageerror event with the following steps:" part.
+ pub(crate) fn note_cross_realm_transform_readable(
+ &self,
+ cross_realm_transform_readable: &CrossRealmTransformReadable,
+ port_id: &MessagePortId,
+ ) {
+ let MessagePortState::Managed(_id, message_ports) =
+ &mut *self.message_port_state.borrow_mut()
+ else {
+ unreachable!(
+ "Cross realm transform readable must be called on a global managing ports"
+ );
+ };
+
+ let Some(managed_port) = message_ports.get_mut(port_id) else {
+ unreachable!("Cross realm transform readable must match a managed port");
+ };
+
+ managed_port.cross_realm_transform_readable = Some(cross_realm_transform_readable.clone());
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ /// The "Add a handler for port’s message event with the following steps:"
+ /// and "Add a handler for port’s messageerror event with the following steps:" part.
+ pub(crate) fn note_cross_realm_transform_writable(
+ &self,
+ cross_realm_transform_writable: &CrossRealmTransformWritable,
+ port_id: &MessagePortId,
+ ) {
+ let MessagePortState::Managed(_id, message_ports) =
+ &mut *self.message_port_state.borrow_mut()
+ else {
+ unreachable!(
+ "Cross realm transform writable must be called on a global managing ports"
+ );
+ };
+
+ let Some(managed_port) = message_ports.get_mut(port_id) else {
+ unreachable!("Cross realm transform writable must match a managed port");
+ };
+
+ managed_port.cross_realm_transform_writable = Some(cross_realm_transform_writable.clone());
+ }
+
+ /// Custom routing logic, followed by the task steps of
+ /// <https://html.spec.whatwg.org/multipage/#message-port-post-message-steps>
pub(crate) fn route_task_to_port(
&self,
port_id: MessagePortId,
task: PortMessageTask,
can_gc: CanGc,
) {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut cross_realm_transform_readable = None);
+ rooted!(in(*cx) let mut cross_realm_transform_writable = None);
+
let should_dispatch = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
@@ -1228,9 +1285,14 @@ impl GlobalScope {
// If the port is not enabled yet, or if is awaiting the completion of it's transfer,
// the task will be buffered and dispatched upon enablement or completion of the transfer.
if let Some(port_impl) = managed_port.port_impl.as_mut() {
- port_impl.handle_incoming(task).map(|to_dispatch| {
+ let to_dispatch = port_impl.handle_incoming(task).map(|to_dispatch| {
(DomRoot::from_ref(&*managed_port.dom_port), to_dispatch)
- })
+ });
+ cross_realm_transform_readable
+ .set(managed_port.cross_realm_transform_readable.clone());
+ cross_realm_transform_writable
+ .set(managed_port.cross_realm_transform_writable.clone());
+ to_dispatch
} else {
panic!("managed-port has no port-impl.");
}
@@ -1240,24 +1302,93 @@ impl GlobalScope {
self.re_route_port_task(port_id, task);
return;
};
+
+ // Add a task that runs the following steps to the port message queue of targetPort:
+ // Note: we are in the task, and running the relevant steps.
+
+ // Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself.
if let Some((dom_port, PortMessageTask { origin, data })) = should_dispatch {
- // Substep 3-4
- rooted!(in(*GlobalScope::get_cx()) let mut message_clone = UndefinedValue());
+ // Let messageEventTarget be finalTargetPort's message event target.
+ let message_event_target = dom_port.upcast();
+
+ // Let targetRealm be finalTargetPort's relevant realm.
+ // Done via the routing logic here and in the constellation: `self` is the target realm.
+
+ // Let messageClone be deserializeRecord.[[Deserialized]].
+ // Re-ordered because we need to pass it to `structuredclone::read`.
+ rooted!(in(*cx) let mut message_clone = UndefinedValue());
+
+ // Note: if this port is used to transfer a stream, we handle the events in Rust.
+ let has_cross_realm_tansform = cross_realm_transform_readable.is_some() ||
+ cross_realm_transform_writable.is_some();
+
+ let realm = enter_realm(self);
+ let comp = InRealm::Entered(&realm);
+
+ // Note: this is necessary, on top of entering the realm above,
+ // for the call to `GlobalScope::incumbent`,
+ // in `MessagePort::post_message_impl` to succeed.
+ let _aes = AutoEntryScript::new(self);
+
+ // Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm).
+ // Let newPorts be a new frozen array
+ // consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]],
+ // if any, maintaining their relative order.
+ // Note: both done in `structuredclone::read`.
if let Ok(ports) = structuredclone::read(self, data, message_clone.handle_mut()) {
- // Substep 6
- // Dispatch the event, using the dom message-port.
- MessageEvent::dispatch_jsval(
- dom_port.upcast(),
- self,
- message_clone.handle(),
- Some(&origin.ascii_serialization()),
- None,
- ports,
- can_gc,
- );
+ // Add a handler for port’s message event with the following steps:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ if let Some(transform) = cross_realm_transform_readable.as_ref() {
+ transform.handle_message(
+ cx,
+ self,
+ &dom_port,
+ message_clone.handle(),
+ comp,
+ can_gc,
+ );
+ }
+
+ // Add a handler for port’s message event with the following steps:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ if let Some(transform) = cross_realm_transform_writable.as_ref() {
+ transform.handle_message(cx, self, message_clone.handle(), comp, can_gc);
+ }
+
+ if !has_cross_realm_tansform {
+ // Fire an event named message at messageEventTarget,
+ // using MessageEvent,
+ // with the data attribute initialized to messageClone
+ // and the ports attribute initialized to newPorts.
+ MessageEvent::dispatch_jsval(
+ message_event_target,
+ self,
+ message_clone.handle(),
+ Some(&origin.ascii_serialization()),
+ None,
+ ports,
+ can_gc,
+ );
+ }
} else {
- // Step 4, fire messageerror event.
- MessageEvent::dispatch_error(dom_port.upcast(), self, can_gc);
+ // Add a handler for port’s messageerror event with the following steps:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ if let Some(transform) = cross_realm_transform_readable.as_ref() {
+ transform.handle_error(cx, self, &dom_port, comp, can_gc);
+ }
+
+ // Add a handler for port’s messageerror event with the following steps:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ if let Some(transform) = cross_realm_transform_writable.as_ref() {
+ transform.handle_error(cx, self, &dom_port, comp, can_gc);
+ }
+
+ if !has_cross_realm_tansform {
+ // If this throws an exception, catch it,
+ // fire an event named messageerror at messageEventTarget,
+ // using MessageEvent, and then return.
+ MessageEvent::dispatch_error(message_event_target, self, can_gc);
+ }
}
}
}
@@ -1449,6 +1580,8 @@ impl GlobalScope {
dom_port: Dom::from_ref(dom_port),
pending: true,
closed: false,
+ cross_realm_transform_readable: None,
+ cross_realm_transform_writable: None,
},
);
@@ -1471,6 +1604,8 @@ impl GlobalScope {
dom_port: Dom::from_ref(dom_port),
pending: false,
closed: false,
+ cross_realm_transform_readable: None,
+ cross_realm_transform_writable: None,
},
);
let _ = self.script_to_constellation_chan().send(
diff --git a/components/script/dom/messageport.rs b/components/script/dom/messageport.rs
index d299ab8156f..3e25602b365 100644
--- a/components/script/dom/messageport.rs
+++ b/components/script/dom/messageport.rs
@@ -5,12 +5,14 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::num::NonZeroU32;
+use std::ptr;
use std::rc::Rc;
use base::id::{MessagePortId, MessagePortIndex, PipelineNamespaceId};
use constellation_traits::{MessagePortImpl, PortMessageTask};
use dom_struct::dom_struct;
-use js::jsapi::{Heap, JSObject};
+use js::jsapi::{Heap, JS_NewObject, JSObject};
+use js::jsval::UndefinedValue;
use js::rust::{CustomAutoRooter, CustomAutoRooterGuard, HandleValue};
use crate::dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull;
@@ -25,8 +27,10 @@ use crate::dom::bindings::root::DomRoot;
use crate::dom::bindings::structuredclone::{self, StructuredData, StructuredDataReader};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::bindings::transferable::{ExtractComponents, IdFromComponents, Transferable};
+use crate::dom::bindings::utils::set_dictionary_property;
use crate::dom::eventtarget::EventTarget;
use crate::dom::globalscope::GlobalScope;
+use crate::js::conversions::ToJSValConvertible;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
#[dom_struct]
@@ -57,7 +61,7 @@ impl MessagePort {
}
/// Create a new port for an incoming transfer-received one.
- fn new_transferred(
+ pub(crate) fn new_transferred(
owner: &GlobalScope,
transferred_port: MessagePortId,
entangled_port: Option<MessagePortId>,
@@ -156,6 +160,83 @@ impl MessagePort {
.post_messageport_msg(*self.message_port_id(), task);
Ok(())
}
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-crossrealmtransformsenderror>
+ pub(crate) fn cross_realm_transform_send_error(&self, error: HandleValue, can_gc: CanGc) {
+ // Perform PackAndPostMessage(port, "error", error),
+ // discarding the result.
+ let _ = self.pack_and_post_message("error", error, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessagehandlingerror>
+ #[allow(unsafe_code)]
+ pub(crate) fn pack_and_post_message_handling_error(
+ &self,
+ type_: &str,
+ value: HandleValue,
+ can_gc: CanGc,
+ ) -> ErrorResult {
+ // Let result be PackAndPostMessage(port, type, value).
+ let result = self.pack_and_post_message(type_, value, can_gc);
+
+ // If result is an abrupt completion,
+ if result.is_err() {
+ // Perform ! CrossRealmTransformSendError(port, result.[[Value]]).
+ // Note: we send UndefinedValue across,
+ // because somehow sending an error results in another error.
+ // The Error::DataClone, which is the only one that is sent across,
+ // will be created upon receipt.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ self.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
+ }
+
+ result
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessage>
+ #[allow(unsafe_code)]
+ pub(crate) fn pack_and_post_message(
+ &self,
+ type_: &str,
+ value: HandleValue,
+ _can_gc: CanGc,
+ ) -> ErrorResult {
+ let cx = GlobalScope::get_cx();
+
+ // Let message be OrdinaryObjectCreate(null).
+ rooted!(in(*cx) let mut message = unsafe { JS_NewObject(*cx, ptr::null()) });
+ rooted!(in(*cx) let mut type_string = UndefinedValue());
+ unsafe {
+ type_.to_jsval(*cx, type_string.handle_mut());
+ }
+
+ // Perform ! CreateDataProperty(message, "type", type).
+ unsafe {
+ set_dictionary_property(*cx, message.handle(), "type", type_string.handle())
+ .expect("Setting the message type should not fail.");
+ }
+
+ // Perform ! CreateDataProperty(message, "value", value).
+ unsafe {
+ set_dictionary_property(*cx, message.handle(), "value", value)
+ .expect("Setting the message value should not fail.");
+ }
+
+ // Let targetPort be the port with which port is entangled, if any; otherwise let it be null.
+ // Done in `global.post_messageport_msg`.
+
+ // Let options be «[ "transfer" → « » ]».
+ let mut rooted = CustomAutoRooter::new(vec![]);
+ let transfer = CustomAutoRooterGuard::new(*cx, &mut rooted);
+
+ // Run the message port post message steps providing targetPort, message, and options.
+ rooted!(in(*cx) let mut message_val = UndefinedValue());
+ unsafe {
+ message.to_jsval(*cx, message_val.handle_mut());
+ }
+ self.post_message_impl(cx, message_val.handle(), transfer)
+ }
}
impl Transferable for MessagePort {
diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs
index b5048a4644d..933a14ae317 100644
--- a/components/script/dom/readablestream.rs
+++ b/components/script/dom/readablestream.rs
@@ -6,6 +6,7 @@ use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::ptr::{self};
use std::rc::Rc;
+use std::collections::HashMap;
use dom_struct::dom_struct;
use js::conversions::ToJSValConvertible;
@@ -21,6 +22,10 @@ use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStra
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, StreamPipeOptions
};
+use script_bindings::str::DOMString;
+
+use crate::dom::domexception::{DOMErrorName, DOMException};
+use script_bindings::conversions::StringificationBehavior;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
@@ -45,10 +50,16 @@ use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
use crate::dom::types::DefaultTeeUnderlyingSource;
use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
+use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
+use crate::dom::messageport::MessagePort;
use crate::js::conversions::FromJSValConvertible;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use base::id::MessagePortId;
+use constellation_traits::MessagePortImpl;
+use crate::dom::bindings::transferable::Transferable;
+use crate::dom::bindings::structuredclone::{StructuredData, StructuredDataReader};
use super::bindings::buffer_source::HeapBufferSource;
use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
@@ -1769,6 +1780,49 @@ impl ReadableStream {
// pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
controller.setup(global, stream, can_gc)
}
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ fn setup_cross_realm_transform_readable(
+ &self,
+ cx: SafeJSContext,
+ port: &MessagePort,
+ can_gc: CanGc,
+ ) {
+ let port_id = port.message_port_id();
+ let global = self.global();
+
+ // Perform ! InitializeReadableStream(stream).
+ // Done in `new_inherited`.
+
+ // Let sizeAlgorithm be an algorithm that returns 1.
+ let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
+
+ // Note: other algorithms defined in the underlying source container.
+
+ // Let controller be a new ReadableStreamDefaultController.
+ let controller = ReadableStreamDefaultController::new(
+ &self.global(),
+ UnderlyingSourceType::Transfer(Dom::from_ref(port)),
+ 0.,
+ size_algorithm,
+ can_gc,
+ );
+
+ // Add a handler for port’s message event with the following steps:
+ // Add a handler for port’s messageerror event with the following steps:
+ rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
+ controller: Dom::from_ref(&controller),
+ });
+ global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
+
+ // Enable port’s port message queue.
+ port.Start();
+
+ // Perform ! SetUpReadableStreamDefaultController
+ controller
+ .setup(DomRoot::from_ref(self), can_gc)
+ .expect("Setting up controller for transfer cannot fail.");
+ }
}
impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
@@ -1942,6 +1996,145 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
}
#[allow(unsafe_code)]
+/// The initial steps for the message handler for both readable and writable cross realm transforms.
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+pub(crate) unsafe fn get_type_and_value_from_message(
+ cx: SafeJSContext,
+ data: SafeHandleValue,
+ value: SafeMutableHandleValue,
+ can_gc: CanGc,
+) -> DOMString {
+ // Let data be the data of the message.
+ // Note: we are passed the data as argument,
+ // which originates in the return value of `structuredclone::read`.
+
+ // Assert: data is an Object.
+ assert!(data.is_object());
+ rooted!(in(*cx) let data_object = data.to_object());
+
+ // Let type be ! Get(data, "type").
+ rooted!(in(*cx) let mut type_ = UndefinedValue());
+ get_dictionary_property(
+ *cx,
+ data_object.handle(),
+ "type",
+ type_.handle_mut(),
+ can_gc,
+ )
+ .expect("Getting the type should not fail.");
+
+ // Let value be ! Get(data, "value").
+ get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc)
+ .expect("Getting the value should not fail.");
+
+ // Assert: type is a String.
+ let result = unsafe {
+ DOMString::from_jsval(*cx, type_.handle(), StringificationBehavior::Empty)
+ .expect("The type of the message should be a string")
+ };
+ let ConversionResult::Success(type_string) = result else {
+ unreachable!("The type of the message should be a string");
+ };
+
+ type_string
+}
+
+impl js::gc::Rootable for CrossRealmTransformReadable {}
+
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+/// A wrapper to handle `message` and `messageerror` events
+/// for the port used by the transfered stream.
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+pub(crate) struct CrossRealmTransformReadable {
+ /// The controller used in the algorithm.
+ controller: Dom<ReadableStreamDefaultController>,
+}
+
+impl CrossRealmTransformReadable {
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
+ /// Add a handler for port’s message event with the following steps:
+ #[allow(unsafe_code)]
+ pub(crate) fn handle_message(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ port: &MessagePort,
+ message: SafeHandleValue,
+ _realm: InRealm,
+ can_gc: CanGc,
+ ) {
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ let type_string =
+ unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
+
+ // If type is "chunk",
+ if type_string == "chunk" {
+ // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
+ self.controller
+ .enqueue(cx, value.handle(), can_gc)
+ .expect("Enqueing a chunk should not fail.");
+ }
+
+ // Otherwise, if type is "close",
+ if type_string == "close" {
+ // Perform ! ReadableStreamDefaultControllerClose(controller).
+ self.controller.close(can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(port);
+ }
+
+ // Otherwise, if type is "error",
+ if type_string == "error" {
+ if value.is_undefined() {
+ // Note: for DataClone errors, we send UndefinedValue across,
+ // because somehow sending the error results in another error.
+ // The error is then created here.
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ Error::DataClone(None).to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, value).
+ self.controller.error(rooted_error.handle(), can_gc);
+ } else {
+ // Perform ! ReadableStreamDefaultControllerError(controller, value).
+ self.controller.error(value.handle(), can_gc);
+ }
+
+ // Disentangle port.
+ global.disentangle_port(port);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ /// Add a handler for port’s messageerror event with the following steps:
+ #[allow(unsafe_code)]
+ pub(crate) fn handle_error(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ port: &MessagePort,
+ _realm: InRealm,
+ can_gc: CanGc,
+ ) {
+ // Let error be a new "DataCloneError" DOMException.
+ let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) };
+
+ // Perform ! CrossRealmTransformSendError(port, error).
+ port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, error).
+ self.controller.error(rooted_error.handle(), can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(port);
+ }
+}
+
+#[allow(unsafe_code)]
/// Get the `done` property of an object that a read promise resolved to.
pub(crate) fn get_read_promise_done(
cx: SafeJSContext,
@@ -1994,3 +2187,85 @@ pub(crate) fn get_read_promise_bytes(
}
}
}
+
+/// <https://streams.spec.whatwg.org/#rs-transfer>
+impl Transferable for ReadableStream {
+ type Id = MessagePortId;
+ type Data = MessagePortImpl;
+
+ /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0>
+ fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> {
+ // If ! IsReadableStreamLocked(value) is true, throw a "DataCloneError" DOMException.
+ if self.is_locked() {
+ return Err(());
+ }
+
+ let global = self.global();
+ let realm = enter_realm(&*global);
+ let comp = InRealm::Entered(&realm);
+ let cx = GlobalScope::get_cx();
+ let can_gc = CanGc::note();
+
+ // Let port1 be a new MessagePort in the current Realm.
+ let port_1 = MessagePort::new(&global, can_gc);
+ global.track_message_port(&port_1, None);
+
+ // Let port2 be a new MessagePort in the current Realm.
+ let port_2 = MessagePort::new(&global, can_gc);
+ global.track_message_port(&port_2, None);
+
+ // Entangle port1 and port2.
+ global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
+
+ // Let writable be a new WritableStream in the current Realm.
+ let writable = WritableStream::new_with_proto(&global, None, can_gc);
+
+ // Perform ! SetUpCrossRealmTransformWritable(writable, port1).
+ writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
+
+ // Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
+ let promise = self.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc);
+
+ // Set promise.[[PromiseIsHandled]] to true.
+ promise.set_promise_is_handled();
+
+ // Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
+ port_2.transfer()
+ }
+
+ /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0>
+ fn transfer_receive(
+ owner: &GlobalScope,
+ id: MessagePortId,
+ port_impl: MessagePortImpl,
+ ) -> Result<DomRoot<Self>, ()> {
+ let cx = GlobalScope::get_cx();
+ let can_gc = CanGc::note();
+
+ // Their transfer-receiving steps, given dataHolder and value, are:
+ // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
+ let value = ReadableStream::new_with_proto(owner, None, can_gc);
+
+ // Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
+ // Done with the `Deserialize` derive of `MessagePortImpl`.
+
+ // Let port be deserializedRecord.[[Deserialized]].
+ let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
+
+ // Perform ! SetUpCrossRealmTransformReadable(value, port).
+ value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
+ Ok(value)
+ }
+
+ /// Note: we are relying on the port transfer, so the data returned here are related to the port.
+ fn serialized_storage(data: StructuredData<'_>) -> &mut Option<HashMap<Self::Id, Self::Data>> {
+ match data {
+ StructuredData::Reader(r) => &mut r.port_impls,
+ StructuredData::Writer(w) => &mut w.ports,
+ }
+ }
+
+ fn deserialized_storage(reader: &mut StructuredDataReader) -> &mut Option<Vec<DomRoot<Self>>> {
+ &mut reader.readable_streams
+ }
+}
diff --git a/components/script/dom/underlyingsourcecontainer.rs b/components/script/dom/underlyingsourcecontainer.rs
index e219bbb7b8a..cf396825d4f 100644
--- a/components/script/dom/underlyingsourcecontainer.rs
+++ b/components/script/dom/underlyingsourcecontainer.rs
@@ -7,7 +7,7 @@ use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::{Heap, IsPromiseObject, JSObject};
-use js::jsval::JSVal;
+use js::jsval::{JSVal, UndefinedValue};
use js::rust::{Handle as SafeHandle, HandleObject, HandleValue as SafeHandleValue, IntoHandle};
use crate::dom::bindings::callback::ExceptionHandling;
@@ -18,6 +18,7 @@ use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_w
use crate::dom::bindings::root::{Dom, DomRoot};
use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
use crate::dom::globalscope::GlobalScope;
+use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::script_runtime::CanGc;
@@ -40,6 +41,8 @@ pub(crate) enum UnderlyingSourceType {
Js(JsUnderlyingSource, Heap<*mut JSObject>),
/// Tee
Tee(Dom<DefaultTeeUnderlyingSource>),
+ /// Transfer, with the port used in some of the algorithms.
+ Transfer(Dom<MessagePort>),
}
impl UnderlyingSourceType {
@@ -49,7 +52,8 @@ impl UnderlyingSourceType {
self,
UnderlyingSourceType::Memory(_) |
UnderlyingSourceType::Blob(_) |
- UnderlyingSourceType::FetchResponse
+ UnderlyingSourceType::FetchResponse |
+ UnderlyingSourceType::Transfer(_)
)
}
@@ -128,6 +132,28 @@ impl UnderlyingSourceContainer {
// Call the cancel algorithm for the appropriate branch.
tee_underlyin_source.cancel_algorithm(reason, can_gc)
},
+ UnderlyingSourceType::Transfer(port) => {
+ // Let cancelAlgorithm be the following steps, taking a reason argument:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
+
+ // Let result be PackAndPostMessageHandlingError(port, "error", reason).
+ let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
+
+ // Disentangle port.
+ self.global().disentangle_port(port);
+
+ let promise = Promise::new(&self.global(), can_gc);
+
+ // If result is an abrupt completion,
+ if let Err(error) = result {
+ // Return a promise rejected with result.[[Value]].
+ promise.reject_error(error, can_gc);
+ } else {
+ // Otherwise, return a promise resolved with undefined.
+ promise.resolve_native(&(), can_gc);
+ }
+ Some(Ok(promise))
+ },
_ => None,
}
}
@@ -158,6 +184,22 @@ impl UnderlyingSourceContainer {
// Call the pull algorithm for the appropriate branch.
Some(Ok(tee_underlyin_source.pull_algorithm(can_gc)))
},
+ UnderlyingSourceType::Transfer(port) => {
+ // Let pullAlgorithm be the following steps:
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
+
+ let cx = GlobalScope::get_cx();
+
+ // Perform ! PackAndPostMessage(port, "pull", undefined).
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ port.pack_and_post_message("pull", value.handle(), can_gc)
+ .expect("Sending pull should not fail.");
+
+ // Return a promise resolved with undefined.
+ let promise = Promise::new(&self.global(), can_gc);
+ promise.resolve_native(&(), can_gc);
+ Some(Ok(promise))
+ },
// Note: other source type have no pull steps for now.
_ => None,
}
@@ -217,6 +259,11 @@ impl UnderlyingSourceContainer {
// Let startAlgorithm be an algorithm that returns undefined.
None
},
+ UnderlyingSourceType::Transfer(_) => {
+ // Let startAlgorithm be an algorithm that returns undefined.
+ // from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
+ None
+ },
_ => None,
}
}
diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs
index bbc5d6316b6..c8a91cd7475 100644
--- a/components/script/dom/writablestream.rs
+++ b/components/script/dom/writablestream.rs
@@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::mem;
use std::ptr::{self};
@@ -15,6 +15,7 @@ use js::rust::{
HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
MutableHandleValue as SafeMutableHandleValue,
};
+use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
@@ -22,16 +23,20 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSi
use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
use crate::dom::bindings::conversions::ConversionResult;
use crate::dom::bindings::error::{Error, Fallible};
-use crate::dom::bindings::reflector::{Reflector, reflect_dom_object_with_proto};
+use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
+use crate::dom::domexception::{DOMErrorName, DOMException};
use crate::dom::globalscope::GlobalScope;
+use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::readablestream::get_type_and_value_from_message;
use crate::dom::writablestreamdefaultcontroller::{
UnderlyingSinkType, WritableStreamDefaultController,
};
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
+use crate::js::conversions::ToJSValConvertible;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@@ -175,7 +180,7 @@ impl WritableStream {
}
}
- fn new_with_proto(
+ pub(crate) fn new_with_proto(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
@@ -834,6 +839,58 @@ impl WritableStream {
// Set stream.[[backpressure]] to backpressure.
self.set_backpressure(backpressure);
}
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ pub(crate) fn setup_cross_realm_transform_writable(
+ &self,
+ cx: SafeJSContext,
+ port: &MessagePort,
+ can_gc: CanGc,
+ ) {
+ let port_id = port.message_port_id();
+ let global = self.global();
+
+ // Perform ! InitializeWritableStream(stream).
+ // Done in `new_inherited`.
+
+ // Let sizeAlgorithm be an algorithm that returns 1.
+ // Re-ordered because of the need to pass it to `new`.
+ let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
+
+ // Note: other algorithms defined in the controller at call site.
+
+ // Let backpressurePromise be a new promise.
+ let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
+
+ // Let controller be a new WritableStreamDefaultController.
+ let controller = WritableStreamDefaultController::new(
+ &global,
+ UnderlyingSinkType::Transfer {
+ backpressure_promise: backpressure_promise.clone(),
+ port: Dom::from_ref(port),
+ },
+ &UnderlyingSink::empty(),
+ 1.0,
+ size_algorithm,
+ can_gc,
+ );
+
+ // Add a handler for port’s message event with the following steps:
+ // Add a handler for port’s messageerror event with the following steps:
+ rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
+ controller: Dom::from_ref(&controller),
+ backpressure_promise: backpressure_promise.clone(),
+ });
+ global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
+
+ // Enable port’s port message queue.
+ port.Start();
+
+ // Perform ! SetUpWritableStreamDefaultController
+ controller
+ .setup(cx, &global, self, &None, can_gc)
+ .expect("Setup for transfer cannot fail");
+ }
}
impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
@@ -967,3 +1024,86 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
self.aquire_default_writer(cx, &global, can_gc)
}
}
+
+impl js::gc::Rootable for CrossRealmTransformWritable {}
+
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+/// A wrapper to handle `message` and `messageerror` events
+/// for the port used by the transfered stream.
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+pub(crate) struct CrossRealmTransformWritable {
+ /// The controller used in the algorithm.
+ controller: Dom<WritableStreamDefaultController>,
+
+ /// The `backpressurePromise` used in the algorithm.
+ #[ignore_malloc_size_of = "Rc is hard"]
+ backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
+}
+
+impl CrossRealmTransformWritable {
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ /// Add a handler for port’s message event with the following steps:
+ #[allow(unsafe_code)]
+ pub(crate) fn handle_message(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ message: SafeHandleValue,
+ _realm: InRealm,
+ can_gc: CanGc,
+ ) {
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ let type_string =
+ unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
+
+ // If type is "pull",
+ // Done below as the steps are the same for both types.
+
+ // Otherwise, if type is "error",
+ if type_string == "error" {
+ // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
+ self.controller
+ .error_if_needed(cx, value.handle(), global, can_gc);
+ }
+
+ let backpressure_promise = self.backpressure_promise.borrow_mut().take();
+
+ // Note: the below steps are for both "pull" and "error" types.
+ // If backpressurePromise is not undefined,
+ if let Some(promise) = backpressure_promise {
+ // Resolve backpressurePromise with undefined.
+ promise.resolve_native(&(), can_gc);
+
+ // Set backpressurePromise to undefined.
+ // Done above with `take`.
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+ /// Add a handler for port’s messageerror event with the following steps:
+ #[allow(unsafe_code)]
+ pub(crate) fn handle_error(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ port: &MessagePort,
+ _realm: InRealm,
+ can_gc: CanGc,
+ ) {
+ // Let error be a new "DataCloneError" DOMException.
+ let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) };
+
+ // Perform ! CrossRealmTransformSendError(port, error).
+ port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
+
+ // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
+ self.controller
+ .error_if_needed(cx, rooted_error.handle(), global, can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(port);
+ }
+}
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs
index 691e85408e9..0037672cbdf 100644
--- a/components/script/dom/writablestreamdefaultcontroller.rs
+++ b/components/script/dom/writablestreamdefaultcontroller.rs
@@ -19,9 +19,10 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
};
use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval};
-use crate::dom::bindings::reflector::{Reflector, reflect_dom_object};
+use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::globalscope::GlobalScope;
+use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
@@ -135,6 +136,57 @@ impl Callback for StartAlgorithmRejectionHandler {
}
}
+impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
+
+/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+#[derive(JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct TransferBackPressurePromiseReaction {
+ /// The result of reacting to backpressurePromise.
+ #[ignore_malloc_size_of = "Rc is hard"]
+ result_promise: Rc<Promise>,
+
+ /// The backpressurePromise.
+ #[ignore_malloc_size_of = "Rc is hard"]
+ backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
+
+ /// The chunk received by the `writeAlgorithm`.
+ #[ignore_malloc_size_of = "mozjs"]
+ chunk: Box<Heap<JSVal>>,
+
+ /// The port used in the algorithm.
+ port: Dom<MessagePort>,
+}
+
+impl Callback for TransferBackPressurePromiseReaction {
+ /// Reacting to backpressurePromise with the following fulfillment steps:
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
+ let global = self.result_promise.global();
+ // Set backpressurePromise to a new promise.
+ *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc));
+
+ // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
+ rooted!(in(*cx) let mut chunk = UndefinedValue());
+ chunk.set(self.chunk.get());
+ let result =
+ self.port
+ .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(&self.port);
+
+ // If result is an abrupt completion,
+ if let Err(error) = result {
+ // Return a promise rejected with result.[[Value]].
+ self.result_promise.reject_error(error, can_gc);
+ } else {
+ // Otherwise, return a promise resolved with undefined.
+ self.result_promise.resolve_native(&(), can_gc);
+ }
+ }
+}
+
impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
/// The fulfillment handler for
@@ -215,14 +267,16 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
/// The type of sink algorithms we are using.
-#[allow(dead_code)]
-#[derive(JSTraceable, MallocSizeOf, PartialEq)]
+#[derive(JSTraceable, PartialEq)]
pub enum UnderlyingSinkType {
/// Algorithms are provided by Js callbacks.
Js,
/// Algorithms supporting streams transfer are implemented in Rust.
- /// TODO: implement transfer.
- Transfer,
+ /// The promise and port used in those algorithms are stored here.
+ Transfer {
+ backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
+ port: Dom<MessagePort>,
+ },
}
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
@@ -230,8 +284,7 @@ pub enum UnderlyingSinkType {
pub struct WritableStreamDefaultController {
reflector_: Reflector,
- /// The type of underlying sink used. Besides the default JS one,
- /// there will be others for stream transfer, and for transform stream.
+ #[ignore_malloc_size_of = "Rc is hard"]
underlying_sink_type: UnderlyingSinkType,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
@@ -409,6 +462,11 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
+ // Note: we are either here because the Js algorithm is none,
+ // or because we are suppporting a stream transfer as
+ // part of #abstract-opdef-setupcrossrealmtransformwritable
+ // and the logic is the same for both.
+
// Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@@ -480,9 +538,26 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
- Promise::new_resolved(global, cx, (), can_gc)
+ UnderlyingSinkType::Transfer { ref port, .. } => {
+ // The steps from the `abortAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ // Let result be PackAndPostMessageHandlingError(port, "error", reason).
+ let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(port);
+
+ let promise = Promise::new(global, can_gc);
+
+ // If result is an abrupt completion, return a promise rejected with result.[[Value]]
+ if let Err(error) = result {
+ promise.reject_error(error, can_gc);
+ } else {
+ // Otherwise, return a promise resolved with undefined.
+ promise.reject_native(&(), can_gc);
+ }
+ promise
},
};
@@ -521,9 +596,45 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
- Promise::new_resolved(global, cx, (), can_gc)
+ UnderlyingSinkType::Transfer {
+ ref backpressure_promise,
+ ref port,
+ ..
+ } => {
+ // The steps from the `writeAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ {
+ // If backpressurePromise is undefined,
+ // set backpressurePromise to a promise resolved with undefined.
+ let mut backpressure_promise = backpressure_promise.borrow_mut();
+ if backpressure_promise.is_none() {
+ *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc));
+ }
+ }
+
+ // Return the result of reacting to backpressurePromise with the following fulfillment steps:
+ let result_promise = Promise::new(global, can_gc);
+ rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
+ port: port.clone(),
+ backpressure_promise: backpressure_promise.clone(),
+ chunk: Heap::boxed(chunk.get()),
+ result_promise: result_promise.clone(),
+ }));
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ None,
+ can_gc,
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ backpressure_promise
+ .borrow()
+ .as_ref()
+ .expect("Promise must be some by now.")
+ .append_native_handler(&handler, comp, can_gc);
+ result_promise
},
}
}
@@ -551,8 +662,19 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
+ UnderlyingSinkType::Transfer { ref port, .. } => {
+ // The steps from the `closeAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ // Perform ! PackAndPostMessage(port, "close", undefined).
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ port.pack_and_post_message("close", value.handle(), can_gc)
+ .expect("Sending close should not fail.");
+
+ // Disentangle port.
+ global.disentangle_port(port);
+
+ // Return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
},
}