aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/dom
diff options
context:
space:
mode:
authorGregory Terzian <2792687+gterzian@users.noreply.github.com>2025-02-19 20:02:14 +0700
committerGitHub <noreply@github.com>2025-02-19 13:02:14 +0000
commitdf6d6361688f487142809b5923e030447870e073 (patch)
tree3e81927a47cfc98e8c895c8153eee47ee5758753 /components/script/dom
parent720bc725b0c1cd5f01f9ff1d17793a3b1f32a480 (diff)
downloadservo-df6d6361688f487142809b5923e030447870e073.tar.gz
servo-df6d6361688f487142809b5923e030447870e073.zip
dom: Implement `WritableStream` (#34844)
* add basic interface for writable stream Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add stubs for pipeTo and pipeThrough methods Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add stubs for writable stream defautl writer Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add stubs for writable stream controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add underlying source dict Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add underlying source dict Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement constructor Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement init writable stream Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * impl setup default controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller setup Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller advance queue if neededd Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream finish erroring Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream reject close and closed promise if needed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * finish implementation of stream finish erroring Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * call into controller setup from stream constructor Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream mark first write request in flight Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller process write Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * call into advance queue if needed at various points Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream deal with rejection, use from_safe_context Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller clear algorithms Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unused todo Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream start erroring Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * finish writer ensure ready promise rejected Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream finish in flight write request Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement write constructor and setup Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller error Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary unsafe code Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * finish implementing process write Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement close sentinel Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement public locked Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream abort Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix use of crown Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary options around writer promises Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer get desired size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer ready Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer abort Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer release lock Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer public write Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement private writer write Uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement writer release. Uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * impl controller process close Uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * finish controller process close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * root promise handlers Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * handler errors in stream and writer constructor finish implementation of stream finish in flight close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix warnings Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement controller get chunk size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * tidy the webidls Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * implement stream get writer Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix assertion of stream state when advancing queue if needed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add docs for value with size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * use reject_error in abort Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary allowances of unsafe code Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * turn writable-streams test suite on Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update encodings test expectations Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * properly check if type is set on sink in stream constructor Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix double borrow in controller advance queue if needed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * make the queue aware of the close sentinel when dequeuing a value Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix assertion of no backpressure in update backpressure Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * also clear strategy size when clearing algorithms Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove this object arg when calling into strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix has operations marked in flight Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix typo in has in flight write request Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * turn error into no-op when aborting a stream, if the stream is closed or errored. Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix error handling of calling into abort algorithm Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix error handling of calling into close and write algorithms Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix double borrow on queue fix logic in update_backpressure fix logic in get_desired_size fix logic in writer setup Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update test expectations for aborting suite Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix controller get_backpressure Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix clippy Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update test expectations to expect errors in tests using unsupported apis Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix error handling of calling into start algo in controller setup Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update test expectations for test checking for undefined this in strategy size call Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update test expectation to timeout for response-stream-with-broken-then.any.worker Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * update interfaces Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix use of global() and error to_jsval Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix use of crown for promise handlers Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove fail expectation from worker interface objects test Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove fail expectation for test expecting this to be undefined in callback Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix documentation link for writablestream state Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * refactor write_requests to use a vec deque uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary doc Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * refactor reject_close_and_closed_promise_if_needed to take a safe js context as argument uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * pass globals and contexts by ref where possible uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix doc link for controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary comment Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * change update_backpressure to be a method of the writablestream uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * rename writer method that resolve closed and ready promise for clarity uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add comments for steps in peek queue value Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix doc link for the abort algorihtm fulfillment handler Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix step doc and variable name in abort algo rejection handler Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * Add must_root to pending abort request Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * limit visibility to crate for has_operations_marked_inflight Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * limit visibility to crate for get_stored_error Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * remove potention re-borrow risk in reject loop on write requests in finish_erroring Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove potential re-borrow risk when taking pending abort request in finish_erroring Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove potential re-borrow risk when taking close request in reject_close_and_closed_promise_if_needed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove re-borrow risks in finish_in_flight_close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove re-borrow risk on in_flight_close_request in finish_in_flight_close_with_error Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary clone of of reason in abort Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix condition on backpressure and a writable state in close Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * limit visibility to crate for update_backpressure Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * remove mutability of reason in abort workflow Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove unnecessary use of ignore_malloc_size_of around Dom in controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix ignore malloc size of comment for strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * reduce visibility of public methods to crate in controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove use of JS_GetPendingException in controller get_chunk_size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * return early on error in write Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * use is_some_and in assertion that stream.witer is writer in release Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * root pending abort request uses ai Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix mutable re-borrow risk in writer Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Diffstat (limited to 'components/script/dom')
-rw-r--r--components/script/dom/mod.rs3
-rw-r--r--components/script/dom/readablestreamdefaultcontroller.rs59
-rw-r--r--components/script/dom/writablestream.rs970
-rw-r--r--components/script/dom/writablestreamdefaultcontroller.rs829
-rw-r--r--components/script/dom/writablestreamdefaultwriter.rs500
5 files changed, 2350 insertions, 11 deletions
diff --git a/components/script/dom/mod.rs b/components/script/dom/mod.rs
index 7127828224e..73320e527b4 100644
--- a/components/script/dom/mod.rs
+++ b/components/script/dom/mod.rs
@@ -628,6 +628,9 @@ pub(crate) mod workerlocation;
pub(crate) mod workernavigator;
pub(crate) mod worklet;
pub(crate) mod workletglobalscope;
+pub(crate) mod writablestream;
+pub(crate) mod writablestreamdefaultcontroller;
+pub(crate) mod writablestreamdefaultwriter;
pub(crate) mod xmldocument;
pub(crate) mod xmlhttprequest;
pub(crate) mod xmlhttprequesteventtarget;
diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs
index 6aa17253282..779bf1947ba 100644
--- a/components/script/dom/readablestreamdefaultcontroller.rs
+++ b/components/script/dom/readablestreamdefaultcontroller.rs
@@ -115,21 +115,25 @@ impl Callback for StartAlgorithmRejectionHandler {
}
/// <https://streams.spec.whatwg.org/#value-with-size>
-#[derive(JSTraceable)]
+#[derive(Debug, JSTraceable, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct ValueWithSize {
- value: Box<Heap<JSVal>>,
- size: f64,
+ /// <https://streams.spec.whatwg.org/#value-with-size-value>
+ pub(crate) value: Box<Heap<JSVal>>,
+ /// <https://streams.spec.whatwg.org/#value-with-size-size>
+ pub(crate) size: f64,
}
/// <https://streams.spec.whatwg.org/#value-with-size>
-#[derive(JSTraceable)]
+#[derive(Debug, JSTraceable, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum EnqueuedValue {
/// A value enqueued from Rust.
Native(Box<[u8]>),
/// A Js value.
Js(ValueWithSize),
+ /// <https://streams.spec.whatwg.org/#close-sentinel>
+ CloseSentinel,
}
impl EnqueuedValue {
@@ -137,6 +141,9 @@ impl EnqueuedValue {
match self {
EnqueuedValue::Native(v) => v.len() as f64,
EnqueuedValue::Js(v) => v.size,
+ // The size of the sentinel is zero,
+ // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
+ EnqueuedValue::CloseSentinel => 0.,
}
}
@@ -152,6 +159,9 @@ impl EnqueuedValue {
EnqueuedValue::Js(value_with_size) => unsafe {
value_with_size.value.to_jsval(*cx, rval);
},
+ EnqueuedValue::CloseSentinel => {
+ unreachable!("The close sentinel is never made available as a js val.")
+ },
}
}
}
@@ -161,6 +171,7 @@ fn is_non_negative_number(value: &EnqueuedValue) -> bool {
let value_with_size = match value {
EnqueuedValue::Native(_) => return true,
EnqueuedValue::Js(value_with_size) => value_with_size,
+ EnqueuedValue::CloseSentinel => return true,
};
// If v is not a Number, return false.
@@ -186,23 +197,29 @@ pub(crate) struct QueueWithSizes {
#[ignore_malloc_size_of = "EnqueuedValue::Js"]
queue: VecDeque<EnqueuedValue>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
- total_size: f64,
+ pub(crate) total_size: f64,
}
impl QueueWithSizes {
/// <https://streams.spec.whatwg.org/#dequeue-value>
- fn dequeue_value(&mut self, cx: SafeJSContext, rval: MutableHandleValue) {
+ /// A none `rval` means we're dequeing the close sentinel,
+ /// which should never be made available to script.
+ pub(crate) fn dequeue_value(&mut self, cx: SafeJSContext, rval: Option<MutableHandleValue>) {
let Some(value) = self.queue.front() else {
unreachable!("Buffer cannot be empty when dequeue value is called into.");
};
self.total_size -= value.size();
- value.to_jsval(cx, rval);
+ if let Some(rval) = rval {
+ value.to_jsval(cx, rval);
+ } else {
+ assert_eq!(value, &EnqueuedValue::CloseSentinel);
+ }
self.queue.pop_front();
}
/// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
- fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
+ pub(crate) fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
// If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
if !is_non_negative_number(&value) {
return Err(Error::Range(
@@ -223,10 +240,30 @@ impl QueueWithSizes {
Ok(())
}
- fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
self.queue.is_empty()
}
+ /// <https://streams.spec.whatwg.org/#peek-queue-value>
+ /// Returns whether value is the close sentinel.
+ pub(crate) fn peek_queue_value(&self, cx: SafeJSContext, rval: MutableHandleValue) -> bool {
+ // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
+ // Done with the QueueWithSizes type.
+
+ // Assert: container.[[queue]] is not empty.
+ assert!(!self.is_empty());
+
+ // Let valueWithSize be container.[[queue]][0].
+ let value_with_size = self.queue.front().expect("Queue is not empty.");
+ if let EnqueuedValue::CloseSentinel = value_with_size {
+ return true;
+ }
+
+ // Return valueWithSize’s value.
+ value_with_size.to_jsval(cx, rval);
+ false
+ }
+
/// Only used with native sources.
fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
self.queue
@@ -244,7 +281,7 @@ impl QueueWithSizes {
}
/// <https://streams.spec.whatwg.org/#reset-queue>
- fn reset(&mut self) {
+ pub(crate) fn reset(&mut self) {
self.queue.clear();
self.total_size = Default::default();
}
@@ -409,7 +446,7 @@ impl ReadableStreamDefaultController {
/// <https://streams.spec.whatwg.org/#dequeue-value>
fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue) {
let mut queue = self.queue.borrow_mut();
- queue.dequeue_value(cx, rval);
+ queue.dequeue_value(cx, Some(rval));
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs
new file mode 100644
index 00000000000..e650db4a0ce
--- /dev/null
+++ b/components/script/dom/writablestream.rs
@@ -0,0 +1,970 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::mem;
+use std::ptr::{self};
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{Heap, JSObject};
+use js::jsval::{JSVal, ObjectValue, UndefinedValue};
+use js::rust::{
+ HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
+ MutableHandleValue as SafeMutableHandleValue,
+};
+
+use crate::dom::bindings::cell::DomRefCell;
+use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
+use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
+use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
+use crate::dom::bindings::conversions::ConversionResult;
+use crate::dom::bindings::error::Error;
+use crate::dom::bindings::import::module::Fallible;
+use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, Reflector};
+use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
+use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController;
+use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
+use crate::realms::{enter_realm, InRealm};
+use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+
+impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
+
+/// The fulfillment handler for the abort steps of
+/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
+#[derive(JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct AbortAlgorithmFulfillmentHandler {
+ stream: Dom<WritableStream>,
+ #[ignore_malloc_size_of = "Rc is hard"]
+ abort_request_promise: Rc<Promise>,
+}
+
+impl Callback for AbortAlgorithmFulfillmentHandler {
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
+ // Resolve abortRequest’s promise with undefined.
+ self.abort_request_promise.resolve_native(&());
+
+ // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
+ self.stream
+ .as_rooted()
+ .reject_close_and_closed_promise_if_needed(cx);
+ }
+}
+
+impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
+
+/// The rejection handler for the abort steps of
+/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
+#[derive(JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct AbortAlgorithmRejectionHandler {
+ stream: Dom<WritableStream>,
+ #[ignore_malloc_size_of = "Rc is hard"]
+ abort_request_promise: Rc<Promise>,
+}
+
+impl Callback for AbortAlgorithmRejectionHandler {
+ fn callback(
+ &self,
+ cx: SafeJSContext,
+ reason: SafeHandleValue,
+ _realm: InRealm,
+ _can_gc: CanGc,
+ ) {
+ // Reject abortRequest’s promise with reason.
+ self.abort_request_promise.reject_native(&reason);
+
+ // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
+ self.stream
+ .as_rooted()
+ .reject_close_and_closed_promise_if_needed(cx);
+ }
+}
+
+impl js::gc::Rootable for PendingAbortRequest {}
+
+/// <https://streams.spec.whatwg.org/#pending-abort-request>
+#[derive(JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct PendingAbortRequest {
+ /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ promise: Rc<Promise>,
+
+ /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
+ #[ignore_malloc_size_of = "mozjs"]
+ reason: Box<Heap<JSVal>>,
+
+ /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
+ was_already_erroring: bool,
+}
+
+/// <https://streams.spec.whatwg.org/#writablestream-state>
+#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
+pub(crate) enum WritableStreamState {
+ #[default]
+ Writable,
+ Closed,
+ Erroring,
+ Errored,
+}
+
+/// <https://streams.spec.whatwg.org/#ws-class>
+#[dom_struct]
+pub struct WritableStream {
+ reflector_: Reflector,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
+ backpressure: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ close_request: DomRefCell<Option<Rc<Promise>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-controller>
+ controller: MutNullableDom<WritableStreamDefaultController>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-detached>
+ detached: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
+ pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-state>
+ state: Cell<WritableStreamState>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-storederror>
+ #[ignore_malloc_size_of = "mozjs"]
+ stored_error: Heap<JSVal>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-writer>
+ writer: MutNullableDom<WritableStreamDefaultWriter>,
+
+ /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
+}
+
+impl WritableStream {
+ #[cfg_attr(crown, allow(crown::unrooted_must_root))]
+ /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
+ fn new_inherited() -> WritableStream {
+ WritableStream {
+ reflector_: Reflector::new(),
+ backpressure: Default::default(),
+ close_request: Default::default(),
+ controller: Default::default(),
+ detached: Default::default(),
+ in_flight_write_request: Default::default(),
+ in_flight_close_request: Default::default(),
+ pending_abort_request: Default::default(),
+ state: Default::default(),
+ stored_error: Default::default(),
+ writer: Default::default(),
+ write_requests: Default::default(),
+ }
+ }
+
+ fn new_with_proto(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ ) -> DomRoot<WritableStream> {
+ reflect_dom_object_with_proto(
+ Box::new(WritableStream::new_inherited()),
+ global,
+ proto,
+ can_gc,
+ )
+ }
+
+ /// Used as part of
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+ pub(crate) fn assert_no_controller(&self) {
+ assert!(self.controller.get().is_none());
+ }
+
+ /// Used as part of
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+ pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
+ self.controller.set(Some(controller));
+ }
+
+ pub(crate) fn is_writable(&self) -> bool {
+ matches!(self.state.get(), WritableStreamState::Writable)
+ }
+
+ pub(crate) fn is_erroring(&self) -> bool {
+ matches!(self.state.get(), WritableStreamState::Erroring)
+ }
+
+ pub(crate) fn is_errored(&self) -> bool {
+ matches!(self.state.get(), WritableStreamState::Errored)
+ }
+
+ pub(crate) fn is_closed(&self) -> bool {
+ matches!(self.state.get(), WritableStreamState::Closed)
+ }
+
+ pub(crate) fn has_in_flight_write_request(&self) -> bool {
+ self.in_flight_write_request.borrow().is_some()
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
+ pub(crate) fn has_operations_marked_inflight(&self) -> bool {
+ let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
+ let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
+
+ in_flight_write_requested || in_flight_close_requested
+ }
+
+ /// <https://streams.spec.whatwg.org/#writablestream-storederror>
+ pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
+ handle_mut.set(self.stored_error.get());
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
+ pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
+ // Assert: stream.[[state]] is "erroring".
+ assert!(self.is_erroring());
+
+ // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
+ assert!(!self.has_operations_marked_inflight());
+
+ // Set stream.[[state]] to "errored".
+ self.state.set(WritableStreamState::Errored);
+
+ // Perform ! stream.[[controller]].[[ErrorSteps]]().
+ let Some(controller) = self.controller.get() else {
+ unreachable!("Stream should have a controller.");
+ };
+ controller.perform_error_steps();
+
+ // Let storedError be stream.[[storedError]].
+ rooted!(in(*cx) let mut stored_error = UndefinedValue());
+ self.get_stored_error(stored_error.handle_mut());
+
+ // For each writeRequest of stream.[[writeRequests]]:
+ let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
+ for request in write_requests {
+ // Reject writeRequest with storedError.
+ request.reject(cx, stored_error.handle());
+ }
+
+ // Set stream.[[writeRequests]] to an empty list.
+ // Done above with `drain`.
+
+ // If stream.[[pendingAbortRequest]] is undefined,
+ if self.pending_abort_request.borrow().is_none() {
+ // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
+ self.reject_close_and_closed_promise_if_needed(cx);
+
+ // Return.
+ return;
+ }
+
+ // Let abortRequest be stream.[[pendingAbortRequest]].
+ // Set stream.[[pendingAbortRequest]] to undefined.
+ rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
+ if let Some(pending_abort_request) = &*pending_abort_request {
+ // If abortRequest’s was already erroring is true,
+ if pending_abort_request.was_already_erroring {
+ // Reject abortRequest’s promise with storedError.
+ pending_abort_request
+ .promise
+ .reject(cx, stored_error.handle());
+
+ // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
+ self.reject_close_and_closed_promise_if_needed(cx);
+
+ // Return.
+ return;
+ }
+
+ // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
+ rooted!(in(*cx) let mut reason = UndefinedValue());
+ reason.set(pending_abort_request.reason.get());
+ let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
+
+ // Upon fulfillment of promise,
+ rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
+ stream: Dom::from_ref(self),
+ abort_request_promise: pending_abort_request.promise.clone(),
+ }));
+
+ // Upon rejection of promise with reason r,
+ rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
+ stream: Dom::from_ref(self),
+ abort_request_promise: pending_abort_request.promise.clone(),
+ }));
+
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ rejection_handler.take().map(|h| Box::new(h) as Box<_>),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ promise.append_native_handler(&handler, comp, can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
+ fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext) {
+ // Assert: stream.[[state]] is "errored".
+ assert!(self.is_errored());
+
+ rooted!(in(*cx) let mut stored_error = UndefinedValue());
+ self.get_stored_error(stored_error.handle_mut());
+
+ // If stream.[[closeRequest]] is not undefined
+ let close_request = self.close_request.borrow_mut().take();
+ if let Some(close_request) = close_request {
+ // Assert: stream.[[inFlightCloseRequest]] is undefined.
+ assert!(self.in_flight_close_request.borrow().is_none());
+
+ // Reject stream.[[closeRequest]] with stream.[[storedError]].
+ close_request.reject_native(&stored_error.handle())
+
+ // Set stream.[[closeRequest]] to undefined.
+ // Done with `take` above.
+ }
+
+ // Let writer be stream.[[writer]].
+ // If writer is not undefined,
+ if let Some(writer) = self.writer.get() {
+ // Reject writer.[[closedPromise]] with stream.[[storedError]].
+ writer.reject_closed_promise_with_stored_error(&stored_error.handle());
+
+ // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
+ writer.set_close_promise_is_handled();
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
+ pub(crate) fn close_queued_or_in_flight(&self) -> bool {
+ let close_requested = self.close_request.borrow().is_some();
+ let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
+
+ close_requested || in_flight_close_requested
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
+ pub(crate) fn finish_in_flight_write(&self) {
+ let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
+ // Assert: stream.[[inFlightWriteRequest]] is not undefined.
+ unreachable!("Stream should have a write request");
+ };
+
+ // Resolve stream.[[inFlightWriteRequest]] with undefined.
+ in_flight_write_request.resolve_native(&());
+
+ // Set stream.[[inFlightWriteRequest]] to undefined.
+ // Done above with `take`.
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
+ pub(crate) fn start_erroring(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ // Assert: stream.[[storedError]] is undefined.
+ assert!(self.stored_error.get().is_undefined());
+
+ // Assert: stream.[[state]] is "writable".
+ assert!(self.is_writable());
+
+ // Let controller be stream.[[controller]].
+ let Some(controller) = self.controller.get() else {
+ // Assert: controller is not undefined.
+ unreachable!("Stream should have a controller.");
+ };
+
+ // Set stream.[[state]] to "erroring".
+ self.state.set(WritableStreamState::Erroring);
+
+ // Set stream.[[storedError]] to reason.
+ self.stored_error.set(*error);
+
+ // Let writer be stream.[[writer]].
+ if let Some(writer) = self.writer.get() {
+ // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
+ writer.ensure_ready_promise_rejected(global, error, can_gc);
+ }
+
+ // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
+ if !self.has_operations_marked_inflight() && controller.started() {
+ // perform ! WritableStreamFinishErroring
+ self.finish_erroring(cx, global, can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
+ pub(crate) fn deal_with_rejection(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ // Let state be stream.[[state]].
+
+ // If state is "writable",
+ if self.is_writable() {
+ // Perform ! WritableStreamStartErroring(stream, error).
+ self.start_erroring(cx, global, error, can_gc);
+
+ // Return.
+ return;
+ }
+
+ // Assert: state is "erroring".
+ assert!(self.is_erroring());
+
+ // Perform ! WritableStreamFinishErroring(stream).
+ self.finish_erroring(cx, global, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
+ pub(crate) fn mark_first_write_request_in_flight(&self) {
+ let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
+ let mut write_requests = self.write_requests.borrow_mut();
+
+ // Assert: stream.[[inFlightWriteRequest]] is undefined.
+ assert!(in_flight_write_request.is_none());
+
+ // Assert: stream.[[writeRequests]] is not empty.
+ assert!(!write_requests.is_empty());
+
+ // Let writeRequest be stream.[[writeRequests]][0].
+ // Remove writeRequest from stream.[[writeRequests]].
+ let write_request = write_requests.pop_front().unwrap();
+
+ // Set stream.[[inFlightWriteRequest]] to writeRequest.
+ *in_flight_write_request = Some(write_request);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
+ pub(crate) fn mark_close_request_in_flight(&self) {
+ let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
+ let mut close_request = self.close_request.borrow_mut();
+
+ // Assert: stream.[[inFlightCloseRequest]] is undefined.
+ assert!(in_flight_close_request.is_none());
+
+ // Assert: stream.[[closeRequest]] is not undefined.
+ assert!(close_request.is_some());
+
+ // Let closeRequest be stream.[[closeRequest]].
+ // Set stream.[[closeRequest]] to undefined.
+ let close_request = close_request.take().unwrap();
+
+ // Set stream.[[inFlightCloseRequest]] to closeRequest.
+ *in_flight_close_request = Some(close_request);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
+ pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext) {
+ let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
+ // Assert: stream.[[inFlightCloseRequest]] is not undefined.
+ unreachable!("in_flight_close_request must be Some");
+ };
+
+ // Resolve stream.[[inFlightCloseRequest]] with undefined.
+ in_flight_close_request.resolve_native(&());
+
+ // Set stream.[[inFlightCloseRequest]] to undefined.
+ // Done with take above.
+
+ // Assert: stream.[[state]] is "writable" or "erroring".
+ assert!(self.is_writable() || self.is_erroring());
+
+ // If state is "erroring",
+ if self.is_erroring() {
+ // Set stream.[[storedError]] to undefined.
+ self.stored_error.set(UndefinedValue());
+
+ // If stream.[[pendingAbortRequest]] is not undefined,
+ rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
+ if let Some(pending_abort_request) = &*pending_abort_request {
+ // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
+ pending_abort_request.promise.resolve_native(&());
+
+ // Set stream.[[pendingAbortRequest]] to undefined.
+ // Done above with `take`.
+ }
+ }
+
+ // Set stream.[[state]] to "closed".
+ self.state.set(WritableStreamState::Closed);
+
+ // Let writer be stream.[[writer]].
+ if let Some(writer) = self.writer.get() {
+ // If writer is not undefined,
+ // resolve writer.[[closedPromise]] with undefined.
+ writer.resolve_closed_promise_with_undefined();
+ }
+
+ // Assert: stream.[[pendingAbortRequest]] is undefined.
+ assert!(self.pending_abort_request.borrow().is_none());
+
+ // Assert: stream.[[storedError]] is undefined.
+ assert!(self.stored_error.get().is_undefined());
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
+ pub(crate) fn finish_in_flight_close_with_error(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
+ // Assert: stream.[[inFlightCloseRequest]] is not undefined.
+ unreachable!("Inflight close request must be defined.");
+ };
+
+ // Reject stream.[[inFlightCloseRequest]] with error.
+ in_flight_close_request.reject_native(&error);
+
+ // Set stream.[[inFlightCloseRequest]] to undefined.
+ // Done above with `take`.
+
+ // Assert: stream.[[state]] is "writable" or "erroring".
+ assert!(self.is_erroring() || self.is_writable());
+
+ // If stream.[[pendingAbortRequest]] is not undefined,
+ rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
+ if let Some(pending_abort_request) = &*pending_abort_request {
+ // Reject stream.[[pendingAbortRequest]]'s promise with error.
+ pending_abort_request.promise.reject_native(&error);
+
+ // Set stream.[[pendingAbortRequest]] to undefined.
+ // Done above with `take`.
+ }
+
+ // Perform ! WritableStreamDealWithRejection(stream, error).
+ self.deal_with_rejection(cx, global, error, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
+ pub(crate) fn finish_in_flight_write_with_error(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
+ // Assert: stream.[[inFlightWriteRequest]] is not undefined.
+ unreachable!("Inflight write request must be defined.");
+ };
+
+ // Reject stream.[[inFlightWriteRequest]] with error.
+ in_flight_write_request.reject_native(&error);
+
+ // Set stream.[[inFlightWriteRequest]] to undefined.
+ // Done above with `take`.
+
+ // Assert: stream.[[state]] is "writable" or "erroring".
+ assert!(self.is_erroring() || self.is_writable());
+
+ // Perform ! WritableStreamDealWithRejection(stream, error).
+ self.deal_with_rejection(cx, global, error, can_gc);
+ }
+
+ pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
+ self.writer.get()
+ }
+
+ pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
+ self.writer.set(writer);
+ }
+
+ pub(crate) fn set_backpressure(&self, backpressure: bool) {
+ self.backpressure.set(backpressure);
+ }
+
+ pub(crate) fn get_backpressure(&self) -> bool {
+ self.backpressure.get()
+ }
+
+ /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
+ pub(crate) fn is_locked(&self) -> bool {
+ // If stream.[[writer]] is undefined, return false.
+ // Return true.
+ self.get_writer().is_some()
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
+ pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
+ // Assert: ! IsWritableStreamLocked(stream) is true.
+ assert!(self.is_locked());
+
+ // Assert: stream.[[state]] is "writable".
+ assert!(self.is_writable());
+
+ // Let promise be a new promise.
+ let promise = Promise::new(global, can_gc);
+
+ // Append promise to stream.[[writeRequests]].
+ self.write_requests.borrow_mut().push_back(promise.clone());
+
+ // Return promise.
+ promise
+ }
+
+ // Returns the rooted controller of the stream, if any.
+ pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
+ self.controller.get()
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-abort>
+ pub(crate) fn abort(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ provided_reason: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ // If stream.[[state]] is "closed" or "errored",
+ if self.is_closed() || self.is_errored() {
+ // return a promise resolved with undefined.
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ return promise;
+ }
+
+ // TODO: Signal abort on stream.[[controller]].[[abortController]] with reason.
+
+ // TODO: If state is "closed" or "errored", return a promise resolved with undefined.
+ // Note: state may have changed because of signal above.
+
+ // If stream.[[pendingAbortRequest]] is not undefined,
+ if self.pending_abort_request.borrow().is_some() {
+ // return stream.[[pendingAbortRequest]]'s promise.
+ return self
+ .pending_abort_request
+ .borrow()
+ .as_ref()
+ .expect("Pending abort request must be Some.")
+ .promise
+ .clone();
+ }
+
+ // Assert: state is "writable" or "erroring".
+ assert!(self.is_writable() || self.is_erroring());
+
+ // Let wasAlreadyErroring be false.
+ let mut was_already_erroring = false;
+ rooted!(in(*cx) let undefined_reason = UndefinedValue());
+
+ // If state is "erroring",
+ let reason = if self.is_erroring() {
+ // Set wasAlreadyErroring to true.
+ was_already_erroring = true;
+
+ // Set reason to undefined.
+ undefined_reason.handle()
+ } else {
+ // Use the provided reason.
+ provided_reason
+ };
+
+ // Let promise be a new promise.
+ let promise = Promise::new(global, can_gc);
+
+ // Set stream.[[pendingAbortRequest]] to a new pending abort request
+ // whose promise is promise,
+ // reason is reason,
+ // and was already erroring is wasAlreadyErroring.
+ *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
+ promise: promise.clone(),
+ reason: Heap::boxed(reason.get()),
+ was_already_erroring,
+ });
+
+ // If wasAlreadyErroring is false,
+ if !was_already_erroring {
+ // perform ! WritableStreamStartErroring(stream, reason)
+ self.start_erroring(cx, global, reason, can_gc);
+ }
+
+ // Return promise.
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-close>
+ pub(crate) fn close(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ // Let state be stream.[[state]].
+ // If state is "closed" or "errored",
+ if self.is_closed() || self.is_errored() {
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(Error::Type("Stream is closed or errored.".to_string()));
+ return promise;
+ }
+
+ // Assert: state is "writable" or "erroring".
+ assert!(self.is_writable() || self.is_erroring());
+
+ // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
+ assert!(!self.close_queued_or_in_flight());
+
+ // Let promise be a new promise.
+ let promise = Promise::new(global, can_gc);
+
+ // Set stream.[[closeRequest]] to promise.
+ *self.close_request.borrow_mut() = Some(promise.clone());
+
+ // Let writer be stream.[[writer]].
+ // If writer is not undefined,
+ if let Some(writer) = self.writer.get() {
+ // and stream.[[backpressure]] is true,
+ // and state is "writable",
+ if self.get_backpressure() && self.is_writable() {
+ // resolve writer.[[readyPromise]] with undefined.
+ writer.resolve_ready_promise_with_undefined();
+ }
+ }
+
+ // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
+ let Some(controller) = self.controller.get() else {
+ unreachable!("Stream must have a controller.");
+ };
+ controller.close(cx, global, can_gc);
+
+ // Return promise.
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
+ /// Note: implement as a stream method, as opposed to a writer one, for convenience.
+ pub(crate) fn get_desired_size(&self) -> Option<f64> {
+ // Let stream be writer.[[stream]].
+ // Stream is `self`.
+
+ // Let state be stream.[[state]].
+ // If state is "errored" or "erroring", return null.
+ if self.is_errored() || self.is_erroring() {
+ return None;
+ }
+
+ // If state is "closed", return 0.
+ if self.is_closed() {
+ return Some(0.);
+ }
+
+ let Some(controller) = self.controller.get() else {
+ unreachable!("Stream must have a controller.");
+ };
+ Some(controller.get_desired_size())
+ }
+
+ /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
+ pub(crate) fn aquire_default_writer(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
+ // Let writer be a new WritableStreamDefaultWriter object.
+ let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
+
+ // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
+ writer.setup(cx, self)?;
+
+ // Return writer.
+ Ok(writer)
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
+ pub(crate) fn update_backpressure(
+ &self,
+ backpressure: bool,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) {
+ // Assert: stream.[[state]] is "writable".
+ self.is_writable();
+
+ // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
+ assert!(!self.close_queued_or_in_flight());
+
+ // Let writer be stream.[[writer]].
+ let writer = self.get_writer();
+ if writer.is_some() && backpressure != self.get_backpressure() {
+ // If writer is not undefined
+ let writer = writer.expect("Writer is some, as per the above check.");
+ // and backpressure is not stream.[[backpressure]],
+ if backpressure {
+ // If backpressure is true, set writer.[[readyPromise]] to a new promise.
+ let promise = Promise::new(global, can_gc);
+ writer.set_ready_promise(promise);
+ } else {
+ // Otherwise,
+ // Assert: backpressure is false.
+ assert!(!backpressure);
+ // Resolve writer.[[readyPromise]] with undefined.
+ writer.resolve_ready_promise_with_undefined();
+ }
+ };
+
+ // Set stream.[[backpressure]] to backpressure.
+ self.set_backpressure(backpressure);
+ }
+}
+
+impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
+ /// <https://streams.spec.whatwg.org/#ws-constructor>
+ fn Constructor(
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ underlying_sink: Option<*mut JSObject>,
+ strategy: &QueuingStrategy,
+ ) -> Fallible<DomRoot<WritableStream>> {
+ // If underlyingSink is missing, set it to null.
+ rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
+
+ // Let underlyingSinkDict be underlyingSink,
+ // converted to an IDL value of type UnderlyingSink.
+ let underlying_sink_dict = if !underlying_sink_obj.is_null() {
+ rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
+ match UnderlyingSink::new(cx, obj_val.handle()) {
+ Ok(ConversionResult::Success(val)) => val,
+ Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
+ _ => {
+ return Err(Error::JSFailed);
+ },
+ }
+ } else {
+ UnderlyingSink::empty()
+ };
+
+ if !underlying_sink_dict.type_.handle().is_undefined() {
+ // If underlyingSinkDict["type"] exists, throw a RangeError exception.
+ return Err(Error::Range("type is set".to_string()));
+ }
+
+ // Perform ! InitializeWritableStream(this).
+ let stream = WritableStream::new_with_proto(global, proto, can_gc);
+
+ // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
+ let size_algorithm = extract_size_algorithm(strategy);
+
+ // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
+ let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
+
+ // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink
+ let controller = WritableStreamDefaultController::new(
+ global,
+ &underlying_sink_dict,
+ high_water_mark,
+ size_algorithm,
+ can_gc,
+ );
+
+ // Note: this must be done before `setup`,
+ // otherwise `thisOb` is null in the start callback.
+ controller.set_underlying_sink_this_object(underlying_sink_obj.handle());
+
+ // Perform ? SetUpWritableStreamDefaultController
+ controller.setup(cx, global, &stream, &underlying_sink_dict.start, can_gc)?;
+
+ Ok(stream)
+ }
+
+ /// <https://streams.spec.whatwg.org/#ws-locked>
+ fn Locked(&self) -> bool {
+ // Return ! IsWritableStreamLocked(this).
+ self.is_locked()
+ }
+
+ /// <https://streams.spec.whatwg.org/#ws-abort>
+ fn Abort(
+ &self,
+ cx: SafeJSContext,
+ reason: SafeHandleValue,
+ realm: InRealm,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // If ! IsWritableStreamLocked(this) is true,
+ if self.is_locked() {
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_error(Error::Type("Stream is locked.".to_string()));
+ return promise;
+ }
+
+ // Return ! WritableStreamAbort(this, reason).
+ self.abort(cx, &global, reason, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#ws-close>
+ fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
+ let cx = GlobalScope::get_cx();
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // If ! IsWritableStreamLocked(this) is true,
+ if self.is_locked() {
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_error(Error::Type("Stream is locked.".to_string()));
+ return promise;
+ }
+
+ // If ! WritableStreamCloseQueuedOrInFlight(this) is true
+ if self.close_queued_or_in_flight() {
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_error(Error::Type(
+ "Stream has closed queued or in-flight".to_string(),
+ ));
+ return promise;
+ }
+
+ // Return ! WritableStreamClose(this).
+ self.close(cx, &global, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#ws-get-writer>
+ fn GetWriter(
+ &self,
+ realm: InRealm,
+ can_gc: CanGc,
+ ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
+ let cx = GlobalScope::get_cx();
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Return ? AcquireWritableStreamDefaultWriter(this).
+ self.aquire_default_writer(cx, &global, can_gc)
+ }
+}
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs
new file mode 100644
index 00000000000..cd027907211
--- /dev/null
+++ b/components/script/dom/writablestreamdefaultcontroller.rs
@@ -0,0 +1,829 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cell::{Cell, RefCell};
+use std::ptr;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{Heap, IsPromiseObject, JSObject};
+use js::jsval::{JSVal, UndefinedValue};
+use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
+
+use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
+use crate::dom::bindings::callback::ExceptionHandling;
+use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
+ UnderlyingSink, UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback,
+ UnderlyingSinkStartCallback, UnderlyingSinkWriteCallback,
+};
+use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
+use crate::dom::bindings::error::Error;
+use crate::dom::bindings::reflector::{reflect_dom_object, Reflector};
+use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
+use crate::dom::writablestream::WritableStream;
+use crate::realms::{enter_realm, InRealm};
+use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+
+impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct CloseAlgorithmFulfillmentHandler {
+ stream: Dom<WritableStream>,
+}
+
+impl Callback for CloseAlgorithmFulfillmentHandler {
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let stream = self.stream.as_rooted();
+
+ // Perform ! WritableStreamFinishInFlightClose(stream).
+ stream.finish_in_flight_close(cx);
+ }
+}
+
+impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct CloseAlgorithmRejectionHandler {
+ stream: Dom<WritableStream>,
+}
+
+impl Callback for CloseAlgorithmRejectionHandler {
+ fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ let stream = self.stream.as_rooted();
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
+ stream.finish_in_flight_close_with_error(cx, &global, v, can_gc);
+ }
+}
+
+impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct StartAlgorithmFulfillmentHandler {
+ controller: Dom<WritableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmFulfillmentHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+ /// Upon fulfillment of startPromise,
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.as_rooted();
+ let stream = controller
+ .stream
+ .get()
+ .expect("Controller should have a stream.");
+
+ // Assert: stream.[[state]] is "writable" or "erroring".
+ assert!(stream.is_erroring() || stream.is_writable());
+
+ // Set controller.[[started]] to true.
+ controller.started.set(true);
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
+ controller.advance_queue_if_needed(cx, &global, can_gc)
+ }
+}
+
+impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct StartAlgorithmRejectionHandler {
+ controller: Dom<WritableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
+ /// Upon rejection of startPromise with reason r,
+ fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.as_rooted();
+ let stream = controller
+ .stream
+ .get()
+ .expect("Controller should have a stream.");
+
+ // Assert: stream.[[state]] is "writable" or "erroring".
+ assert!(stream.is_erroring() || stream.is_writable());
+
+ // Set controller.[[started]] to true.
+ controller.started.set(true);
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Perform ! WritableStreamDealWithRejection(stream, r).
+ stream.deal_with_rejection(cx, &global, v, can_gc);
+ }
+}
+
+impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct WriteAlgorithmFulfillmentHandler {
+ controller: Dom<WritableStreamDefaultController>,
+}
+
+impl Callback for WriteAlgorithmFulfillmentHandler {
+ fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.as_rooted();
+ let stream = controller
+ .stream
+ .get()
+ .expect("Controller should have a stream.");
+
+ // Perform ! WritableStreamFinishInFlightWrite(stream).
+ stream.finish_in_flight_write();
+
+ // Let state be stream.[[state]].
+ // Assert: state is "writable" or "erroring".
+ assert!(stream.is_erroring() || stream.is_writable());
+
+ // Perform ! DequeueValue(controller).
+ {
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ let mut queue = controller.queue.borrow_mut();
+ queue.dequeue_value(cx, Some(rval.handle_mut()));
+ }
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
+ if !stream.close_queued_or_in_flight() && stream.is_writable() {
+ // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
+ let backpressure = controller.get_backpressure();
+
+ // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
+ stream.update_backpressure(backpressure, &global, can_gc);
+ }
+
+ // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
+ controller.advance_queue_if_needed(cx, &global, can_gc)
+ }
+}
+
+impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
+struct WriteAlgorithmRejectionHandler {
+ controller: Dom<WritableStreamDefaultController>,
+}
+
+impl Callback for WriteAlgorithmRejectionHandler {
+ fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.as_rooted();
+ let stream = controller
+ .stream
+ .get()
+ .expect("Controller should have a stream.");
+
+ // If stream.[[state]] is "writable",
+ if stream.is_writable() {
+ // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
+ controller.clear_algorithms();
+ }
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
+ stream.finish_in_flight_write_with_error(cx, &global, v, can_gc);
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
+#[dom_struct]
+pub struct WritableStreamDefaultController {
+ reflector_: Reflector,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
+
+ /// The JS object used as `this` when invoking sink algorithms.
+ #[ignore_malloc_size_of = "mozjs"]
+ underlying_sink_obj: Heap<*mut JSObject>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
+ queue: RefCell<QueueWithSizes>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
+ started: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
+ strategy_hwm: f64,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
+ stream: MutNullableDom<WritableStream>,
+}
+
+impl WritableStreamDefaultController {
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
+ #[cfg_attr(crown, allow(crown::unrooted_must_root))]
+ fn new_inherited(
+ underlying_sink: &UnderlyingSink,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ ) -> WritableStreamDefaultController {
+ WritableStreamDefaultController {
+ reflector_: Reflector::new(),
+ queue: Default::default(),
+ stream: Default::default(),
+ abort: RefCell::new(underlying_sink.abort.clone()),
+ close: RefCell::new(underlying_sink.close.clone()),
+ write: RefCell::new(underlying_sink.write.clone()),
+ underlying_sink_obj: Default::default(),
+ strategy_hwm,
+ strategy_size: RefCell::new(Some(strategy_size)),
+ started: Default::default(),
+ }
+ }
+
+ pub(crate) fn new(
+ global: &GlobalScope,
+ underlying_sink: &UnderlyingSink,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ can_gc: CanGc,
+ ) -> DomRoot<WritableStreamDefaultController> {
+ reflect_dom_object(
+ Box::new(WritableStreamDefaultController::new_inherited(
+ underlying_sink,
+ strategy_hwm,
+ strategy_size,
+ )),
+ global,
+ can_gc,
+ )
+ }
+
+ pub(crate) fn started(&self) -> bool {
+ self.started.get()
+ }
+
+ /// Setting the JS object after the heap has settled down.
+ pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
+ self.underlying_sink_obj.set(*this_object);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
+ fn clear_algorithms(&self) {
+ // Set controller.[[writeAlgorithm]] to undefined.
+ self.write.borrow_mut().take();
+
+ // Set controller.[[closeAlgorithm]] to undefined.
+ self.close.borrow_mut().take();
+
+ // Set controller.[[abortAlgorithm]] to undefined.
+ self.abort.borrow_mut().take();
+
+ // Set controller.[[strategySizeAlgorithm]] to undefined.
+ self.strategy_size.borrow_mut().take();
+ }
+
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controllerr>
+ #[allow(unsafe_code)]
+ pub(crate) fn setup(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ stream: &WritableStream,
+ start: &Option<Rc<UnderlyingSinkStartCallback>>,
+ can_gc: CanGc,
+ ) -> Result<(), Error> {
+ // Assert: stream implements WritableStream.
+ // Implied by stream type.
+
+ // Assert: stream.[[controller]] is undefined.
+ stream.assert_no_controller();
+
+ // Set controller.[[stream]] to stream.
+ self.stream.set(Some(stream));
+
+ // Set stream.[[controller]] to controller.
+ stream.set_default_controller(self);
+
+ // Perform ! ResetQueue(controller).
+
+ // Set controller.[[abortController]] to a new AbortController.
+
+ // Set controller.[[started]] to false.
+
+ // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
+
+ // Set controller.[[strategyHWM]] to highWaterMark.
+
+ // Set controller.[[writeAlgorithm]] to writeAlgorithm.
+
+ // Set controller.[[closeAlgorithm]] to closeAlgorithm.
+
+ // Set controller.[[abortAlgorithm]] to abortAlgorithm.
+
+ // Note: above steps are done in `new_inherited`.
+
+ // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
+ let backpressure = self.get_backpressure();
+
+ // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
+ stream.update_backpressure(backpressure, global, can_gc);
+
+ // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
+ // Let startPromise be a promise resolved with startResult.
+ let start_promise = if let Some(start) = start {
+ rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
+ rooted!(in(*cx) let mut result: JSVal);
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ start.Call_(
+ &this_object.handle(),
+ self,
+ result.handle_mut(),
+ ExceptionHandling::Rethrow,
+ )?;
+ let is_promise = unsafe {
+ if result.is_object() {
+ result_object.set(result.to_object());
+ IsPromiseObject(result_object.handle().into_handle())
+ } else {
+ false
+ }
+ };
+ if is_promise {
+ let promise = Promise::new_with_js_promise(result_object.handle(), cx);
+ promise
+ } else {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&result.get());
+ promise
+ }
+ } else {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ promise
+ };
+
+ let rooted_default_controller = DomRoot::from_ref(self);
+
+ // Upon fulfillment of startPromise,
+ rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
+ controller: Dom::from_ref(&rooted_default_controller),
+ }));
+
+ // Upon rejection of startPromise with reason r,
+ rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
+ controller: Dom::from_ref(&rooted_default_controller),
+ }));
+
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ rejection_handler.take().map(|h| Box::new(h) as Box<_>),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ start_promise.append_native_handler(&handler, comp, can_gc);
+
+ Ok(())
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
+ pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
+ // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
+ {
+ let mut queue = self.queue.borrow_mut();
+ queue
+ .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
+ .expect("Enqueuing the close sentinel should not fail.");
+ }
+ // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
+ self.advance_queue_if_needed(cx, global, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
+ pub(crate) fn abort_steps(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ reason: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.abort.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ Some(reason),
+ ExceptionHandling::Rethrow,
+ )
+ } else {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e);
+ promise
+ })
+ }
+
+ pub(crate) fn call_write_algorithm(
+ &self,
+ cx: SafeJSContext,
+ chunk: SafeHandleValue,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.write.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ chunk,
+ self,
+ ExceptionHandling::Rethrow,
+ )
+ } else {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e);
+ promise
+ })
+ }
+
+ fn call_close_algorithm(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
+ this_object.set(self.underlying_sink_obj.get());
+ let algo = self.close.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow)
+ } else {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e);
+ promise
+ })
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
+ pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
+ // Let stream be controller.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
+ stream.mark_close_request_in_flight();
+
+ // Perform ! DequeueValue(controller).
+ {
+ let mut queue = self.queue.borrow_mut();
+ queue.dequeue_value(cx, None);
+ }
+
+ // Assert: controller.[[queue]] is empty.
+ assert!(self.queue.borrow().is_empty());
+
+ // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
+ let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
+
+ // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Upon fulfillment of sinkClosePromise,
+ rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
+ stream: Dom::from_ref(&stream),
+ }));
+
+ // Upon rejection of sinkClosePromise with reason reason,
+ rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
+ stream: Dom::from_ref(&stream),
+ }));
+
+ // Attach handlers to the promise.
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ rejection_handler.take().map(|h| Box::new(h) as Box<_>),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ sink_close_promise.append_native_handler(&handler, comp, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
+ fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
+ // Let stream be controller.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // If controller.[[started]] is false, return.
+ if !self.started.get() {
+ return;
+ }
+
+ // If stream.[[inFlightWriteRequest]] is not undefined, return.
+ if stream.has_in_flight_write_request() {
+ return;
+ }
+
+ // Let state be stream.[[state]].
+
+ // Assert: state is not "closed" or "errored".
+ assert!(!(stream.is_errored() || stream.is_closed()));
+
+ // If state is "erroring",
+ if stream.is_erroring() {
+ // Perform ! WritableStreamFinishErroring(stream).
+ stream.finish_erroring(cx, global, can_gc);
+
+ // Return.
+ return;
+ }
+
+ // Let value be ! PeekQueueValue(controller).
+ rooted!(in(*cx) let mut value = UndefinedValue());
+ let is_closed = {
+ let queue = self.queue.borrow_mut();
+
+ // If controller.[[queue]] is empty, return.
+ if queue.is_empty() {
+ return;
+ }
+ queue.peek_queue_value(cx, value.handle_mut())
+ };
+
+ if is_closed {
+ // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
+ self.process_close(cx, global, can_gc);
+ } else {
+ // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
+ self.process_write(cx, value.handle(), global, can_gc);
+ };
+ }
+
+ /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
+ pub(crate) fn perform_error_steps(&self) {
+ // Perform ! ResetQueue(this).
+ self.queue.borrow_mut().reset();
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
+ fn process_write(
+ &self,
+ cx: SafeJSContext,
+ chunk: SafeHandleValue,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) {
+ // Let stream be controller.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
+ stream.mark_first_write_request_in_flight();
+
+ // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
+ let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
+
+ // Upon fulfillment of sinkWritePromise,
+ rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
+ controller: Dom::from_ref(self),
+ }));
+
+ // Upon rejection of sinkWritePromise with reason,
+ rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
+ controller: Dom::from_ref(self),
+ }));
+
+ // Attach handlers to the promise.
+ let handler = PromiseNativeHandler::new(
+ global,
+ fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
+ rejection_handler.take().map(|h| Box::new(h) as Box<_>),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ sink_write_promise.append_native_handler(&handler, comp, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
+ pub(crate) fn get_desired_size(&self) -> f64 {
+ // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
+ let queue = self.queue.borrow();
+ let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
+ desired_size.clamp(desired_size, self.strategy_hwm)
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
+ fn get_backpressure(&self) -> bool {
+ // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
+ let desired_size = self.get_desired_size();
+
+ // Return true if desiredSize ≤ 0, or false otherwise.
+ desired_size == 0.0 || desired_size.is_sign_negative()
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
+ pub(crate) fn get_chunk_size(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ chunk: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> f64 {
+ // If controller.[[strategySizeAlgorithm]] is undefined,
+ let Some(strategy_size) = self.strategy_size.borrow().clone() else {
+ // Assert: controller.[[stream]].[[state]] is "erroring" or "errored".
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+ assert!(stream.is_erroring() || stream.is_errored());
+
+ // Return 1.
+ return 1.0;
+ };
+
+ // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
+ // passing in chunk, and interpreting the result as a completion record.
+ let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow);
+
+ match result {
+ // Let chunkSize be result.[[Value]].
+ Ok(size) => size,
+ Err(error) => {
+ // If result is an abrupt completion,
+
+ // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
+ // Create a rooted value for the error.
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ error.to_jsval(cx, global, rooted_error.handle_mut());
+ self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
+
+ // Return 1.
+ 1.0
+ },
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
+ pub(crate) fn write(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ chunk: SafeHandleValue,
+ chunk_size: f64,
+ can_gc: CanGc,
+ ) {
+ // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
+ let enqueue_result = {
+ let mut queue = self.queue.borrow_mut();
+ queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
+ value: Heap::boxed(chunk.get()),
+ size: chunk_size,
+ }))
+ };
+
+ // If enqueueResult is an abrupt completion,
+ if let Err(error) = enqueue_result {
+ // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
+ // Create a rooted value for the error.
+ rooted!(in(*cx) let mut rooted_error = UndefinedValue());
+ error.to_jsval(cx, global, rooted_error.handle_mut());
+ self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
+
+ // Return.
+ return;
+ }
+
+ // Let stream be controller.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
+ if !stream.close_queued_or_in_flight() && stream.is_writable() {
+ // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
+ let backpressure = self.get_backpressure();
+
+ // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
+ stream.update_backpressure(backpressure, global, can_gc);
+ }
+
+ // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
+ self.advance_queue_if_needed(cx, global, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
+ pub(crate) fn error_if_needed(
+ &self,
+ cx: SafeJSContext,
+ error: SafeHandleValue,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) {
+ // Let stream be controller.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // If stream.[[state]] is "writable",
+ if stream.is_writable() {
+ // Perform ! WritableStreamDefaultControllerError(controller, e).
+ self.error(&stream, cx, error, global, can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
+ fn error(
+ &self,
+ stream: &WritableStream,
+ cx: SafeJSContext,
+ e: SafeHandleValue,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) {
+ // Let stream be controller.[[stream]].
+ // Done above with the argument.
+
+ // Assert: stream.[[state]] is "writable".
+ assert!(stream.is_writable());
+
+ // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Perform ! WritableStreamStartErroring(stream, error).
+ stream.start_erroring(cx, global, e, can_gc);
+ }
+}
+
+impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
+ for WritableStreamDefaultController
+{
+ /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
+ fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
+ // Let state be this.[[stream]].[[state]].
+ let Some(stream) = self.stream.get() else {
+ unreachable!("Controller should have a stream");
+ };
+
+ // If state is not "writable", return.
+ if !stream.is_writable() {
+ return;
+ }
+
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // Perform ! WritableStreamDefaultControllerError(this, e).
+ self.error(&stream, cx, e, &global, can_gc);
+ }
+}
diff --git a/components/script/dom/writablestreamdefaultwriter.rs b/components/script/dom/writablestreamdefaultwriter.rs
new file mode 100644
index 00000000000..2a01f5347c6
--- /dev/null
+++ b/components/script/dom/writablestreamdefaultwriter.rs
@@ -0,0 +1,500 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsval::UndefinedValue;
+use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
+
+use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
+use crate::dom::bindings::error::Error;
+use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomGlobal, Reflector};
+use crate::dom::bindings::root::{DomRoot, MutNullableDom};
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::writablestream::WritableStream;
+use crate::realms::InRealm;
+use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+
+/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
+#[dom_struct]
+pub struct WritableStreamDefaultWriter {
+ reflector_: Reflector,
+
+ #[ignore_malloc_size_of = "Rc is hard"]
+ ready_promise: RefCell<Rc<Promise>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ closed_promise: RefCell<Rc<Promise>>,
+
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
+ stream: MutNullableDom<WritableStream>,
+}
+
+impl WritableStreamDefaultWriter {
+ #[cfg_attr(crown, allow(crown::unrooted_must_root))]
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
+ /// The parts that create a new promise.
+ fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
+ WritableStreamDefaultWriter {
+ reflector_: Reflector::new(),
+ stream: Default::default(),
+ closed_promise: RefCell::new(Promise::new(global, can_gc)),
+ ready_promise: RefCell::new(Promise::new(global, can_gc)),
+ }
+ }
+
+ pub(crate) fn new(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ ) -> DomRoot<WritableStreamDefaultWriter> {
+ reflect_dom_object_with_proto(
+ Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
+ global,
+ proto,
+ can_gc,
+ )
+ }
+
+ /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
+ /// Continuing from `new_inherited`, the rest.
+ pub(crate) fn setup(&self, cx: SafeJSContext, stream: &WritableStream) -> Result<(), Error> {
+ // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
+ if stream.is_locked() {
+ return Err(Error::Type("Stream is locked".to_string()));
+ }
+
+ // Set writer.[[stream]] to stream.
+ self.stream.set(Some(stream));
+
+ // Set stream.[[writer]] to writer.
+ stream.set_writer(Some(self));
+
+ // Let state be stream.[[state]].
+
+ // If state is "writable",
+ if stream.is_writable() {
+ // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
+ // and stream.[[backpressure]] is true,
+ if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
+ // set writer.[[readyPromise]] to a new promise.
+ // Done in `new_inherited`.
+ } else {
+ // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
+ // Note: new promise created in `new_inherited`.
+ self.ready_promise.borrow().resolve_native(&());
+ }
+
+ // Set writer.[[closedPromise]] to a new promise.
+ // Done in `new_inherited`.
+ return Ok(());
+ }
+
+ // Otherwise, if state is "erroring",
+ if stream.is_erroring() {
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+
+ // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
+ // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
+ // Note: new promise created in `new_inherited`.
+ let ready_promise = self.ready_promise.borrow();
+ ready_promise.reject_native(&error.handle());
+ ready_promise.set_promise_is_handled();
+
+ // Set writer.[[closedPromise]] to a new promise.
+ // Done in `new_inherited`.
+ return Ok(());
+ }
+
+ // Otherwise, if state is "closed",
+ if stream.is_closed() {
+ // Set writer.[[readyPromise]] to a promise resolved with undefined.
+ // Note: new promise created in `new_inherited`.
+ self.ready_promise.borrow().resolve_native(&());
+
+ // Set writer.[[closedPromise]] to a promise resolved with undefined.
+ // Note: new promise created in `new_inherited`.
+ self.closed_promise.borrow().resolve_native(&());
+ return Ok(());
+ }
+
+ // Otherwise,
+ // Assert: state is "errored".
+ assert!(stream.is_errored());
+
+ // Let storedError be stream.[[storedError]].
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+
+ // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
+ // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
+ // Note: new promise created in `new_inherited`.
+ let ready_promise = self.ready_promise.borrow();
+ ready_promise.reject_native(&error.handle());
+ ready_promise.set_promise_is_handled();
+
+ // Set writer.[[closedPromise]] to a promise rejected with storedError.
+ // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
+ // Note: new promise created in `new_inherited`.
+ let ready_promise = self.closed_promise.borrow();
+ ready_promise.reject_native(&error.handle());
+ ready_promise.set_promise_is_handled();
+
+ Ok(())
+ }
+
+ pub(crate) fn reject_closed_promise_with_stored_error(&self, error: &SafeHandleValue) {
+ self.closed_promise.borrow().reject_native(error);
+ }
+
+ pub(crate) fn set_close_promise_is_handled(&self) {
+ self.closed_promise.borrow().set_promise_is_handled();
+ }
+
+ pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
+ *self.ready_promise.borrow_mut() = promise;
+ }
+
+ pub(crate) fn resolve_ready_promise_with_undefined(&self) {
+ self.ready_promise.borrow().resolve_native(&());
+ }
+
+ pub(crate) fn resolve_closed_promise_with_undefined(&self) {
+ self.closed_promise.borrow().resolve_native(&());
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
+ pub(crate) fn ensure_ready_promise_rejected(
+ &self,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ let ready_promise = self.ready_promise.borrow().clone();
+
+ // If writer.[[readyPromise]].[[PromiseState]] is "pending",
+ if ready_promise.is_pending() {
+ // reject writer.[[readyPromise]] with error.
+ ready_promise.reject_native(&error);
+
+ // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
+ ready_promise.set_promise_is_handled();
+ } else {
+ // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
+ let promise = Promise::new(global, can_gc);
+ promise.reject_native(&error);
+
+ // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
+ promise.set_promise_is_handled();
+ *self.ready_promise.borrow_mut() = promise;
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
+ pub(crate) fn ensure_closed_promise_rejected(
+ &self,
+ global: &GlobalScope,
+ error: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ let closed_promise = self.closed_promise.borrow().clone();
+
+ // If writer.[[closedPromise]].[[PromiseState]] is "pending",
+ if closed_promise.is_pending() {
+ // reject writer.[[closedPromise]] with error.
+ closed_promise.reject_native(&error);
+
+ // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
+ closed_promise.set_promise_is_handled();
+ } else {
+ // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
+ let promise = Promise::new(global, can_gc);
+ promise.reject_native(&error);
+
+ // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
+ promise.set_promise_is_handled();
+ *self.closed_promise.borrow_mut() = promise;
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
+ fn abort(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ reason: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ // Let stream be writer.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // Assert: stream is not undefined.
+ unreachable!("Stream should be set.");
+ };
+
+ // Return ! WritableStreamAbort(stream, reason).
+ stream.abort(cx, global, reason, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
+ fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
+ // Let stream be writer.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // Assert: stream is not undefined.
+ unreachable!("Stream should be set.");
+ };
+
+ // Return ! WritableStreamClose(stream).
+ stream.close(cx, global, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
+ fn write(
+ &self,
+ cx: SafeJSContext,
+ global: &GlobalScope,
+ chunk: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ // Let stream be writer.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // Assert: stream is not undefined.
+ unreachable!("Stream should be set.");
+ };
+
+ // Let controller be stream.[[controller]].
+ // Note: asserting controller is some.
+ let Some(controller) = stream.get_controller() else {
+ unreachable!("Controller should be set.");
+ };
+
+ // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
+ let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
+
+ // If stream is not equal to writer.[[stream]],
+ // return a promise rejected with a TypeError exception.
+ if !self
+ .stream
+ .get()
+ .map_or(false, |current_stream| current_stream == stream)
+ {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(Error::Type(
+ "Stream is not equal to writer stream".to_string(),
+ ));
+ return promise;
+ }
+
+ // Let state be stream.[[state]].
+ // If state is "errored",
+ if stream.is_errored() {
+ // return a promise rejected with stream.[[storedError]].
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+ let promise = Promise::new(global, can_gc);
+ promise.reject_native(&error.handle());
+ return promise;
+ }
+
+ // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
+ // or state is "closed",
+ if stream.close_queued_or_in_flight() || stream.is_closed() {
+ // return a promise rejected with a TypeError exception
+ // indicating that the stream is closing or closed
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(Error::Type(
+ "Stream has been closed, or has close queued or in-flight".to_string(),
+ ));
+ return promise;
+ }
+
+ // If state is "erroring",
+ if stream.is_erroring() {
+ // return a promise rejected with stream.[[storedError]].
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+ let promise = Promise::new(global, can_gc);
+ promise.reject_native(&error.handle());
+ return promise;
+ }
+
+ // Assert: state is "writable".
+ assert!(stream.is_writable());
+
+ // Let promise be ! WritableStreamAddWriteRequest(stream).
+ let promise = stream.add_write_request(global, can_gc);
+
+ // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
+ controller.write(cx, global, chunk, chunk_size, can_gc);
+
+ // Return promise.
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
+ pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
+ // Let stream be this.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // Assert: stream is not undefined.
+ unreachable!("Stream should be set.");
+ };
+
+ // Assert: stream.[[writer]] is writer.
+ assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
+
+ // Let releasedError be a new TypeError.
+ let released_error = Error::Type("Writer has been released".to_string());
+
+ // Root the js val of the error.
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ released_error.to_jsval(cx, global, error.handle_mut());
+
+ // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
+ self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
+
+ // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
+ self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
+
+ // Set stream.[[writer]] to undefined.
+ stream.set_writer(None);
+
+ // Set this.[[stream]] to undefined.
+ self.stream.set(None);
+ }
+}
+
+impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
+ /// <https://streams.spec.whatwg.org/#default-writer-closed>
+ fn Closed(&self) -> Rc<Promise> {
+ // Return this.[[closedPromise]].
+ return self.closed_promise.borrow().clone();
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
+ fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
+ // If this.[[stream]] is undefined, throw a TypeError exception.
+ let Some(stream) = self.stream.get() else {
+ return Err(Error::Type("Stream is undefined".to_string()));
+ };
+
+ // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
+ Ok(stream.get_desired_size())
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-ready>
+ fn Ready(&self) -> Rc<Promise> {
+ // Return this.[[readyPromise]].
+ return self.ready_promise.borrow().clone();
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-abort>
+ fn Abort(
+ &self,
+ cx: SafeJSContext,
+ reason: SafeHandleValue,
+ realm: InRealm,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // If this.[[stream]] is undefined,
+ if self.stream.get().is_none() {
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_error(Error::Type("Stream is undefined".to_string()));
+ return promise;
+ }
+
+ // Return ! WritableStreamDefaultWriterAbort(this, reason).
+ self.abort(cx, &global, reason, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-close>
+ fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
+ let cx = GlobalScope::get_cx();
+ let global = GlobalScope::from_safe_context(cx, in_realm);
+ let promise = Promise::new(&global, can_gc);
+
+ // Let stream be this.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // If stream is undefined,
+ // return a promise rejected with a TypeError exception.
+ promise.reject_error(Error::Type("Stream is undefined".to_string()));
+ return promise;
+ };
+
+ // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
+ if stream.close_queued_or_in_flight() {
+ // return a promise rejected with a TypeError exception.
+ promise.reject_error(Error::Type(
+ "Stream has closed queued or in-flight".to_string(),
+ ));
+ return promise;
+ }
+
+ self.close(cx, &global, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
+ fn ReleaseLock(&self, can_gc: CanGc) {
+ // Let stream be this.[[stream]].
+ let Some(stream) = self.stream.get() else {
+ // If stream is undefined, return.
+ return;
+ };
+
+ // Assert: stream.[[writer]] is not undefined.
+ assert!(stream.get_writer().is_some());
+
+ let global = self.global();
+ let cx = GlobalScope::get_cx();
+
+ // Perform ! WritableStreamDefaultWriterRelease(this).
+ self.release(cx, &global, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-write>
+ fn Write(
+ &self,
+ cx: SafeJSContext,
+ chunk: SafeHandleValue,
+ realm: InRealm,
+ can_gc: CanGc,
+ ) -> Rc<Promise> {
+ let global = GlobalScope::from_safe_context(cx, realm);
+
+ // If this.[[stream]] is undefined,
+ if self.stream.get().is_none() {
+ // return a promise rejected with a TypeError exception.
+ let global = GlobalScope::from_safe_context(cx, realm);
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_error(Error::Type("Stream is undefined".to_string()));
+ return promise;
+ }
+
+ // Return ! WritableStreamDefaultWriterWrite(this, chunk).
+ self.write(cx, &global, chunk, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-writer-constructor>
+ fn Constructor(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ stream: &WritableStream,
+ ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
+ let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
+
+ let cx = GlobalScope::get_cx();
+
+ // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
+ writer.setup(cx, stream)?;
+
+ Ok(writer)
+ }
+}