aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGregory Terzian <2792687+gterzian@users.noreply.github.com>2025-04-08 00:48:42 +0800
committerGitHub <noreply@github.com>2025-04-07 16:48:42 +0000
commita5c547259f4a1eda8f266e6cc3f673d21d66723b (patch)
tree3746f769258f8bf7322cb402b8b93b9afaff2bc6
parent777b74252a86c2abaeb0b5220aa87312a695ebcf (diff)
downloadservo-a5c547259f4a1eda8f266e6cc3f673d21d66723b.tar.gz
servo-a5c547259f4a1eda8f266e6cc3f673d21d66723b.zip
Streams: add an underlying sink type (#36385)
Introduces the concept of different types of underlying sinks for the writable controller, and a minor fix to the abort algorithm. The dead code is already used in the wip at https://github.com/servo/servo/pull/36181/, and will also be used in a another wip in parallel to implement transform stream, so the concept is introduced here with dead code to facilitate the work in parallel and prevent too much merge conflicts down the road. Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
-rw-r--r--components/script/dom/writablestream.rs7
-rw-r--r--components/script/dom/writablestreamdefaultcontroller.rs148
2 files changed, 105 insertions, 50 deletions
diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs
index 7de6b013169..bbc5d6316b6 100644
--- a/components/script/dom/writablestream.rs
+++ b/components/script/dom/writablestream.rs
@@ -28,7 +28,9 @@ use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_alg
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
-use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController;
+use crate::dom::writablestreamdefaultcontroller::{
+ UnderlyingSinkType, WritableStreamDefaultController,
+};
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@@ -155,7 +157,7 @@ pub struct WritableStream {
impl WritableStream {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
- /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
+ /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
fn new_inherited() -> WritableStream {
WritableStream {
reflector_: Reflector::new(),
@@ -879,6 +881,7 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
// Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink
let controller = WritableStreamDefaultController::new(
global,
+ UnderlyingSinkType::Js,
&underlying_sink_dict,
high_water_mark,
size_algorithm,
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs
index 53de61df506..691e85408e9 100644
--- a/components/script/dom/writablestreamdefaultcontroller.rs
+++ b/components/script/dom/writablestreamdefaultcontroller.rs
@@ -214,11 +214,26 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
}
+/// The type of sink algorithms we are using.
+#[allow(dead_code)]
+#[derive(JSTraceable, MallocSizeOf, PartialEq)]
+pub enum UnderlyingSinkType {
+ /// Algorithms are provided by Js callbacks.
+ Js,
+ /// Algorithms supporting streams transfer are implemented in Rust.
+ /// TODO: implement transfer.
+ Transfer,
+}
+
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
#[dom_struct]
pub struct WritableStreamDefaultController {
reflector_: Reflector,
+ /// The type of underlying sink used. Besides the default JS one,
+ /// there will be others for stream transfer, and for transform stream.
+ underlying_sink_type: UnderlyingSinkType,
+
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
@@ -256,12 +271,14 @@ impl WritableStreamDefaultController {
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn new_inherited(
+ underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
) -> WritableStreamDefaultController {
WritableStreamDefaultController {
reflector_: Reflector::new(),
+ underlying_sink_type,
queue: Default::default(),
stream: Default::default(),
abort: RefCell::new(underlying_sink.abort.clone()),
@@ -276,6 +293,7 @@ impl WritableStreamDefaultController {
pub(crate) fn new(
global: &GlobalScope,
+ underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
@@ -283,6 +301,7 @@ impl WritableStreamDefaultController {
) -> DomRoot<WritableStreamDefaultController> {
reflect_dom_object(
Box::new(WritableStreamDefaultController::new_inherited(
+ underlying_sink_type,
underlying_sink,
strategy_hwm,
strategy_size,
@@ -390,6 +409,7 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
+ // Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@@ -439,71 +459,103 @@ impl WritableStreamDefaultController {
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
- let algo = self.abort.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(
- &this_object.handle(),
- Some(reason),
- ExceptionHandling::Rethrow,
- can_gc,
- )
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
+ let result = match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.abort.borrow().clone();
+ // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ Some(reason),
+ ExceptionHandling::Rethrow,
+ can_gc,
+ )
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
};
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+
+ // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ result
}
- pub(crate) fn call_write_algorithm(
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
+ fn call_write_algorithm(
&self,
cx: SafeJSContext,
chunk: SafeHandleValue,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
- let algo = self.write.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(
- &this_object.handle(),
- chunk,
- self,
- ExceptionHandling::Rethrow,
- can_gc,
- )
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
- };
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+ match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.write.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ chunk,
+ self,
+ ExceptionHandling::Rethrow,
+ can_gc,
+ )
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
+ }
}
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
fn call_close_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
- this_object.set(self.underlying_sink_obj.get());
- let algo = self.close.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
- };
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+ match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
+ this_object.set(self.underlying_sink_obj.get());
+ let algo = self.close.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
+ }
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>