aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/dom/writablestreamdefaultcontroller.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/script/dom/writablestreamdefaultcontroller.rs')
-rw-r--r--components/script/dom/writablestreamdefaultcontroller.rs152
1 files changed, 137 insertions, 15 deletions
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs
index 691e85408e9..0037672cbdf 100644
--- a/components/script/dom/writablestreamdefaultcontroller.rs
+++ b/components/script/dom/writablestreamdefaultcontroller.rs
@@ -19,9 +19,10 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
};
use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval};
-use crate::dom::bindings::reflector::{Reflector, reflect_dom_object};
+use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::globalscope::GlobalScope;
+use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
@@ -135,6 +136,57 @@ impl Callback for StartAlgorithmRejectionHandler {
}
}
+impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
+
+/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
+/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+#[derive(JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct TransferBackPressurePromiseReaction {
+ /// The result of reacting to backpressurePromise.
+ #[ignore_malloc_size_of = "Rc is hard"]
+ result_promise: Rc<Promise>,
+
+ /// The backpressurePromise.
+ #[ignore_malloc_size_of = "Rc is hard"]
+ backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
+
+ /// The chunk received by the `writeAlgorithm`.
+ #[ignore_malloc_size_of = "mozjs"]
+ chunk: Box<Heap<JSVal>>,
+
+ /// The port used in the algorithm.
+ port: Dom<MessagePort>,
+}
+
+impl Callback for TransferBackPressurePromiseReaction {
+ /// Reacting to backpressurePromise with the following fulfillment steps:
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
+ let global = self.result_promise.global();
+ // Set backpressurePromise to a new promise.
+ *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc));
+
+ // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
+ rooted!(in(*cx) let mut chunk = UndefinedValue());
+ chunk.set(self.chunk.get());
+ let result =
+ self.port
+ .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(&self.port);
+
+ // If result is an abrupt completion,
+ if let Err(error) = result {
+ // Return a promise rejected with result.[[Value]].
+ self.result_promise.reject_error(error, can_gc);
+ } else {
+ // Otherwise, return a promise resolved with undefined.
+ self.result_promise.resolve_native(&(), can_gc);
+ }
+ }
+}
+
impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
/// The fulfillment handler for
@@ -215,14 +267,16 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
/// The type of sink algorithms we are using.
-#[allow(dead_code)]
-#[derive(JSTraceable, MallocSizeOf, PartialEq)]
+#[derive(JSTraceable, PartialEq)]
pub enum UnderlyingSinkType {
/// Algorithms are provided by Js callbacks.
Js,
/// Algorithms supporting streams transfer are implemented in Rust.
- /// TODO: implement transfer.
- Transfer,
+ /// The promise and port used in those algorithms are stored here.
+ Transfer {
+ backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
+ port: Dom<MessagePort>,
+ },
}
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
@@ -230,8 +284,7 @@ pub enum UnderlyingSinkType {
pub struct WritableStreamDefaultController {
reflector_: Reflector,
- /// The type of underlying sink used. Besides the default JS one,
- /// there will be others for stream transfer, and for transform stream.
+ #[ignore_malloc_size_of = "Rc is hard"]
underlying_sink_type: UnderlyingSinkType,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
@@ -409,6 +462,11 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
+ // Note: we are either here because the Js algorithm is none,
+ // or because we are suppporting a stream transfer as
+ // part of #abstract-opdef-setupcrossrealmtransformwritable
+ // and the logic is the same for both.
+
// Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@@ -480,9 +538,26 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
- Promise::new_resolved(global, cx, (), can_gc)
+ UnderlyingSinkType::Transfer { ref port, .. } => {
+ // The steps from the `abortAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ // Let result be PackAndPostMessageHandlingError(port, "error", reason).
+ let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
+
+ // Disentangle port.
+ global.disentangle_port(port);
+
+ let promise = Promise::new(global, can_gc);
+
+ // If result is an abrupt completion, return a promise rejected with result.[[Value]]
+ if let Err(error) = result {
+ promise.reject_error(error, can_gc);
+ } else {
+ // Otherwise, return a promise resolved with undefined.
+ promise.reject_native(&(), can_gc);
+ }
+ promise
},
};
@@ -521,9 +596,45 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
- Promise::new_resolved(global, cx, (), can_gc)
+ UnderlyingSinkType::Transfer {
+ ref backpressure_promise,
+ ref port,
+ ..
+ } => {
+ // The steps from the `writeAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ {
+ // If backpressurePromise is undefined,
+ // set backpressurePromise to a promise resolved with undefined.
+ let mut backpressure_promise = backpressure_promise.borrow_mut();
+ if backpressure_promise.is_none() {
+ *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc));
+ }
+ }
+
+ // Return the result of reacting to backpressurePromise with the following fulfillment steps:
+ let result_promise = Promise::new(global, can_gc);
+ rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
+ port: port.clone(),
+ backpressure_promise: backpressure_promise.clone(),
+ chunk: Heap::boxed(chunk.get()),
+ result_promise: result_promise.clone(),
+ }));
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ None,
+ can_gc,
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ backpressure_promise
+ .borrow()
+ .as_ref()
+ .expect("Promise must be some by now.")
+ .append_native_handler(&handler, comp, can_gc);
+ result_promise
},
}
}
@@ -551,8 +662,19 @@ impl WritableStreamDefaultController {
promise
})
},
- UnderlyingSinkType::Transfer => {
- // TODO: implement transfer.
+ UnderlyingSinkType::Transfer { ref port, .. } => {
+ // The steps from the `closeAlgorithm` at
+ // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
+
+ // Perform ! PackAndPostMessage(port, "close", undefined).
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ port.pack_and_post_message("close", value.handle(), can_gc)
+ .expect("Sending close should not fail.");
+
+ // Disentangle port.
+ global.disentangle_port(port);
+
+ // Return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
},
}