aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--components/script/dom/writablestream.rs7
-rw-r--r--components/script/dom/writablestreamdefaultcontroller.rs148
2 files changed, 105 insertions, 50 deletions
diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs
index 7de6b013169..bbc5d6316b6 100644
--- a/components/script/dom/writablestream.rs
+++ b/components/script/dom/writablestream.rs
@@ -28,7 +28,9 @@ use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_alg
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
-use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController;
+use crate::dom::writablestreamdefaultcontroller::{
+ UnderlyingSinkType, WritableStreamDefaultController,
+};
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@@ -155,7 +157,7 @@ pub struct WritableStream {
impl WritableStream {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
- /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
+ /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
fn new_inherited() -> WritableStream {
WritableStream {
reflector_: Reflector::new(),
@@ -879,6 +881,7 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
// Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink
let controller = WritableStreamDefaultController::new(
global,
+ UnderlyingSinkType::Js,
&underlying_sink_dict,
high_water_mark,
size_algorithm,
diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs
index 53de61df506..691e85408e9 100644
--- a/components/script/dom/writablestreamdefaultcontroller.rs
+++ b/components/script/dom/writablestreamdefaultcontroller.rs
@@ -214,11 +214,26 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
}
+/// The type of sink algorithms we are using.
+#[allow(dead_code)]
+#[derive(JSTraceable, MallocSizeOf, PartialEq)]
+pub enum UnderlyingSinkType {
+ /// Algorithms are provided by Js callbacks.
+ Js,
+ /// Algorithms supporting streams transfer are implemented in Rust.
+ /// TODO: implement transfer.
+ Transfer,
+}
+
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
#[dom_struct]
pub struct WritableStreamDefaultController {
reflector_: Reflector,
+ /// The type of underlying sink used. Besides the default JS one,
+ /// there will be others for stream transfer, and for transform stream.
+ underlying_sink_type: UnderlyingSinkType,
+
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
@@ -256,12 +271,14 @@ impl WritableStreamDefaultController {
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn new_inherited(
+ underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
) -> WritableStreamDefaultController {
WritableStreamDefaultController {
reflector_: Reflector::new(),
+ underlying_sink_type,
queue: Default::default(),
stream: Default::default(),
abort: RefCell::new(underlying_sink.abort.clone()),
@@ -276,6 +293,7 @@ impl WritableStreamDefaultController {
pub(crate) fn new(
global: &GlobalScope,
+ underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
@@ -283,6 +301,7 @@ impl WritableStreamDefaultController {
) -> DomRoot<WritableStreamDefaultController> {
reflect_dom_object(
Box::new(WritableStreamDefaultController::new_inherited(
+ underlying_sink_type,
underlying_sink,
strategy_hwm,
strategy_size,
@@ -390,6 +409,7 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
+ // Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@@ -439,71 +459,103 @@ impl WritableStreamDefaultController {
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
- let algo = self.abort.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(
- &this_object.handle(),
- Some(reason),
- ExceptionHandling::Rethrow,
- can_gc,
- )
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
+ let result = match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.abort.borrow().clone();
+ // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ Some(reason),
+ ExceptionHandling::Rethrow,
+ can_gc,
+ )
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
};
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+
+ // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ result
}
- pub(crate) fn call_write_algorithm(
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
+ fn call_write_algorithm(
&self,
cx: SafeJSContext,
chunk: SafeHandleValue,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
- let algo = self.write.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(
- &this_object.handle(),
- chunk,
- self,
- ExceptionHandling::Rethrow,
- can_gc,
- )
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
- };
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+ match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
+ let algo = self.write.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(
+ &this_object.handle(),
+ chunk,
+ self,
+ ExceptionHandling::Rethrow,
+ can_gc,
+ )
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
+ }
}
+ /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
fn call_close_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
- rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
- this_object.set(self.underlying_sink_obj.get());
- let algo = self.close.borrow().clone();
- let result = if let Some(algo) = algo {
- algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
- } else {
- Ok(Promise::new_resolved(global, cx, (), can_gc))
- };
- result.unwrap_or_else(|e| {
- let promise = Promise::new(global, can_gc);
- promise.reject_error(e, can_gc);
- promise
- })
+ match self.underlying_sink_type {
+ UnderlyingSinkType::Js => {
+ rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
+ this_object.set(self.underlying_sink_obj.get());
+ let algo = self.close.borrow().clone();
+ let result = if let Some(algo) = algo {
+ algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
+ } else {
+ Ok(Promise::new_resolved(global, cx, (), can_gc))
+ };
+ result.unwrap_or_else(|e| {
+ let promise = Promise::new(global, can_gc);
+ promise.reject_error(e, can_gc);
+ promise
+ })
+ },
+ UnderlyingSinkType::Transfer => {
+ // TODO: implement transfer.
+ Promise::new_resolved(global, cx, (), can_gc)
+ },
+ }
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>