diff options
author | Gregory Terzian <2792687+gterzian@users.noreply.github.com> | 2025-02-19 20:02:14 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-19 13:02:14 +0000 |
commit | df6d6361688f487142809b5923e030447870e073 (patch) | |
tree | 3e81927a47cfc98e8c895c8153eee47ee5758753 /components/script/dom | |
parent | 720bc725b0c1cd5f01f9ff1d17793a3b1f32a480 (diff) | |
download | servo-df6d6361688f487142809b5923e030447870e073.tar.gz servo-df6d6361688f487142809b5923e030447870e073.zip |
dom: Implement `WritableStream` (#34844)
* add basic interface for writable stream
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add stubs for pipeTo and pipeThrough methods
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add stubs for writable stream defautl writer
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add stubs for writable stream controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add underlying source dict
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add underlying source dict
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement constructor
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement init writable stream
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* impl setup default controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller setup
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller advance queue if neededd
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream finish erroring
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream reject close and closed promise if needed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* finish implementation of stream finish erroring
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* call into controller setup from stream constructor
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream mark first write request in flight
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller process write
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* call into advance queue if needed at various points
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream deal with rejection, use from_safe_context
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller clear algorithms
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unused todo
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream start erroring
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* finish writer ensure ready promise rejected
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream finish in flight write request
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement write constructor and setup
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller error
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary unsafe code
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* finish implementing process write
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement close sentinel
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement public locked
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream abort
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix use of crown
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary options around writer promises
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer get desired size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer ready
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer abort
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer release lock
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer public write
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement private writer write
Uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement writer release.
Uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* impl controller process close
Uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* finish controller process close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* root promise handlers
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* handler errors in stream and writer constructor
finish implementation of stream finish in flight close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix warnings
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement controller get chunk size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* tidy the webidls
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* implement stream get writer
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix assertion of stream state when advancing queue if needed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add docs for value with size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* use reject_error in abort
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary allowances of unsafe code
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* turn writable-streams test suite on
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update encodings test expectations
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* properly check if type is set on sink in stream constructor
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix double borrow in controller advance queue if needed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* make the queue aware of the close sentinel when dequeuing a value
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix assertion of no backpressure in update backpressure
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* also clear strategy size when clearing algorithms
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove this object arg when calling into strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix has operations marked in flight
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix typo in has in flight write request
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* turn error into no-op when aborting a stream, if the stream is closed or errored.
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix error handling of calling into abort algorithm
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix error handling of calling into close and write algorithms
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix double borrow on queue
fix logic in update_backpressure
fix logic in get_desired_size
fix logic in writer setup
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update test expectations for aborting suite
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix controller get_backpressure
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix clippy
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update test expectations to expect errors in tests using unsupported apis
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix error handling of calling into start algo in controller setup
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update test expectations for test checking for undefined this in strategy size call
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update test expectation to timeout for response-stream-with-broken-then.any.worker
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* update interfaces
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix use of global() and error to_jsval
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix use of crown for promise handlers
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove fail expectation from worker interface objects test
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove fail expectation for test expecting this to be undefined in callback
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix documentation link for writablestream state
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* refactor write_requests to use a vec deque
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary doc
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* refactor reject_close_and_closed_promise_if_needed to take a safe js context as argument
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* pass globals and contexts by ref where possible
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix doc link for controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary comment
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* change update_backpressure to be a method of the writablestream
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* rename writer method that resolve closed and ready promise for clarity
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add comments for steps in peek queue value
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix doc link for the abort algorihtm fulfillment handler
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix step doc and variable name in abort algo rejection handler
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* Add must_root to pending abort request
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* limit visibility to crate for has_operations_marked_inflight
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* limit visibility to crate for get_stored_error
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* remove potention re-borrow risk in reject loop on write requests in finish_erroring
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove potential re-borrow risk when taking pending abort request in finish_erroring
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove potential re-borrow risk when taking close request in reject_close_and_closed_promise_if_needed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove re-borrow risks in finish_in_flight_close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove re-borrow risk on in_flight_close_request in finish_in_flight_close_with_error
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary clone of of reason in abort
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix condition on backpressure and a writable state in close
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* limit visibility to crate for update_backpressure
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* remove mutability of reason in abort workflow
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove unnecessary use of ignore_malloc_size_of around Dom in controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix ignore malloc size of comment for strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* reduce visibility of public methods to crate in controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove use of JS_GetPendingException in controller get_chunk_size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* return early on error in write
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* use is_some_and in assertion that stream.witer is writer in release
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* root pending abort request
uses ai
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix mutable re-borrow risk in writer
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Diffstat (limited to 'components/script/dom')
-rw-r--r-- | components/script/dom/mod.rs | 3 | ||||
-rw-r--r-- | components/script/dom/readablestreamdefaultcontroller.rs | 59 | ||||
-rw-r--r-- | components/script/dom/writablestream.rs | 970 | ||||
-rw-r--r-- | components/script/dom/writablestreamdefaultcontroller.rs | 829 | ||||
-rw-r--r-- | components/script/dom/writablestreamdefaultwriter.rs | 500 |
5 files changed, 2350 insertions, 11 deletions
diff --git a/components/script/dom/mod.rs b/components/script/dom/mod.rs index 7127828224e..73320e527b4 100644 --- a/components/script/dom/mod.rs +++ b/components/script/dom/mod.rs @@ -628,6 +628,9 @@ pub(crate) mod workerlocation; pub(crate) mod workernavigator; pub(crate) mod worklet; pub(crate) mod workletglobalscope; +pub(crate) mod writablestream; +pub(crate) mod writablestreamdefaultcontroller; +pub(crate) mod writablestreamdefaultwriter; pub(crate) mod xmldocument; pub(crate) mod xmlhttprequest; pub(crate) mod xmlhttprequesteventtarget; diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs index 6aa17253282..779bf1947ba 100644 --- a/components/script/dom/readablestreamdefaultcontroller.rs +++ b/components/script/dom/readablestreamdefaultcontroller.rs @@ -115,21 +115,25 @@ impl Callback for StartAlgorithmRejectionHandler { } /// <https://streams.spec.whatwg.org/#value-with-size> -#[derive(JSTraceable)] +#[derive(Debug, JSTraceable, PartialEq)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] pub(crate) struct ValueWithSize { - value: Box<Heap<JSVal>>, - size: f64, + /// <https://streams.spec.whatwg.org/#value-with-size-value> + pub(crate) value: Box<Heap<JSVal>>, + /// <https://streams.spec.whatwg.org/#value-with-size-size> + pub(crate) size: f64, } /// <https://streams.spec.whatwg.org/#value-with-size> -#[derive(JSTraceable)] +#[derive(Debug, JSTraceable, PartialEq)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] pub(crate) enum EnqueuedValue { /// A value enqueued from Rust. Native(Box<[u8]>), /// A Js value. Js(ValueWithSize), + /// <https://streams.spec.whatwg.org/#close-sentinel> + CloseSentinel, } impl EnqueuedValue { @@ -137,6 +141,9 @@ impl EnqueuedValue { match self { EnqueuedValue::Native(v) => v.len() as f64, EnqueuedValue::Js(v) => v.size, + // The size of the sentinel is zero, + // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0> + EnqueuedValue::CloseSentinel => 0., } } @@ -152,6 +159,9 @@ impl EnqueuedValue { EnqueuedValue::Js(value_with_size) => unsafe { value_with_size.value.to_jsval(*cx, rval); }, + EnqueuedValue::CloseSentinel => { + unreachable!("The close sentinel is never made available as a js val.") + }, } } } @@ -161,6 +171,7 @@ fn is_non_negative_number(value: &EnqueuedValue) -> bool { let value_with_size = match value { EnqueuedValue::Native(_) => return true, EnqueuedValue::Js(value_with_size) => value_with_size, + EnqueuedValue::CloseSentinel => return true, }; // If v is not a Number, return false. @@ -186,23 +197,29 @@ pub(crate) struct QueueWithSizes { #[ignore_malloc_size_of = "EnqueuedValue::Js"] queue: VecDeque<EnqueuedValue>, /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize> - total_size: f64, + pub(crate) total_size: f64, } impl QueueWithSizes { /// <https://streams.spec.whatwg.org/#dequeue-value> - fn dequeue_value(&mut self, cx: SafeJSContext, rval: MutableHandleValue) { + /// A none `rval` means we're dequeing the close sentinel, + /// which should never be made available to script. + pub(crate) fn dequeue_value(&mut self, cx: SafeJSContext, rval: Option<MutableHandleValue>) { let Some(value) = self.queue.front() else { unreachable!("Buffer cannot be empty when dequeue value is called into."); }; self.total_size -= value.size(); - value.to_jsval(cx, rval); + if let Some(rval) = rval { + value.to_jsval(cx, rval); + } else { + assert_eq!(value, &EnqueuedValue::CloseSentinel); + } self.queue.pop_front(); } /// <https://streams.spec.whatwg.org/#enqueue-value-with-size> #[cfg_attr(crown, allow(crown::unrooted_must_root))] - fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> { + pub(crate) fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> { // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception. if !is_non_negative_number(&value) { return Err(Error::Range( @@ -223,10 +240,30 @@ impl QueueWithSizes { Ok(()) } - fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.queue.is_empty() } + /// <https://streams.spec.whatwg.org/#peek-queue-value> + /// Returns whether value is the close sentinel. + pub(crate) fn peek_queue_value(&self, cx: SafeJSContext, rval: MutableHandleValue) -> bool { + // Assert: container has [[queue]] and [[queueTotalSize]] internal slots. + // Done with the QueueWithSizes type. + + // Assert: container.[[queue]] is not empty. + assert!(!self.is_empty()); + + // Let valueWithSize be container.[[queue]][0]. + let value_with_size = self.queue.front().expect("Queue is not empty."); + if let EnqueuedValue::CloseSentinel = value_with_size { + return true; + } + + // Return valueWithSize’s value. + value_with_size.to_jsval(cx, rval); + false + } + /// Only used with native sources. fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { self.queue @@ -244,7 +281,7 @@ impl QueueWithSizes { } /// <https://streams.spec.whatwg.org/#reset-queue> - fn reset(&mut self) { + pub(crate) fn reset(&mut self) { self.queue.clear(); self.total_size = Default::default(); } @@ -409,7 +446,7 @@ impl ReadableStreamDefaultController { /// <https://streams.spec.whatwg.org/#dequeue-value> fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue) { let mut queue = self.queue.borrow_mut(); - queue.dequeue_value(cx, rval); + queue.dequeue_value(cx, Some(rval)); } /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull> diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs new file mode 100644 index 00000000000..e650db4a0ce --- /dev/null +++ b/components/script/dom/writablestream.rs @@ -0,0 +1,970 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cell::Cell; +use std::collections::VecDeque; +use std::mem; +use std::ptr::{self}; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{Heap, JSObject}; +use js::jsval::{JSVal, ObjectValue, UndefinedValue}; +use js::rust::{ + HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, + MutableHandleValue as SafeMutableHandleValue, +}; + +use crate::dom::bindings::cell::DomRefCell; +use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; +use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink; +use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods; +use crate::dom::bindings::conversions::ConversionResult; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::import::module::Fallible; +use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, Reflector}; +use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; +use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController; +use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; + +impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {} + +/// The fulfillment handler for the abort steps of +/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring> +#[derive(JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct AbortAlgorithmFulfillmentHandler { + stream: Dom<WritableStream>, + #[ignore_malloc_size_of = "Rc is hard"] + abort_request_promise: Rc<Promise>, +} + +impl Callback for AbortAlgorithmFulfillmentHandler { + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { + // Resolve abortRequest’s promise with undefined. + self.abort_request_promise.resolve_native(&()); + + // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). + self.stream + .as_rooted() + .reject_close_and_closed_promise_if_needed(cx); + } +} + +impl js::gc::Rootable for AbortAlgorithmRejectionHandler {} + +/// The rejection handler for the abort steps of +/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring> +#[derive(JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct AbortAlgorithmRejectionHandler { + stream: Dom<WritableStream>, + #[ignore_malloc_size_of = "Rc is hard"] + abort_request_promise: Rc<Promise>, +} + +impl Callback for AbortAlgorithmRejectionHandler { + fn callback( + &self, + cx: SafeJSContext, + reason: SafeHandleValue, + _realm: InRealm, + _can_gc: CanGc, + ) { + // Reject abortRequest’s promise with reason. + self.abort_request_promise.reject_native(&reason); + + // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). + self.stream + .as_rooted() + .reject_close_and_closed_promise_if_needed(cx); + } +} + +impl js::gc::Rootable for PendingAbortRequest {} + +/// <https://streams.spec.whatwg.org/#pending-abort-request> +#[derive(JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct PendingAbortRequest { + /// <https://streams.spec.whatwg.org/#pending-abort-request-promise> + #[ignore_malloc_size_of = "Rc is hard"] + promise: Rc<Promise>, + + /// <https://streams.spec.whatwg.org/#pending-abort-request-reason> + #[ignore_malloc_size_of = "mozjs"] + reason: Box<Heap<JSVal>>, + + /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring> + was_already_erroring: bool, +} + +/// <https://streams.spec.whatwg.org/#writablestream-state> +#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)] +pub(crate) enum WritableStreamState { + #[default] + Writable, + Closed, + Erroring, + Errored, +} + +/// <https://streams.spec.whatwg.org/#ws-class> +#[dom_struct] +pub struct WritableStream { + reflector_: Reflector, + + /// <https://streams.spec.whatwg.org/#writablestream-backpressure> + backpressure: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#writablestream-closerequest> + #[ignore_malloc_size_of = "Rc is hard"] + close_request: DomRefCell<Option<Rc<Promise>>>, + + /// <https://streams.spec.whatwg.org/#writablestream-controller> + controller: MutNullableDom<WritableStreamDefaultController>, + + /// <https://streams.spec.whatwg.org/#writablestream-detached> + detached: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest> + #[ignore_malloc_size_of = "Rc is hard"] + in_flight_write_request: DomRefCell<Option<Rc<Promise>>>, + + /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest> + #[ignore_malloc_size_of = "Rc is hard"] + in_flight_close_request: DomRefCell<Option<Rc<Promise>>>, + + /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest> + pending_abort_request: DomRefCell<Option<PendingAbortRequest>>, + + /// <https://streams.spec.whatwg.org/#writablestream-state> + state: Cell<WritableStreamState>, + + /// <https://streams.spec.whatwg.org/#writablestream-storederror> + #[ignore_malloc_size_of = "mozjs"] + stored_error: Heap<JSVal>, + + /// <https://streams.spec.whatwg.org/#writablestream-writer> + writer: MutNullableDom<WritableStreamDefaultWriter>, + + /// <https://streams.spec.whatwg.org/#writablestream-writerequests> + #[ignore_malloc_size_of = "Rc is hard"] + write_requests: DomRefCell<VecDeque<Rc<Promise>>>, +} + +impl WritableStream { + #[cfg_attr(crown, allow(crown::unrooted_must_root))] + /// <https://streams.spec.whatwg.org/#initialize-readable-stream> + fn new_inherited() -> WritableStream { + WritableStream { + reflector_: Reflector::new(), + backpressure: Default::default(), + close_request: Default::default(), + controller: Default::default(), + detached: Default::default(), + in_flight_write_request: Default::default(), + in_flight_close_request: Default::default(), + pending_abort_request: Default::default(), + state: Default::default(), + stored_error: Default::default(), + writer: Default::default(), + write_requests: Default::default(), + } + } + + fn new_with_proto( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + ) -> DomRoot<WritableStream> { + reflect_dom_object_with_proto( + Box::new(WritableStream::new_inherited()), + global, + proto, + can_gc, + ) + } + + /// Used as part of + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> + pub(crate) fn assert_no_controller(&self) { + assert!(self.controller.get().is_none()); + } + + /// Used as part of + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> + pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) { + self.controller.set(Some(controller)); + } + + pub(crate) fn is_writable(&self) -> bool { + matches!(self.state.get(), WritableStreamState::Writable) + } + + pub(crate) fn is_erroring(&self) -> bool { + matches!(self.state.get(), WritableStreamState::Erroring) + } + + pub(crate) fn is_errored(&self) -> bool { + matches!(self.state.get(), WritableStreamState::Errored) + } + + pub(crate) fn is_closed(&self) -> bool { + matches!(self.state.get(), WritableStreamState::Closed) + } + + pub(crate) fn has_in_flight_write_request(&self) -> bool { + self.in_flight_write_request.borrow().is_some() + } + + /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight> + pub(crate) fn has_operations_marked_inflight(&self) -> bool { + let in_flight_write_requested = self.in_flight_write_request.borrow().is_some(); + let in_flight_close_requested = self.in_flight_close_request.borrow().is_some(); + + in_flight_write_requested || in_flight_close_requested + } + + /// <https://streams.spec.whatwg.org/#writablestream-storederror> + pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) { + handle_mut.set(self.stored_error.get()); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring> + pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + // Assert: stream.[[state]] is "erroring". + assert!(self.is_erroring()); + + // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false. + assert!(!self.has_operations_marked_inflight()); + + // Set stream.[[state]] to "errored". + self.state.set(WritableStreamState::Errored); + + // Perform ! stream.[[controller]].[[ErrorSteps]](). + let Some(controller) = self.controller.get() else { + unreachable!("Stream should have a controller."); + }; + controller.perform_error_steps(); + + // Let storedError be stream.[[storedError]]. + rooted!(in(*cx) let mut stored_error = UndefinedValue()); + self.get_stored_error(stored_error.handle_mut()); + + // For each writeRequest of stream.[[writeRequests]]: + let write_requests = mem::take(&mut *self.write_requests.borrow_mut()); + for request in write_requests { + // Reject writeRequest with storedError. + request.reject(cx, stored_error.handle()); + } + + // Set stream.[[writeRequests]] to an empty list. + // Done above with `drain`. + + // If stream.[[pendingAbortRequest]] is undefined, + if self.pending_abort_request.borrow().is_none() { + // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). + self.reject_close_and_closed_promise_if_needed(cx); + + // Return. + return; + } + + // Let abortRequest be stream.[[pendingAbortRequest]]. + // Set stream.[[pendingAbortRequest]] to undefined. + rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take()); + if let Some(pending_abort_request) = &*pending_abort_request { + // If abortRequest’s was already erroring is true, + if pending_abort_request.was_already_erroring { + // Reject abortRequest’s promise with storedError. + pending_abort_request + .promise + .reject(cx, stored_error.handle()); + + // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). + self.reject_close_and_closed_promise_if_needed(cx); + + // Return. + return; + } + + // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason). + rooted!(in(*cx) let mut reason = UndefinedValue()); + reason.set(pending_abort_request.reason.get()); + let promise = controller.abort_steps(cx, global, reason.handle(), can_gc); + + // Upon fulfillment of promise, + rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler { + stream: Dom::from_ref(self), + abort_request_promise: pending_abort_request.promise.clone(), + })); + + // Upon rejection of promise with reason r, + rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler { + stream: Dom::from_ref(self), + abort_request_promise: pending_abort_request.promise.clone(), + })); + + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + rejection_handler.take().map(|h| Box::new(h) as Box<_>), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + promise.append_native_handler(&handler, comp, can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed> + fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext) { + // Assert: stream.[[state]] is "errored". + assert!(self.is_errored()); + + rooted!(in(*cx) let mut stored_error = UndefinedValue()); + self.get_stored_error(stored_error.handle_mut()); + + // If stream.[[closeRequest]] is not undefined + let close_request = self.close_request.borrow_mut().take(); + if let Some(close_request) = close_request { + // Assert: stream.[[inFlightCloseRequest]] is undefined. + assert!(self.in_flight_close_request.borrow().is_none()); + + // Reject stream.[[closeRequest]] with stream.[[storedError]]. + close_request.reject_native(&stored_error.handle()) + + // Set stream.[[closeRequest]] to undefined. + // Done with `take` above. + } + + // Let writer be stream.[[writer]]. + // If writer is not undefined, + if let Some(writer) = self.writer.get() { + // Reject writer.[[closedPromise]] with stream.[[storedError]]. + writer.reject_closed_promise_with_stored_error(&stored_error.handle()); + + // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. + writer.set_close_promise_is_handled(); + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight> + pub(crate) fn close_queued_or_in_flight(&self) -> bool { + let close_requested = self.close_request.borrow().is_some(); + let in_flight_close_requested = self.in_flight_close_request.borrow().is_some(); + + close_requested || in_flight_close_requested + } + + /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write> + pub(crate) fn finish_in_flight_write(&self) { + let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else { + // Assert: stream.[[inFlightWriteRequest]] is not undefined. + unreachable!("Stream should have a write request"); + }; + + // Resolve stream.[[inFlightWriteRequest]] with undefined. + in_flight_write_request.resolve_native(&()); + + // Set stream.[[inFlightWriteRequest]] to undefined. + // Done above with `take`. + } + + /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring> + pub(crate) fn start_erroring( + &self, + cx: SafeJSContext, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + // Assert: stream.[[storedError]] is undefined. + assert!(self.stored_error.get().is_undefined()); + + // Assert: stream.[[state]] is "writable". + assert!(self.is_writable()); + + // Let controller be stream.[[controller]]. + let Some(controller) = self.controller.get() else { + // Assert: controller is not undefined. + unreachable!("Stream should have a controller."); + }; + + // Set stream.[[state]] to "erroring". + self.state.set(WritableStreamState::Erroring); + + // Set stream.[[storedError]] to reason. + self.stored_error.set(*error); + + // Let writer be stream.[[writer]]. + if let Some(writer) = self.writer.get() { + // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected + writer.ensure_ready_promise_rejected(global, error, can_gc); + } + + // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true + if !self.has_operations_marked_inflight() && controller.started() { + // perform ! WritableStreamFinishErroring + self.finish_erroring(cx, global, can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection> + pub(crate) fn deal_with_rejection( + &self, + cx: SafeJSContext, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + // Let state be stream.[[state]]. + + // If state is "writable", + if self.is_writable() { + // Perform ! WritableStreamStartErroring(stream, error). + self.start_erroring(cx, global, error, can_gc); + + // Return. + return; + } + + // Assert: state is "erroring". + assert!(self.is_erroring()); + + // Perform ! WritableStreamFinishErroring(stream). + self.finish_erroring(cx, global, can_gc); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight> + pub(crate) fn mark_first_write_request_in_flight(&self) { + let mut in_flight_write_request = self.in_flight_write_request.borrow_mut(); + let mut write_requests = self.write_requests.borrow_mut(); + + // Assert: stream.[[inFlightWriteRequest]] is undefined. + assert!(in_flight_write_request.is_none()); + + // Assert: stream.[[writeRequests]] is not empty. + assert!(!write_requests.is_empty()); + + // Let writeRequest be stream.[[writeRequests]][0]. + // Remove writeRequest from stream.[[writeRequests]]. + let write_request = write_requests.pop_front().unwrap(); + + // Set stream.[[inFlightWriteRequest]] to writeRequest. + *in_flight_write_request = Some(write_request); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight> + pub(crate) fn mark_close_request_in_flight(&self) { + let mut in_flight_close_request = self.in_flight_close_request.borrow_mut(); + let mut close_request = self.close_request.borrow_mut(); + + // Assert: stream.[[inFlightCloseRequest]] is undefined. + assert!(in_flight_close_request.is_none()); + + // Assert: stream.[[closeRequest]] is not undefined. + assert!(close_request.is_some()); + + // Let closeRequest be stream.[[closeRequest]]. + // Set stream.[[closeRequest]] to undefined. + let close_request = close_request.take().unwrap(); + + // Set stream.[[inFlightCloseRequest]] to closeRequest. + *in_flight_close_request = Some(close_request); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close> + pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext) { + let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else { + // Assert: stream.[[inFlightCloseRequest]] is not undefined. + unreachable!("in_flight_close_request must be Some"); + }; + + // Resolve stream.[[inFlightCloseRequest]] with undefined. + in_flight_close_request.resolve_native(&()); + + // Set stream.[[inFlightCloseRequest]] to undefined. + // Done with take above. + + // Assert: stream.[[state]] is "writable" or "erroring". + assert!(self.is_writable() || self.is_erroring()); + + // If state is "erroring", + if self.is_erroring() { + // Set stream.[[storedError]] to undefined. + self.stored_error.set(UndefinedValue()); + + // If stream.[[pendingAbortRequest]] is not undefined, + rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take()); + if let Some(pending_abort_request) = &*pending_abort_request { + // Resolve stream.[[pendingAbortRequest]]'s promise with undefined. + pending_abort_request.promise.resolve_native(&()); + + // Set stream.[[pendingAbortRequest]] to undefined. + // Done above with `take`. + } + } + + // Set stream.[[state]] to "closed". + self.state.set(WritableStreamState::Closed); + + // Let writer be stream.[[writer]]. + if let Some(writer) = self.writer.get() { + // If writer is not undefined, + // resolve writer.[[closedPromise]] with undefined. + writer.resolve_closed_promise_with_undefined(); + } + + // Assert: stream.[[pendingAbortRequest]] is undefined. + assert!(self.pending_abort_request.borrow().is_none()); + + // Assert: stream.[[storedError]] is undefined. + assert!(self.stored_error.get().is_undefined()); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error> + pub(crate) fn finish_in_flight_close_with_error( + &self, + cx: SafeJSContext, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else { + // Assert: stream.[[inFlightCloseRequest]] is not undefined. + unreachable!("Inflight close request must be defined."); + }; + + // Reject stream.[[inFlightCloseRequest]] with error. + in_flight_close_request.reject_native(&error); + + // Set stream.[[inFlightCloseRequest]] to undefined. + // Done above with `take`. + + // Assert: stream.[[state]] is "writable" or "erroring". + assert!(self.is_erroring() || self.is_writable()); + + // If stream.[[pendingAbortRequest]] is not undefined, + rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take()); + if let Some(pending_abort_request) = &*pending_abort_request { + // Reject stream.[[pendingAbortRequest]]'s promise with error. + pending_abort_request.promise.reject_native(&error); + + // Set stream.[[pendingAbortRequest]] to undefined. + // Done above with `take`. + } + + // Perform ! WritableStreamDealWithRejection(stream, error). + self.deal_with_rejection(cx, global, error, can_gc); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error> + pub(crate) fn finish_in_flight_write_with_error( + &self, + cx: SafeJSContext, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else { + // Assert: stream.[[inFlightWriteRequest]] is not undefined. + unreachable!("Inflight write request must be defined."); + }; + + // Reject stream.[[inFlightWriteRequest]] with error. + in_flight_write_request.reject_native(&error); + + // Set stream.[[inFlightWriteRequest]] to undefined. + // Done above with `take`. + + // Assert: stream.[[state]] is "writable" or "erroring". + assert!(self.is_erroring() || self.is_writable()); + + // Perform ! WritableStreamDealWithRejection(stream, error). + self.deal_with_rejection(cx, global, error, can_gc); + } + + pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> { + self.writer.get() + } + + pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) { + self.writer.set(writer); + } + + pub(crate) fn set_backpressure(&self, backpressure: bool) { + self.backpressure.set(backpressure); + } + + pub(crate) fn get_backpressure(&self) -> bool { + self.backpressure.get() + } + + /// <https://streams.spec.whatwg.org/#is-writable-stream-locked> + pub(crate) fn is_locked(&self) -> bool { + // If stream.[[writer]] is undefined, return false. + // Return true. + self.get_writer().is_some() + } + + /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request> + pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> { + // Assert: ! IsWritableStreamLocked(stream) is true. + assert!(self.is_locked()); + + // Assert: stream.[[state]] is "writable". + assert!(self.is_writable()); + + // Let promise be a new promise. + let promise = Promise::new(global, can_gc); + + // Append promise to stream.[[writeRequests]]. + self.write_requests.borrow_mut().push_back(promise.clone()); + + // Return promise. + promise + } + + // Returns the rooted controller of the stream, if any. + pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> { + self.controller.get() + } + + /// <https://streams.spec.whatwg.org/#writable-stream-abort> + pub(crate) fn abort( + &self, + cx: SafeJSContext, + global: &GlobalScope, + provided_reason: SafeHandleValue, + can_gc: CanGc, + ) -> Rc<Promise> { + // If stream.[[state]] is "closed" or "errored", + if self.is_closed() || self.is_errored() { + // return a promise resolved with undefined. + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + return promise; + } + + // TODO: Signal abort on stream.[[controller]].[[abortController]] with reason. + + // TODO: If state is "closed" or "errored", return a promise resolved with undefined. + // Note: state may have changed because of signal above. + + // If stream.[[pendingAbortRequest]] is not undefined, + if self.pending_abort_request.borrow().is_some() { + // return stream.[[pendingAbortRequest]]'s promise. + return self + .pending_abort_request + .borrow() + .as_ref() + .expect("Pending abort request must be Some.") + .promise + .clone(); + } + + // Assert: state is "writable" or "erroring". + assert!(self.is_writable() || self.is_erroring()); + + // Let wasAlreadyErroring be false. + let mut was_already_erroring = false; + rooted!(in(*cx) let undefined_reason = UndefinedValue()); + + // If state is "erroring", + let reason = if self.is_erroring() { + // Set wasAlreadyErroring to true. + was_already_erroring = true; + + // Set reason to undefined. + undefined_reason.handle() + } else { + // Use the provided reason. + provided_reason + }; + + // Let promise be a new promise. + let promise = Promise::new(global, can_gc); + + // Set stream.[[pendingAbortRequest]] to a new pending abort request + // whose promise is promise, + // reason is reason, + // and was already erroring is wasAlreadyErroring. + *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest { + promise: promise.clone(), + reason: Heap::boxed(reason.get()), + was_already_erroring, + }); + + // If wasAlreadyErroring is false, + if !was_already_erroring { + // perform ! WritableStreamStartErroring(stream, reason) + self.start_erroring(cx, global, reason, can_gc); + } + + // Return promise. + promise + } + + /// <https://streams.spec.whatwg.org/#writable-stream-close> + pub(crate) fn close( + &self, + cx: SafeJSContext, + global: &GlobalScope, + can_gc: CanGc, + ) -> Rc<Promise> { + // Let state be stream.[[state]]. + // If state is "closed" or "errored", + if self.is_closed() || self.is_errored() { + // return a promise rejected with a TypeError exception. + let promise = Promise::new(global, can_gc); + promise.reject_error(Error::Type("Stream is closed or errored.".to_string())); + return promise; + } + + // Assert: state is "writable" or "erroring". + assert!(self.is_writable() || self.is_erroring()); + + // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false. + assert!(!self.close_queued_or_in_flight()); + + // Let promise be a new promise. + let promise = Promise::new(global, can_gc); + + // Set stream.[[closeRequest]] to promise. + *self.close_request.borrow_mut() = Some(promise.clone()); + + // Let writer be stream.[[writer]]. + // If writer is not undefined, + if let Some(writer) = self.writer.get() { + // and stream.[[backpressure]] is true, + // and state is "writable", + if self.get_backpressure() && self.is_writable() { + // resolve writer.[[readyPromise]] with undefined. + writer.resolve_ready_promise_with_undefined(); + } + } + + // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]). + let Some(controller) = self.controller.get() else { + unreachable!("Stream must have a controller."); + }; + controller.close(cx, global, can_gc); + + // Return promise. + promise + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size> + /// Note: implement as a stream method, as opposed to a writer one, for convenience. + pub(crate) fn get_desired_size(&self) -> Option<f64> { + // Let stream be writer.[[stream]]. + // Stream is `self`. + + // Let state be stream.[[state]]. + // If state is "errored" or "erroring", return null. + if self.is_errored() || self.is_erroring() { + return None; + } + + // If state is "closed", return 0. + if self.is_closed() { + return Some(0.); + } + + let Some(controller) = self.controller.get() else { + unreachable!("Stream must have a controller."); + }; + Some(controller.get_desired_size()) + } + + /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer> + pub(crate) fn aquire_default_writer( + &self, + cx: SafeJSContext, + global: &GlobalScope, + can_gc: CanGc, + ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> { + // Let writer be a new WritableStreamDefaultWriter object. + let writer = WritableStreamDefaultWriter::new(global, None, can_gc); + + // Perform ? SetUpWritableStreamDefaultWriter(writer, stream). + writer.setup(cx, self)?; + + // Return writer. + Ok(writer) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure> + pub(crate) fn update_backpressure( + &self, + backpressure: bool, + global: &GlobalScope, + can_gc: CanGc, + ) { + // Assert: stream.[[state]] is "writable". + self.is_writable(); + + // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false. + assert!(!self.close_queued_or_in_flight()); + + // Let writer be stream.[[writer]]. + let writer = self.get_writer(); + if writer.is_some() && backpressure != self.get_backpressure() { + // If writer is not undefined + let writer = writer.expect("Writer is some, as per the above check."); + // and backpressure is not stream.[[backpressure]], + if backpressure { + // If backpressure is true, set writer.[[readyPromise]] to a new promise. + let promise = Promise::new(global, can_gc); + writer.set_ready_promise(promise); + } else { + // Otherwise, + // Assert: backpressure is false. + assert!(!backpressure); + // Resolve writer.[[readyPromise]] with undefined. + writer.resolve_ready_promise_with_undefined(); + } + }; + + // Set stream.[[backpressure]] to backpressure. + self.set_backpressure(backpressure); + } +} + +impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream { + /// <https://streams.spec.whatwg.org/#ws-constructor> + fn Constructor( + cx: SafeJSContext, + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + underlying_sink: Option<*mut JSObject>, + strategy: &QueuingStrategy, + ) -> Fallible<DomRoot<WritableStream>> { + // If underlyingSink is missing, set it to null. + rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut())); + + // Let underlyingSinkDict be underlyingSink, + // converted to an IDL value of type UnderlyingSink. + let underlying_sink_dict = if !underlying_sink_obj.is_null() { + rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get())); + match UnderlyingSink::new(cx, obj_val.handle()) { + Ok(ConversionResult::Success(val)) => val, + Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())), + _ => { + return Err(Error::JSFailed); + }, + } + } else { + UnderlyingSink::empty() + }; + + if !underlying_sink_dict.type_.handle().is_undefined() { + // If underlyingSinkDict["type"] exists, throw a RangeError exception. + return Err(Error::Range("type is set".to_string())); + } + + // Perform ! InitializeWritableStream(this). + let stream = WritableStream::new_with_proto(global, proto, can_gc); + + // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy). + let size_algorithm = extract_size_algorithm(strategy); + + // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1). + let high_water_mark = extract_high_water_mark(strategy, 1.0)?; + + // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink + let controller = WritableStreamDefaultController::new( + global, + &underlying_sink_dict, + high_water_mark, + size_algorithm, + can_gc, + ); + + // Note: this must be done before `setup`, + // otherwise `thisOb` is null in the start callback. + controller.set_underlying_sink_this_object(underlying_sink_obj.handle()); + + // Perform ? SetUpWritableStreamDefaultController + controller.setup(cx, global, &stream, &underlying_sink_dict.start, can_gc)?; + + Ok(stream) + } + + /// <https://streams.spec.whatwg.org/#ws-locked> + fn Locked(&self) -> bool { + // Return ! IsWritableStreamLocked(this). + self.is_locked() + } + + /// <https://streams.spec.whatwg.org/#ws-abort> + fn Abort( + &self, + cx: SafeJSContext, + reason: SafeHandleValue, + realm: InRealm, + can_gc: CanGc, + ) -> Rc<Promise> { + let global = GlobalScope::from_safe_context(cx, realm); + + // If ! IsWritableStreamLocked(this) is true, + if self.is_locked() { + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&global, can_gc); + promise.reject_error(Error::Type("Stream is locked.".to_string())); + return promise; + } + + // Return ! WritableStreamAbort(this, reason). + self.abort(cx, &global, reason, can_gc) + } + + /// <https://streams.spec.whatwg.org/#ws-close> + fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> { + let cx = GlobalScope::get_cx(); + let global = GlobalScope::from_safe_context(cx, realm); + + // If ! IsWritableStreamLocked(this) is true, + if self.is_locked() { + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&global, can_gc); + promise.reject_error(Error::Type("Stream is locked.".to_string())); + return promise; + } + + // If ! WritableStreamCloseQueuedOrInFlight(this) is true + if self.close_queued_or_in_flight() { + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&global, can_gc); + promise.reject_error(Error::Type( + "Stream has closed queued or in-flight".to_string(), + )); + return promise; + } + + // Return ! WritableStreamClose(this). + self.close(cx, &global, can_gc) + } + + /// <https://streams.spec.whatwg.org/#ws-get-writer> + fn GetWriter( + &self, + realm: InRealm, + can_gc: CanGc, + ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> { + let cx = GlobalScope::get_cx(); + let global = GlobalScope::from_safe_context(cx, realm); + + // Return ? AcquireWritableStreamDefaultWriter(this). + self.aquire_default_writer(cx, &global, can_gc) + } +} diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs new file mode 100644 index 00000000000..cd027907211 --- /dev/null +++ b/components/script/dom/writablestreamdefaultcontroller.rs @@ -0,0 +1,829 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cell::{Cell, RefCell}; +use std::ptr; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{Heap, IsPromiseObject, JSObject}; +use js::jsval::{JSVal, UndefinedValue}; +use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle}; + +use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; +use crate::dom::bindings::callback::ExceptionHandling; +use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{ + UnderlyingSink, UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, + UnderlyingSinkStartCallback, UnderlyingSinkWriteCallback, +}; +use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::reflector::{reflect_dom_object, Reflector}; +use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize}; +use crate::dom::writablestream::WritableStream; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; + +impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {} + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct CloseAlgorithmFulfillmentHandler { + stream: Dom<WritableStream>, +} + +impl Callback for CloseAlgorithmFulfillmentHandler { + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { + let stream = self.stream.as_rooted(); + + // Perform ! WritableStreamFinishInFlightClose(stream). + stream.finish_in_flight_close(cx); + } +} + +impl js::gc::Rootable for CloseAlgorithmRejectionHandler {} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct CloseAlgorithmRejectionHandler { + stream: Dom<WritableStream>, +} + +impl Callback for CloseAlgorithmRejectionHandler { + fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + let stream = self.stream.as_rooted(); + + let global = GlobalScope::from_safe_context(cx, realm); + + // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason). + stream.finish_in_flight_close_with_error(cx, &global, v, can_gc); + } +} + +impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {} + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct StartAlgorithmFulfillmentHandler { + controller: Dom<WritableStreamDefaultController>, +} + +impl Callback for StartAlgorithmFulfillmentHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> + /// Upon fulfillment of startPromise, + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + let controller = self.controller.as_rooted(); + let stream = controller + .stream + .get() + .expect("Controller should have a stream."); + + // Assert: stream.[[state]] is "writable" or "erroring". + assert!(stream.is_erroring() || stream.is_writable()); + + // Set controller.[[started]] to true. + controller.started.set(true); + + let global = GlobalScope::from_safe_context(cx, realm); + + // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). + controller.advance_queue_if_needed(cx, &global, can_gc) + } +} + +impl js::gc::Rootable for StartAlgorithmRejectionHandler {} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct StartAlgorithmRejectionHandler { + controller: Dom<WritableStreamDefaultController>, +} + +impl Callback for StartAlgorithmRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> + /// Upon rejection of startPromise with reason r, + fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + let controller = self.controller.as_rooted(); + let stream = controller + .stream + .get() + .expect("Controller should have a stream."); + + // Assert: stream.[[state]] is "writable" or "erroring". + assert!(stream.is_erroring() || stream.is_writable()); + + // Set controller.[[started]] to true. + controller.started.set(true); + + let global = GlobalScope::from_safe_context(cx, realm); + + // Perform ! WritableStreamDealWithRejection(stream, r). + stream.deal_with_rejection(cx, &global, v, can_gc); + } +} + +impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {} + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct WriteAlgorithmFulfillmentHandler { + controller: Dom<WritableStreamDefaultController>, +} + +impl Callback for WriteAlgorithmFulfillmentHandler { + fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + let controller = self.controller.as_rooted(); + let stream = controller + .stream + .get() + .expect("Controller should have a stream."); + + // Perform ! WritableStreamFinishInFlightWrite(stream). + stream.finish_in_flight_write(); + + // Let state be stream.[[state]]. + // Assert: state is "writable" or "erroring". + assert!(stream.is_erroring() || stream.is_writable()); + + // Perform ! DequeueValue(controller). + { + rooted!(in(*cx) let mut rval = UndefinedValue()); + let mut queue = controller.queue.borrow_mut(); + queue.dequeue_value(cx, Some(rval.handle_mut())); + } + + let global = GlobalScope::from_safe_context(cx, realm); + + // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable", + if !stream.close_queued_or_in_flight() && stream.is_writable() { + // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). + let backpressure = controller.get_backpressure(); + + // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). + stream.update_backpressure(backpressure, &global, can_gc); + } + + // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). + controller.advance_queue_if_needed(cx, &global, can_gc) + } +} + +impl js::gc::Rootable for WriteAlgorithmRejectionHandler {} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] +struct WriteAlgorithmRejectionHandler { + controller: Dom<WritableStreamDefaultController>, +} + +impl Callback for WriteAlgorithmRejectionHandler { + fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + let controller = self.controller.as_rooted(); + let stream = controller + .stream + .get() + .expect("Controller should have a stream."); + + // If stream.[[state]] is "writable", + if stream.is_writable() { + // perform ! WritableStreamDefaultControllerClearAlgorithms(controller). + controller.clear_algorithms(); + } + + let global = GlobalScope::from_safe_context(cx, realm); + + // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason). + stream.finish_in_flight_write_with_error(cx, &global, v, can_gc); + } +} + +/// <https://streams.spec.whatwg.org/#ws-default-controller-class> +#[dom_struct] +pub struct WritableStreamDefaultController { + reflector_: Reflector, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm> + #[ignore_malloc_size_of = "Rc is hard"] + abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm> + #[ignore_malloc_size_of = "Rc is hard"] + close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm> + #[ignore_malloc_size_of = "Rc is hard"] + write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>, + + /// The JS object used as `this` when invoking sink algorithms. + #[ignore_malloc_size_of = "mozjs"] + underlying_sink_obj: Heap<*mut JSObject>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue> + queue: RefCell<QueueWithSizes>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started> + started: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm> + strategy_hwm: f64, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm> + #[ignore_malloc_size_of = "Rc is hard"] + strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream> + stream: MutNullableDom<WritableStream>, +} + +impl WritableStreamDefaultController { + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink> + #[cfg_attr(crown, allow(crown::unrooted_must_root))] + fn new_inherited( + underlying_sink: &UnderlyingSink, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + ) -> WritableStreamDefaultController { + WritableStreamDefaultController { + reflector_: Reflector::new(), + queue: Default::default(), + stream: Default::default(), + abort: RefCell::new(underlying_sink.abort.clone()), + close: RefCell::new(underlying_sink.close.clone()), + write: RefCell::new(underlying_sink.write.clone()), + underlying_sink_obj: Default::default(), + strategy_hwm, + strategy_size: RefCell::new(Some(strategy_size)), + started: Default::default(), + } + } + + pub(crate) fn new( + global: &GlobalScope, + underlying_sink: &UnderlyingSink, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + can_gc: CanGc, + ) -> DomRoot<WritableStreamDefaultController> { + reflect_dom_object( + Box::new(WritableStreamDefaultController::new_inherited( + underlying_sink, + strategy_hwm, + strategy_size, + )), + global, + can_gc, + ) + } + + pub(crate) fn started(&self) -> bool { + self.started.get() + } + + /// Setting the JS object after the heap has settled down. + pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) { + self.underlying_sink_obj.set(*this_object); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms> + fn clear_algorithms(&self) { + // Set controller.[[writeAlgorithm]] to undefined. + self.write.borrow_mut().take(); + + // Set controller.[[closeAlgorithm]] to undefined. + self.close.borrow_mut().take(); + + // Set controller.[[abortAlgorithm]] to undefined. + self.abort.borrow_mut().take(); + + // Set controller.[[strategySizeAlgorithm]] to undefined. + self.strategy_size.borrow_mut().take(); + } + + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controllerr> + #[allow(unsafe_code)] + pub(crate) fn setup( + &self, + cx: SafeJSContext, + global: &GlobalScope, + stream: &WritableStream, + start: &Option<Rc<UnderlyingSinkStartCallback>>, + can_gc: CanGc, + ) -> Result<(), Error> { + // Assert: stream implements WritableStream. + // Implied by stream type. + + // Assert: stream.[[controller]] is undefined. + stream.assert_no_controller(); + + // Set controller.[[stream]] to stream. + self.stream.set(Some(stream)); + + // Set stream.[[controller]] to controller. + stream.set_default_controller(self); + + // Perform ! ResetQueue(controller). + + // Set controller.[[abortController]] to a new AbortController. + + // Set controller.[[started]] to false. + + // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm. + + // Set controller.[[strategyHWM]] to highWaterMark. + + // Set controller.[[writeAlgorithm]] to writeAlgorithm. + + // Set controller.[[closeAlgorithm]] to closeAlgorithm. + + // Set controller.[[abortAlgorithm]] to abortAlgorithm. + + // Note: above steps are done in `new_inherited`. + + // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). + let backpressure = self.get_backpressure(); + + // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). + stream.update_backpressure(backpressure, global, can_gc); + + // Let startResult be the result of performing startAlgorithm. (This may throw an exception.) + // Let startPromise be a promise resolved with startResult. + let start_promise = if let Some(start) = start { + rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>()); + rooted!(in(*cx) let mut result: JSVal); + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + start.Call_( + &this_object.handle(), + self, + result.handle_mut(), + ExceptionHandling::Rethrow, + )?; + let is_promise = unsafe { + if result.is_object() { + result_object.set(result.to_object()); + IsPromiseObject(result_object.handle().into_handle()) + } else { + false + } + }; + if is_promise { + let promise = Promise::new_with_js_promise(result_object.handle(), cx); + promise + } else { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&result.get()); + promise + } + } else { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + promise + }; + + let rooted_default_controller = DomRoot::from_ref(self); + + // Upon fulfillment of startPromise, + rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler { + controller: Dom::from_ref(&rooted_default_controller), + })); + + // Upon rejection of startPromise with reason r, + rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler { + controller: Dom::from_ref(&rooted_default_controller), + })); + + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + rejection_handler.take().map(|h| Box::new(h) as Box<_>), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + start_promise.append_native_handler(&handler, comp, can_gc); + + Ok(()) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close> + pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + // Perform ! EnqueueValueWithSize(controller, close sentinel, 0). + { + let mut queue = self.queue.borrow_mut(); + queue + .enqueue_value_with_size(EnqueuedValue::CloseSentinel) + .expect("Enqueuing the close sentinel should not fail."); + } + // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). + self.advance_queue_if_needed(cx, global, can_gc); + } + + /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps> + pub(crate) fn abort_steps( + &self, + cx: SafeJSContext, + global: &GlobalScope, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Rc<Promise> { + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + let algo = self.abort.borrow().clone(); + let result = if let Some(algo) = algo { + algo.Call_( + &this_object.handle(), + Some(reason), + ExceptionHandling::Rethrow, + ) + } else { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e); + promise + }) + } + + pub(crate) fn call_write_algorithm( + &self, + cx: SafeJSContext, + chunk: SafeHandleValue, + global: &GlobalScope, + can_gc: CanGc, + ) -> Rc<Promise> { + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + let algo = self.write.borrow().clone(); + let result = if let Some(algo) = algo { + algo.Call_( + &this_object.handle(), + chunk, + self, + ExceptionHandling::Rethrow, + ) + } else { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e); + promise + }) + } + + fn call_close_algorithm( + &self, + cx: SafeJSContext, + global: &GlobalScope, + can_gc: CanGc, + ) -> Rc<Promise> { + rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>()); + this_object.set(self.underlying_sink_obj.get()); + let algo = self.close.borrow().clone(); + let result = if let Some(algo) = algo { + algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow) + } else { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e); + promise + }) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> + pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + // Let stream be controller.[[stream]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // Perform ! WritableStreamMarkCloseRequestInFlight(stream). + stream.mark_close_request_in_flight(); + + // Perform ! DequeueValue(controller). + { + let mut queue = self.queue.borrow_mut(); + queue.dequeue_value(cx, None); + } + + // Assert: controller.[[queue]] is empty. + assert!(self.queue.borrow().is_empty()); + + // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]]. + let sink_close_promise = self.call_close_algorithm(cx, global, can_gc); + + // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Upon fulfillment of sinkClosePromise, + rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler { + stream: Dom::from_ref(&stream), + })); + + // Upon rejection of sinkClosePromise with reason reason, + rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler { + stream: Dom::from_ref(&stream), + })); + + // Attach handlers to the promise. + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + rejection_handler.take().map(|h| Box::new(h) as Box<_>), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + sink_close_promise.append_native_handler(&handler, comp, can_gc); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed> + fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + // Let stream be controller.[[stream]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // If controller.[[started]] is false, return. + if !self.started.get() { + return; + } + + // If stream.[[inFlightWriteRequest]] is not undefined, return. + if stream.has_in_flight_write_request() { + return; + } + + // Let state be stream.[[state]]. + + // Assert: state is not "closed" or "errored". + assert!(!(stream.is_errored() || stream.is_closed())); + + // If state is "erroring", + if stream.is_erroring() { + // Perform ! WritableStreamFinishErroring(stream). + stream.finish_erroring(cx, global, can_gc); + + // Return. + return; + } + + // Let value be ! PeekQueueValue(controller). + rooted!(in(*cx) let mut value = UndefinedValue()); + let is_closed = { + let queue = self.queue.borrow_mut(); + + // If controller.[[queue]] is empty, return. + if queue.is_empty() { + return; + } + queue.peek_queue_value(cx, value.handle_mut()) + }; + + if is_closed { + // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller). + self.process_close(cx, global, can_gc); + } else { + // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value). + self.process_write(cx, value.handle(), global, can_gc); + }; + } + + /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error> + pub(crate) fn perform_error_steps(&self) { + // Perform ! ResetQueue(this). + self.queue.borrow_mut().reset(); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> + fn process_write( + &self, + cx: SafeJSContext, + chunk: SafeHandleValue, + global: &GlobalScope, + can_gc: CanGc, + ) { + // Let stream be controller.[[stream]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream). + stream.mark_first_write_request_in_flight(); + + // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk. + let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc); + + // Upon fulfillment of sinkWritePromise, + rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler { + controller: Dom::from_ref(self), + })); + + // Upon rejection of sinkWritePromise with reason, + rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler { + controller: Dom::from_ref(self), + })); + + // Attach handlers to the promise. + let handler = PromiseNativeHandler::new( + global, + fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), + rejection_handler.take().map(|h| Box::new(h) as Box<_>), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + sink_write_promise.append_native_handler(&handler, comp, can_gc); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size> + pub(crate) fn get_desired_size(&self) -> f64 { + // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]]. + let queue = self.queue.borrow(); + let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX); + desired_size.clamp(desired_size, self.strategy_hwm) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure> + fn get_backpressure(&self) -> bool { + // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller). + let desired_size = self.get_desired_size(); + + // Return true if desiredSize ≤ 0, or false otherwise. + desired_size == 0.0 || desired_size.is_sign_negative() + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size> + pub(crate) fn get_chunk_size( + &self, + cx: SafeJSContext, + global: &GlobalScope, + chunk: SafeHandleValue, + can_gc: CanGc, + ) -> f64 { + // If controller.[[strategySizeAlgorithm]] is undefined, + let Some(strategy_size) = self.strategy_size.borrow().clone() else { + // Assert: controller.[[stream]].[[state]] is "erroring" or "errored". + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + assert!(stream.is_erroring() || stream.is_errored()); + + // Return 1. + return 1.0; + }; + + // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]], + // passing in chunk, and interpreting the result as a completion record. + let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow); + + match result { + // Let chunkSize be result.[[Value]]. + Ok(size) => size, + Err(error) => { + // If result is an abrupt completion, + + // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]). + // Create a rooted value for the error. + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + error.to_jsval(cx, global, rooted_error.handle_mut()); + self.error_if_needed(cx, rooted_error.handle(), global, can_gc); + + // Return 1. + 1.0 + }, + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write> + pub(crate) fn write( + &self, + cx: SafeJSContext, + global: &GlobalScope, + chunk: SafeHandleValue, + chunk_size: f64, + can_gc: CanGc, + ) { + // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize). + let enqueue_result = { + let mut queue = self.queue.borrow_mut(); + queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize { + value: Heap::boxed(chunk.get()), + size: chunk_size, + })) + }; + + // If enqueueResult is an abrupt completion, + if let Err(error) = enqueue_result { + // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]). + // Create a rooted value for the error. + rooted!(in(*cx) let mut rooted_error = UndefinedValue()); + error.to_jsval(cx, global, rooted_error.handle_mut()); + self.error_if_needed(cx, rooted_error.handle(), global, can_gc); + + // Return. + return; + } + + // Let stream be controller.[[stream]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable", + if !stream.close_queued_or_in_flight() && stream.is_writable() { + // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). + let backpressure = self.get_backpressure(); + + // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). + stream.update_backpressure(backpressure, global, can_gc); + } + + // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). + self.advance_queue_if_needed(cx, global, can_gc); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed> + pub(crate) fn error_if_needed( + &self, + cx: SafeJSContext, + error: SafeHandleValue, + global: &GlobalScope, + can_gc: CanGc, + ) { + // Let stream be controller.[[stream]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // If stream.[[state]] is "writable", + if stream.is_writable() { + // Perform ! WritableStreamDefaultControllerError(controller, e). + self.error(&stream, cx, error, global, can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error> + fn error( + &self, + stream: &WritableStream, + cx: SafeJSContext, + e: SafeHandleValue, + global: &GlobalScope, + can_gc: CanGc, + ) { + // Let stream be controller.[[stream]]. + // Done above with the argument. + + // Assert: stream.[[state]] is "writable". + assert!(stream.is_writable()); + + // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Perform ! WritableStreamStartErroring(stream, error). + stream.start_erroring(cx, global, e, can_gc); + } +} + +impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder> + for WritableStreamDefaultController +{ + /// <https://streams.spec.whatwg.org/#ws-default-controller-error> + fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) { + // Let state be this.[[stream]].[[state]]. + let Some(stream) = self.stream.get() else { + unreachable!("Controller should have a stream"); + }; + + // If state is not "writable", return. + if !stream.is_writable() { + return; + } + + let global = GlobalScope::from_safe_context(cx, realm); + + // Perform ! WritableStreamDefaultControllerError(this, e). + self.error(&stream, cx, e, &global, can_gc); + } +} diff --git a/components/script/dom/writablestreamdefaultwriter.rs b/components/script/dom/writablestreamdefaultwriter.rs new file mode 100644 index 00000000000..2a01f5347c6 --- /dev/null +++ b/components/script/dom/writablestreamdefaultwriter.rs @@ -0,0 +1,500 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cell::RefCell; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsval::UndefinedValue; +use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; + +use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomGlobal, Reflector}; +use crate::dom::bindings::root::{DomRoot, MutNullableDom}; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::writablestream::WritableStream; +use crate::realms::InRealm; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; + +/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter> +#[dom_struct] +pub struct WritableStreamDefaultWriter { + reflector_: Reflector, + + #[ignore_malloc_size_of = "Rc is hard"] + ready_promise: RefCell<Rc<Promise>>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise> + #[ignore_malloc_size_of = "Rc is hard"] + closed_promise: RefCell<Rc<Promise>>, + + /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream> + stream: MutNullableDom<WritableStream>, +} + +impl WritableStreamDefaultWriter { + #[cfg_attr(crown, allow(crown::unrooted_must_root))] + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer> + /// The parts that create a new promise. + fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter { + WritableStreamDefaultWriter { + reflector_: Reflector::new(), + stream: Default::default(), + closed_promise: RefCell::new(Promise::new(global, can_gc)), + ready_promise: RefCell::new(Promise::new(global, can_gc)), + } + } + + pub(crate) fn new( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + ) -> DomRoot<WritableStreamDefaultWriter> { + reflect_dom_object_with_proto( + Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)), + global, + proto, + can_gc, + ) + } + + /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer> + /// Continuing from `new_inherited`, the rest. + pub(crate) fn setup(&self, cx: SafeJSContext, stream: &WritableStream) -> Result<(), Error> { + // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception. + if stream.is_locked() { + return Err(Error::Type("Stream is locked".to_string())); + } + + // Set writer.[[stream]] to stream. + self.stream.set(Some(stream)); + + // Set stream.[[writer]] to writer. + stream.set_writer(Some(self)); + + // Let state be stream.[[state]]. + + // If state is "writable", + if stream.is_writable() { + // If ! WritableStreamCloseQueuedOrInFlight(stream) is false + // and stream.[[backpressure]] is true, + if !stream.close_queued_or_in_flight() && stream.get_backpressure() { + // set writer.[[readyPromise]] to a new promise. + // Done in `new_inherited`. + } else { + // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined. + // Note: new promise created in `new_inherited`. + self.ready_promise.borrow().resolve_native(&()); + } + + // Set writer.[[closedPromise]] to a new promise. + // Done in `new_inherited`. + return Ok(()); + } + + // Otherwise, if state is "erroring", + if stream.is_erroring() { + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + + // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]]. + // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. + // Note: new promise created in `new_inherited`. + let ready_promise = self.ready_promise.borrow(); + ready_promise.reject_native(&error.handle()); + ready_promise.set_promise_is_handled(); + + // Set writer.[[closedPromise]] to a new promise. + // Done in `new_inherited`. + return Ok(()); + } + + // Otherwise, if state is "closed", + if stream.is_closed() { + // Set writer.[[readyPromise]] to a promise resolved with undefined. + // Note: new promise created in `new_inherited`. + self.ready_promise.borrow().resolve_native(&()); + + // Set writer.[[closedPromise]] to a promise resolved with undefined. + // Note: new promise created in `new_inherited`. + self.closed_promise.borrow().resolve_native(&()); + return Ok(()); + } + + // Otherwise, + // Assert: state is "errored". + assert!(stream.is_errored()); + + // Let storedError be stream.[[storedError]]. + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + + // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]]. + // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. + // Note: new promise created in `new_inherited`. + let ready_promise = self.ready_promise.borrow(); + ready_promise.reject_native(&error.handle()); + ready_promise.set_promise_is_handled(); + + // Set writer.[[closedPromise]] to a promise rejected with storedError. + // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. + // Note: new promise created in `new_inherited`. + let ready_promise = self.closed_promise.borrow(); + ready_promise.reject_native(&error.handle()); + ready_promise.set_promise_is_handled(); + + Ok(()) + } + + pub(crate) fn reject_closed_promise_with_stored_error(&self, error: &SafeHandleValue) { + self.closed_promise.borrow().reject_native(error); + } + + pub(crate) fn set_close_promise_is_handled(&self) { + self.closed_promise.borrow().set_promise_is_handled(); + } + + pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) { + *self.ready_promise.borrow_mut() = promise; + } + + pub(crate) fn resolve_ready_promise_with_undefined(&self) { + self.ready_promise.borrow().resolve_native(&()); + } + + pub(crate) fn resolve_closed_promise_with_undefined(&self) { + self.closed_promise.borrow().resolve_native(&()); + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected> + pub(crate) fn ensure_ready_promise_rejected( + &self, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + let ready_promise = self.ready_promise.borrow().clone(); + + // If writer.[[readyPromise]].[[PromiseState]] is "pending", + if ready_promise.is_pending() { + // reject writer.[[readyPromise]] with error. + ready_promise.reject_native(&error); + + // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. + ready_promise.set_promise_is_handled(); + } else { + // Otherwise, set writer.[[readyPromise]] to a promise rejected with error. + let promise = Promise::new(global, can_gc); + promise.reject_native(&error); + + // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. + promise.set_promise_is_handled(); + *self.ready_promise.borrow_mut() = promise; + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected> + pub(crate) fn ensure_closed_promise_rejected( + &self, + global: &GlobalScope, + error: SafeHandleValue, + can_gc: CanGc, + ) { + let closed_promise = self.closed_promise.borrow().clone(); + + // If writer.[[closedPromise]].[[PromiseState]] is "pending", + if closed_promise.is_pending() { + // reject writer.[[closedPromise]] with error. + closed_promise.reject_native(&error); + + // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. + closed_promise.set_promise_is_handled(); + } else { + // Otherwise, set writer.[[closedPromise]] to a promise rejected with error. + let promise = Promise::new(global, can_gc); + promise.reject_native(&error); + + // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. + promise.set_promise_is_handled(); + *self.closed_promise.borrow_mut() = promise; + } + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort> + fn abort( + &self, + cx: SafeJSContext, + global: &GlobalScope, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Rc<Promise> { + // Let stream be writer.[[stream]]. + let Some(stream) = self.stream.get() else { + // Assert: stream is not undefined. + unreachable!("Stream should be set."); + }; + + // Return ! WritableStreamAbort(stream, reason). + stream.abort(cx, global, reason, can_gc) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close> + fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> { + // Let stream be writer.[[stream]]. + let Some(stream) = self.stream.get() else { + // Assert: stream is not undefined. + unreachable!("Stream should be set."); + }; + + // Return ! WritableStreamClose(stream). + stream.close(cx, global, can_gc) + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write> + fn write( + &self, + cx: SafeJSContext, + global: &GlobalScope, + chunk: SafeHandleValue, + can_gc: CanGc, + ) -> Rc<Promise> { + // Let stream be writer.[[stream]]. + let Some(stream) = self.stream.get() else { + // Assert: stream is not undefined. + unreachable!("Stream should be set."); + }; + + // Let controller be stream.[[controller]]. + // Note: asserting controller is some. + let Some(controller) = stream.get_controller() else { + unreachable!("Controller should be set."); + }; + + // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk). + let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc); + + // If stream is not equal to writer.[[stream]], + // return a promise rejected with a TypeError exception. + if !self + .stream + .get() + .map_or(false, |current_stream| current_stream == stream) + { + let promise = Promise::new(global, can_gc); + promise.reject_error(Error::Type( + "Stream is not equal to writer stream".to_string(), + )); + return promise; + } + + // Let state be stream.[[state]]. + // If state is "errored", + if stream.is_errored() { + // return a promise rejected with stream.[[storedError]]. + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + let promise = Promise::new(global, can_gc); + promise.reject_native(&error.handle()); + return promise; + } + + // If ! WritableStreamCloseQueuedOrInFlight(stream) is true + // or state is "closed", + if stream.close_queued_or_in_flight() || stream.is_closed() { + // return a promise rejected with a TypeError exception + // indicating that the stream is closing or closed + let promise = Promise::new(global, can_gc); + promise.reject_error(Error::Type( + "Stream has been closed, or has close queued or in-flight".to_string(), + )); + return promise; + } + + // If state is "erroring", + if stream.is_erroring() { + // return a promise rejected with stream.[[storedError]]. + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + let promise = Promise::new(global, can_gc); + promise.reject_native(&error.handle()); + return promise; + } + + // Assert: state is "writable". + assert!(stream.is_writable()); + + // Let promise be ! WritableStreamAddWriteRequest(stream). + let promise = stream.add_write_request(global, can_gc); + + // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize). + controller.write(cx, global, chunk, chunk_size, can_gc); + + // Return promise. + promise + } + + /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release> + pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + // Let stream be this.[[stream]]. + let Some(stream) = self.stream.get() else { + // Assert: stream is not undefined. + unreachable!("Stream should be set."); + }; + + // Assert: stream.[[writer]] is writer. + assert!(stream.get_writer().is_some_and(|writer| &*writer == self)); + + // Let releasedError be a new TypeError. + let released_error = Error::Type("Writer has been released".to_string()); + + // Root the js val of the error. + rooted!(in(*cx) let mut error = UndefinedValue()); + released_error.to_jsval(cx, global, error.handle_mut()); + + // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError). + self.ensure_ready_promise_rejected(global, error.handle(), can_gc); + + // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError). + self.ensure_closed_promise_rejected(global, error.handle(), can_gc); + + // Set stream.[[writer]] to undefined. + stream.set_writer(None); + + // Set this.[[stream]] to undefined. + self.stream.set(None); + } +} + +impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter { + /// <https://streams.spec.whatwg.org/#default-writer-closed> + fn Closed(&self) -> Rc<Promise> { + // Return this.[[closedPromise]]. + return self.closed_promise.borrow().clone(); + } + + /// <https://streams.spec.whatwg.org/#default-writer-desired-size> + fn GetDesiredSize(&self) -> Result<Option<f64>, Error> { + // If this.[[stream]] is undefined, throw a TypeError exception. + let Some(stream) = self.stream.get() else { + return Err(Error::Type("Stream is undefined".to_string())); + }; + + // Return ! WritableStreamDefaultWriterGetDesiredSize(this). + Ok(stream.get_desired_size()) + } + + /// <https://streams.spec.whatwg.org/#default-writer-ready> + fn Ready(&self) -> Rc<Promise> { + // Return this.[[readyPromise]]. + return self.ready_promise.borrow().clone(); + } + + /// <https://streams.spec.whatwg.org/#default-writer-abort> + fn Abort( + &self, + cx: SafeJSContext, + reason: SafeHandleValue, + realm: InRealm, + can_gc: CanGc, + ) -> Rc<Promise> { + let global = GlobalScope::from_safe_context(cx, realm); + + // If this.[[stream]] is undefined, + if self.stream.get().is_none() { + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&global, can_gc); + promise.reject_error(Error::Type("Stream is undefined".to_string())); + return promise; + } + + // Return ! WritableStreamDefaultWriterAbort(this, reason). + self.abort(cx, &global, reason, can_gc) + } + + /// <https://streams.spec.whatwg.org/#default-writer-close> + fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> { + let cx = GlobalScope::get_cx(); + let global = GlobalScope::from_safe_context(cx, in_realm); + let promise = Promise::new(&global, can_gc); + + // Let stream be this.[[stream]]. + let Some(stream) = self.stream.get() else { + // If stream is undefined, + // return a promise rejected with a TypeError exception. + promise.reject_error(Error::Type("Stream is undefined".to_string())); + return promise; + }; + + // If ! WritableStreamCloseQueuedOrInFlight(stream) is true + if stream.close_queued_or_in_flight() { + // return a promise rejected with a TypeError exception. + promise.reject_error(Error::Type( + "Stream has closed queued or in-flight".to_string(), + )); + return promise; + } + + self.close(cx, &global, can_gc) + } + + /// <https://streams.spec.whatwg.org/#default-writer-release-lock> + fn ReleaseLock(&self, can_gc: CanGc) { + // Let stream be this.[[stream]]. + let Some(stream) = self.stream.get() else { + // If stream is undefined, return. + return; + }; + + // Assert: stream.[[writer]] is not undefined. + assert!(stream.get_writer().is_some()); + + let global = self.global(); + let cx = GlobalScope::get_cx(); + + // Perform ! WritableStreamDefaultWriterRelease(this). + self.release(cx, &global, can_gc); + } + + /// <https://streams.spec.whatwg.org/#default-writer-write> + fn Write( + &self, + cx: SafeJSContext, + chunk: SafeHandleValue, + realm: InRealm, + can_gc: CanGc, + ) -> Rc<Promise> { + let global = GlobalScope::from_safe_context(cx, realm); + + // If this.[[stream]] is undefined, + if self.stream.get().is_none() { + // return a promise rejected with a TypeError exception. + let global = GlobalScope::from_safe_context(cx, realm); + let promise = Promise::new(&global, can_gc); + promise.reject_error(Error::Type("Stream is undefined".to_string())); + return promise; + } + + // Return ! WritableStreamDefaultWriterWrite(this, chunk). + self.write(cx, &global, chunk, can_gc) + } + + /// <https://streams.spec.whatwg.org/#default-writer-constructor> + fn Constructor( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + stream: &WritableStream, + ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> { + let writer = WritableStreamDefaultWriter::new(global, proto, can_gc); + + let cx = GlobalScope::get_cx(); + + // Perform ? SetUpWritableStreamDefaultWriter(this, stream). + writer.setup(cx, stream)?; + + Ok(writer) + } +} |