diff options
Diffstat (limited to 'components/script/dom/writablestreamdefaultcontroller.rs')
-rw-r--r-- | components/script/dom/writablestreamdefaultcontroller.rs | 152 |
1 files changed, 137 insertions, 15 deletions
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs index 691e85408e9..0037672cbdf 100644 --- a/components/script/dom/writablestreamdefaultcontroller.rs +++ b/components/script/dom/writablestreamdefaultcontroller.rs @@ -19,9 +19,10 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{ }; use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods; use crate::dom::bindings::error::{Error, ErrorToJsval}; -use crate::dom::bindings::reflector::{Reflector, reflect_dom_object}; +use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::globalscope::GlobalScope; +use crate::dom::messageport::MessagePort; use crate::dom::promise::Promise; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize}; @@ -135,6 +136,57 @@ impl Callback for StartAlgorithmRejectionHandler { } } +impl js::gc::Rootable for TransferBackPressurePromiseReaction {} + +/// Reacting to backpressurePromise as part of the `writeAlgorithm` of +/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> +#[derive(JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct TransferBackPressurePromiseReaction { + /// The result of reacting to backpressurePromise. + #[ignore_malloc_size_of = "Rc is hard"] + result_promise: Rc<Promise>, + + /// The backpressurePromise. + #[ignore_malloc_size_of = "Rc is hard"] + backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, + + /// The chunk received by the `writeAlgorithm`. + #[ignore_malloc_size_of = "mozjs"] + chunk: Box<Heap<JSVal>>, + + /// The port used in the algorithm. + port: Dom<MessagePort>, +} + +impl Callback for TransferBackPressurePromiseReaction { + /// Reacting to backpressurePromise with the following fulfillment steps: + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) { + let global = self.result_promise.global(); + // Set backpressurePromise to a new promise. + *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc)); + + // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk). + rooted!(in(*cx) let mut chunk = UndefinedValue()); + chunk.set(self.chunk.get()); + let result = + self.port + .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc); + + // Disentangle port. + global.disentangle_port(&self.port); + + // If result is an abrupt completion, + if let Err(error) = result { + // Return a promise rejected with result.[[Value]]. + self.result_promise.reject_error(error, can_gc); + } else { + // Otherwise, return a promise resolved with undefined. + self.result_promise.resolve_native(&(), can_gc); + } + } +} + impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {} /// The fulfillment handler for @@ -215,14 +267,16 @@ impl Callback for WriteAlgorithmRejectionHandler { } /// The type of sink algorithms we are using. -#[allow(dead_code)] -#[derive(JSTraceable, MallocSizeOf, PartialEq)] +#[derive(JSTraceable, PartialEq)] pub enum UnderlyingSinkType { /// Algorithms are provided by Js callbacks. Js, /// Algorithms supporting streams transfer are implemented in Rust. - /// TODO: implement transfer. - Transfer, + /// The promise and port used in those algorithms are stored here. + Transfer { + backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, + port: Dom<MessagePort>, + }, } /// <https://streams.spec.whatwg.org/#ws-default-controller-class> @@ -230,8 +284,7 @@ pub enum UnderlyingSinkType { pub struct WritableStreamDefaultController { reflector_: Reflector, - /// The type of underlying sink used. Besides the default JS one, - /// there will be others for stream transfer, and for transform stream. + #[ignore_malloc_size_of = "Rc is hard"] underlying_sink_type: UnderlyingSinkType, /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm> @@ -409,6 +462,11 @@ impl WritableStreamDefaultController { Promise::new_resolved(global, cx, result.get(), can_gc) } } else { + // Note: we are either here because the Js algorithm is none, + // or because we are suppporting a stream transfer as + // part of #abstract-opdef-setupcrossrealmtransformwritable + // and the logic is the same for both. + // Let startAlgorithm be an algorithm that returns undefined. Promise::new_resolved(global, cx, (), can_gc) }; @@ -480,9 +538,26 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. - Promise::new_resolved(global, cx, (), can_gc) + UnderlyingSinkType::Transfer { ref port, .. } => { + // The steps from the `abortAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + // Let result be PackAndPostMessageHandlingError(port, "error", reason). + let result = port.pack_and_post_message_handling_error("error", reason, can_gc); + + // Disentangle port. + global.disentangle_port(port); + + let promise = Promise::new(global, can_gc); + + // If result is an abrupt completion, return a promise rejected with result.[[Value]] + if let Err(error) = result { + promise.reject_error(error, can_gc); + } else { + // Otherwise, return a promise resolved with undefined. + promise.reject_native(&(), can_gc); + } + promise }, }; @@ -521,9 +596,45 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. - Promise::new_resolved(global, cx, (), can_gc) + UnderlyingSinkType::Transfer { + ref backpressure_promise, + ref port, + .. + } => { + // The steps from the `writeAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + { + // If backpressurePromise is undefined, + // set backpressurePromise to a promise resolved with undefined. + let mut backpressure_promise = backpressure_promise.borrow_mut(); + if backpressure_promise.is_none() { + *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc)); + } + } + + // Return the result of reacting to backpressurePromise with the following fulfillment steps: + let result_promise = Promise::new(global, can_gc); + rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction { + port: port.clone(), + backpressure_promise: backpressure_promise.clone(), + chunk: Heap::boxed(chunk.get()), + result_promise: result_promise.clone(), + })); + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + None, + can_gc, + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + backpressure_promise + .borrow() + .as_ref() + .expect("Promise must be some by now.") + .append_native_handler(&handler, comp, can_gc); + result_promise }, } } @@ -551,8 +662,19 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer => { - // TODO: implement transfer. + UnderlyingSinkType::Transfer { ref port, .. } => { + // The steps from the `closeAlgorithm` at + // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> + + // Perform ! PackAndPostMessage(port, "close", undefined). + rooted!(in(*cx) let mut value = UndefinedValue()); + port.pack_and_post_message("close", value.handle(), can_gc) + .expect("Sending close should not fail."); + + // Disentangle port. + global.disentangle_port(port); + + // Return a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }, } |