diff options
author | Gregory Terzian <2792687+gterzian@users.noreply.github.com> | 2025-04-08 00:48:42 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-07 16:48:42 +0000 |
commit | a5c547259f4a1eda8f266e6cc3f673d21d66723b (patch) | |
tree | 3746f769258f8bf7322cb402b8b93b9afaff2bc6 | |
parent | 777b74252a86c2abaeb0b5220aa87312a695ebcf (diff) | |
download | servo-a5c547259f4a1eda8f266e6cc3f673d21d66723b.tar.gz servo-a5c547259f4a1eda8f266e6cc3f673d21d66723b.zip |
Streams: add an underlying sink type (#36385)
Introduces the concept of different types of underlying sinks for the
writable controller, and a minor fix to the abort algorithm.
The dead code is already used in the wip at
https://github.com/servo/servo/pull/36181/, and will also be used in a
another wip in parallel to implement transform stream, so the concept is
introduced here with dead code to facilitate the work in parallel and
prevent too much merge conflicts down the road.
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
-rw-r--r-- | components/script/dom/writablestream.rs | 7 | ||||
-rw-r--r-- | components/script/dom/writablestreamdefaultcontroller.rs | 148 |
2 files changed, 105 insertions, 50 deletions
diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs index 7de6b013169..bbc5d6316b6 100644 --- a/components/script/dom/writablestream.rs +++ b/components/script/dom/writablestream.rs @@ -28,7 +28,9 @@ use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_alg use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; -use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController; +use crate::dom::writablestreamdefaultcontroller::{ + UnderlyingSinkType, WritableStreamDefaultController, +}; use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter; use crate::realms::{InRealm, enter_realm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; @@ -155,7 +157,7 @@ pub struct WritableStream { impl WritableStream { #[cfg_attr(crown, allow(crown::unrooted_must_root))] - /// <https://streams.spec.whatwg.org/#initialize-readable-stream> + /// <https://streams.spec.whatwg.org/#initialize-writable-stream> fn new_inherited() -> WritableStream { WritableStream { reflector_: Reflector::new(), @@ -879,6 +881,7 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream { // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink let controller = WritableStreamDefaultController::new( global, + UnderlyingSinkType::Js, &underlying_sink_dict, high_water_mark, size_algorithm, diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs index 53de61df506..691e85408e9 100644 --- a/components/script/dom/writablestreamdefaultcontroller.rs +++ b/components/script/dom/writablestreamdefaultcontroller.rs @@ -214,11 +214,26 @@ impl Callback for WriteAlgorithmRejectionHandler { } } +/// The type of sink algorithms we are using. +#[allow(dead_code)] +#[derive(JSTraceable, MallocSizeOf, PartialEq)] +pub enum UnderlyingSinkType { + /// Algorithms are provided by Js callbacks. + Js, + /// Algorithms supporting streams transfer are implemented in Rust. + /// TODO: implement transfer. + Transfer, +} + /// <https://streams.spec.whatwg.org/#ws-default-controller-class> #[dom_struct] pub struct WritableStreamDefaultController { reflector_: Reflector, + /// The type of underlying sink used. Besides the default JS one, + /// there will be others for stream transfer, and for transform stream. + underlying_sink_type: UnderlyingSinkType, + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm> #[ignore_malloc_size_of = "Rc is hard"] abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>, @@ -256,12 +271,14 @@ impl WritableStreamDefaultController { /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink> #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn new_inherited( + underlying_sink_type: UnderlyingSinkType, underlying_sink: &UnderlyingSink, strategy_hwm: f64, strategy_size: Rc<QueuingStrategySize>, ) -> WritableStreamDefaultController { WritableStreamDefaultController { reflector_: Reflector::new(), + underlying_sink_type, queue: Default::default(), stream: Default::default(), abort: RefCell::new(underlying_sink.abort.clone()), @@ -276,6 +293,7 @@ impl WritableStreamDefaultController { pub(crate) fn new( global: &GlobalScope, + underlying_sink_type: UnderlyingSinkType, underlying_sink: &UnderlyingSink, strategy_hwm: f64, strategy_size: Rc<QueuingStrategySize>, @@ -283,6 +301,7 @@ impl WritableStreamDefaultController { ) -> DomRoot<WritableStreamDefaultController> { reflect_dom_object( Box::new(WritableStreamDefaultController::new_inherited( + underlying_sink_type, underlying_sink, strategy_hwm, strategy_size, @@ -390,6 +409,7 @@ impl WritableStreamDefaultController { Promise::new_resolved(global, cx, result.get(), can_gc) } } else { + // Let startAlgorithm be an algorithm that returns undefined. Promise::new_resolved(global, cx, (), can_gc) }; @@ -439,71 +459,103 @@ impl WritableStreamDefaultController { reason: SafeHandleValue, can_gc: CanGc, ) -> Rc<Promise> { - rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); - let algo = self.abort.borrow().clone(); - let result = if let Some(algo) = algo { - algo.Call_( - &this_object.handle(), - Some(reason), - ExceptionHandling::Rethrow, - can_gc, - ) - } else { - Ok(Promise::new_resolved(global, cx, (), can_gc)) + let result = match self.underlying_sink_type { + UnderlyingSinkType::Js => { + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + let algo = self.abort.borrow().clone(); + // Let result be the result of performing this.[[abortAlgorithm]], passing reason. + let result = if let Some(algo) = algo { + algo.Call_( + &this_object.handle(), + Some(reason), + ExceptionHandling::Rethrow, + can_gc, + ) + } else { + Ok(Promise::new_resolved(global, cx, (), can_gc)) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e, can_gc); + promise + }) + }, + UnderlyingSinkType::Transfer => { + // TODO: implement transfer. + Promise::new_resolved(global, cx, (), can_gc) + }, }; - result.unwrap_or_else(|e| { - let promise = Promise::new(global, can_gc); - promise.reject_error(e, can_gc); - promise - }) + + // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + result } - pub(crate) fn call_write_algorithm( + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm> + fn call_write_algorithm( &self, cx: SafeJSContext, chunk: SafeHandleValue, global: &GlobalScope, can_gc: CanGc, ) -> Rc<Promise> { - rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); - let algo = self.write.borrow().clone(); - let result = if let Some(algo) = algo { - algo.Call_( - &this_object.handle(), - chunk, - self, - ExceptionHandling::Rethrow, - can_gc, - ) - } else { - Ok(Promise::new_resolved(global, cx, (), can_gc)) - }; - result.unwrap_or_else(|e| { - let promise = Promise::new(global, can_gc); - promise.reject_error(e, can_gc); - promise - }) + match self.underlying_sink_type { + UnderlyingSinkType::Js => { + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + let algo = self.write.borrow().clone(); + let result = if let Some(algo) = algo { + algo.Call_( + &this_object.handle(), + chunk, + self, + ExceptionHandling::Rethrow, + can_gc, + ) + } else { + Ok(Promise::new_resolved(global, cx, (), can_gc)) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e, can_gc); + promise + }) + }, + UnderlyingSinkType::Transfer => { + // TODO: implement transfer. + Promise::new_resolved(global, cx, (), can_gc) + }, + } } + /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm> fn call_close_algorithm( &self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc, ) -> Rc<Promise> { - rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>()); - this_object.set(self.underlying_sink_obj.get()); - let algo = self.close.borrow().clone(); - let result = if let Some(algo) = algo { - algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc) - } else { - Ok(Promise::new_resolved(global, cx, (), can_gc)) - }; - result.unwrap_or_else(|e| { - let promise = Promise::new(global, can_gc); - promise.reject_error(e, can_gc); - promise - }) + match self.underlying_sink_type { + UnderlyingSinkType::Js => { + rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>()); + this_object.set(self.underlying_sink_obj.get()); + let algo = self.close.borrow().clone(); + let result = if let Some(algo) = algo { + algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc) + } else { + Ok(Promise::new_resolved(global, cx, (), can_gc)) + }; + result.unwrap_or_else(|e| { + let promise = Promise::new(global, can_gc); + promise.reject_error(e, can_gc); + promise + }) + }, + UnderlyingSinkType::Transfer => { + // TODO: implement transfer. + Promise::new_resolved(global, cx, (), can_gc) + }, + } } /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> |