/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::cell::Cell; use std::ptr::{self}; use std::rc::Rc; use dom_struct::dom_struct; use js::conversions::ToJSValConvertible; use js::jsapi::{Heap, JSObject}; use js::jsval::{JSVal, ObjectValue, UndefinedValue}; use js::rust::{ HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, MutableHandleValue as SafeMutableHandleValue, }; use js::typedarray::ArrayBufferViewU8; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, }; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods; use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult}; use crate::dom::bindings::error::Error; use crate::dom::bindings::import::module::Fallible; use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader; use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto}; use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom}; use crate::dom::bindings::trace::RootedTraceableBox; use crate::dom::bindings::utils::get_dictionary_property; use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use 
crate::dom::readablebytestreamcontroller::ReadableByteStreamController; use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController; use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader}; use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm; use crate::dom::types::DefaultTeeUnderlyingSource; use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; use crate::js::conversions::FromJSValConvertible; use crate::realms::{enter_realm, InRealm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; use super::bindings::buffer_source::HeapBufferSource; use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions; use super::readablestreambyobreader::ReadIntoRequest; /// The fulfillment handler for the reacting to sourceCancelPromise part of /// . #[derive(Clone, JSTraceable, MallocSizeOf)] struct SourceCancelPromiseFulfillmentHandler { #[ignore_malloc_size_of = "Rc are hard"] result: Rc, } impl Callback for SourceCancelPromiseFulfillmentHandler { /// The fulfillment handler for the reacting to sourceCancelPromise part of /// . /// An implementation of fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { self.result.resolve_native(&()); } } /// The rejection handler for the reacting to sourceCancelPromise part of /// . #[derive(Clone, JSTraceable, MallocSizeOf)] struct SourceCancelPromiseRejectionHandler { #[ignore_malloc_size_of = "Rc are hard"] result: Rc, } impl Callback for SourceCancelPromiseRejectionHandler { /// The rejection handler for the reacting to sourceCancelPromise part of /// . 
/// An implementation of fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { self.result.reject_native(&v); } } /// #[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)] pub(crate) enum ReadableStreamState { #[default] Readable, Closed, Errored, } /// #[derive(JSTraceable, MallocSizeOf)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] pub(crate) enum ControllerType { /// Byte(MutNullableDom), /// Default(MutNullableDom), } /// #[derive(JSTraceable, MallocSizeOf)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] pub(crate) enum ReaderType { /// #[allow(clippy::upper_case_acronyms)] BYOB(MutNullableDom), /// Default(MutNullableDom), } /// #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn create_readable_stream( global: &GlobalScope, underlying_source_type: UnderlyingSourceType, queuing_strategy: QueuingStrategy, can_gc: CanGc, ) -> DomRoot { // If highWaterMark was not passed, set it to 1. let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0); // If sizeAlgorithm was not passed, set it to an algorithm that returns 1. let size_algorithm = queuing_strategy .size .unwrap_or(extract_size_algorithm(&QueuingStrategy::empty())); // Assert: ! IsNonNegativeNumber(highWaterMark) is true. assert!(high_water_mark >= 0.0); // Let stream be a new ReadableStream. // Perform ! InitializeReadableStream(stream). let stream = ReadableStream::new_with_proto( global, None, ControllerType::Default(MutNullableDom::new(None)), can_gc, ); // Let controller be a new ReadableStreamDefaultController. let controller = ReadableStreamDefaultController::new( global, underlying_source_type, high_water_mark, size_algorithm, can_gc, ); // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm). 
controller .setup(stream.clone(), can_gc) .expect("Setup of default controller cannot fail"); // Return stream. stream } /// #[dom_struct] pub(crate) struct ReadableStream { reflector_: Reflector, /// /// Note: the inner `MutNullableDom` should really be an `Option`, /// because it is never unset once set. controller: ControllerType, /// #[ignore_malloc_size_of = "mozjs"] stored_error: Heap, /// disturbed: Cell, /// reader: ReaderType, /// state: Cell, } impl ReadableStream { #[cfg_attr(crown, allow(crown::unrooted_must_root))] /// fn new_inherited(controller: ControllerType) -> ReadableStream { let reader = match &controller { ControllerType::Default(_) => ReaderType::Default(MutNullableDom::new(None)), ControllerType::Byte(_) => ReaderType::BYOB(MutNullableDom::new(None)), }; ReadableStream { reflector_: Reflector::new(), controller, stored_error: Heap::default(), disturbed: Default::default(), reader, state: Cell::new(Default::default()), } } #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn new_with_proto( global: &GlobalScope, proto: Option, controller: ControllerType, can_gc: CanGc, ) -> DomRoot { reflect_dom_object_with_proto( Box::new(ReadableStream::new_inherited(controller)), global, proto, can_gc, ) } /// Used as part of /// pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) { match self.controller { ControllerType::Default(ref ctrl) => ctrl.set(Some(controller)), ControllerType::Byte(_) => { unreachable!("set_default_controller called in setup of default controller.") }, } } /// Used as part of /// pub(crate) fn assert_no_controller(&self) { let has_no_controller = match self.controller { ControllerType::Default(ref ctrl) => ctrl.get().is_none(), ControllerType::Byte(ref ctrl) => ctrl.get().is_none(), }; assert!(has_no_controller); } /// Build a stream backed by a Rust source that has already been read into memory. 
pub(crate) fn new_from_bytes( global: &GlobalScope, bytes: Vec, can_gc: CanGc, ) -> Fallible> { let stream = ReadableStream::new_with_external_underlying_source( global, UnderlyingSourceType::Memory(bytes.len()), can_gc, )?; stream.enqueue_native(bytes); stream.controller_close_native(); Ok(stream) } /// Build a stream backed by a Rust underlying source. /// Note: external sources are always paired with a default controller. #[cfg_attr(crown, allow(crown::unrooted_must_root))] pub(crate) fn new_with_external_underlying_source( global: &GlobalScope, source: UnderlyingSourceType, can_gc: CanGc, ) -> Fallible> { assert!(source.is_native()); let stream = ReadableStream::new_with_proto( global, None, ControllerType::Default(MutNullableDom::new(None)), can_gc, ); let controller = ReadableStreamDefaultController::new( global, source, 1.0, extract_size_algorithm(&QueuingStrategy::empty()), can_gc, ); controller.setup(stream.clone(), can_gc)?; Ok(stream) } /// Call into the release steps of the controller, pub(crate) fn perform_release_steps(&self) -> Fallible<()> { match &self.controller { ControllerType::Default(controller) => controller .get() .map(|controller_ref| controller_ref.perform_release_steps()) .unwrap_or_else(|| Err(Error::Type("Stream should have controller.".to_string()))), ControllerType::Byte(_) => todo!(), } } /// Call into the pull steps of the controller, /// as part of /// pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) { match self.controller { ControllerType::Default(ref controller) => controller .get() .expect("Stream should have controller.") .perform_pull_steps(read_request, can_gc), ControllerType::Byte(_) => { unreachable!( "Pulling a chunk from a stream with a byte controller using a default reader" ) }, } } /// Call into the pull steps of the controller, /// as part of /// pub(crate) fn perform_pull_into_steps( &self, read_into_request: &ReadIntoRequest, view: HeapBufferSource, options: 
&ReadableStreamBYOBReaderReadOptions, can_gc: CanGc, ) { match self.controller { ControllerType::Byte(ref controller) => controller .get() .expect("Stream should have controller.") .perform_pull_into(read_into_request, view, options, can_gc), ControllerType::Default(_) => unreachable!( "Pulling a chunk from a stream with a default controller using a BYOB reader" ), } } /// pub(crate) fn add_read_request(&self, read_request: &ReadRequest) { match self.reader { // Assert: stream.[[reader]] implements ReadableStreamDefaultReader. ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { panic!("Attempt to add a read request without having first acquired a reader."); }; // Assert: stream.[[state]] is "readable". assert!(self.is_readable()); // Append readRequest to stream.[[reader]].[[readRequests]]. reader.add_read_request(read_request); }, ReaderType::BYOB(_) => { unreachable!("Adding a read request can only be done on a default reader.") }, } } #[allow(dead_code)] /// pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) { match self.reader { // Assert: stream.[[reader]] implements ReadableStreamBYOBReader. ReaderType::Default(_) => { unreachable!("Adding a read into request can only be done on a BYOB reader.") }, ReaderType::BYOB(ref reader) => { let Some(reader) = reader.get() else { unreachable!("Attempt to add a read into request without having first acquired a reader."); }; // Assert: stream.[[state]] is "readable" or "closed". assert!(self.is_readable() || self.is_closed()); // Append readRequest to stream.[[reader]].[[readIntoRequests]]. reader.add_read_into_request(read_request); }, } } /// Endpoint to enqueue chunks directly from Rust. /// Note: in other use cases this call happens via the controller. 
pub(crate) fn enqueue_native(&self, bytes: Vec<u8>) {
    match &self.controller {
        ControllerType::Default(slot) => slot
            .get()
            .expect("Stream should have controller.")
            .enqueue_native(bytes),
        // Spelled out rather than `_` so that a new controller kind has to
        // be handled here explicitly.
        ControllerType::Byte(_) => unreachable!(
            "Enqueueing chunk to a stream from Rust on other than default controller"
        ),
    }
}

/// Moves the stream to the errored state with `e` as the stored error, and
/// propagates the error to the default reader if one is attached
/// (the spec's ReadableStreamError).
///
/// # Panics
///
/// Panics if the stream is not readable. Streams whose reader slot is of the
/// BYOB kind are not supported yet (`todo!`).
pub(crate) fn error(&self, e: SafeHandleValue) {
    // Assert: stream.[[state]] is "readable".
    assert!(self.is_readable());

    // Set stream.[[state]] to "errored".
    self.state.set(ReadableStreamState::Errored);

    // Set stream.[[storedError]] to e.
    self.stored_error.set(e.get());

    // Let reader be stream.[[reader]].
    match &self.reader {
        ReaderType::Default(slot) => {
            let Some(reader) = slot.get() else {
                // If reader is undefined, return.
                return;
            };
            reader.error(e);
        },
        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
        ReaderType::BYOB(_) => todo!(),
    }
}

/// Writes the stream's stored error into `handle_mut`.
pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
    handle_mut.set(self.stored_error.get());
}

/// Errors the stream from Rust: converts `error` to a JS value and calls
/// [`ReadableStream::error`] with it.
/// Note: in other use cases this call happens via the controller.
#[allow(unsafe_code)]
pub(crate) fn error_native(&self, error: Error) {
    let cx = GlobalScope::get_cx();
    rooted!(in(*cx) let mut error_val = UndefinedValue());
    // SAFETY: `cx` comes from `GlobalScope::get_cx()` and the output value
    // is rooted for the duration of the conversion.
    // NOTE(review): confirm no further precondition is required by `to_jsval`.
    unsafe { error.to_jsval(*cx, &self.global(), error_val.handle_mut()) };
    self.error(error_val.handle());
}

/// Call into the controller's `Close` method.
/// Any error returned by `Close` is deliberately ignored.
///
/// # Panics
///
/// Panics if the stream has no controller, or has a byte controller.
pub(crate) fn controller_close_native(&self) {
    match &self.controller {
        ControllerType::Default(slot) => {
            let _ = slot
                .get()
                .expect("Stream should have controller.")
                .Close();
        },
        ControllerType::Byte(_) => {
            unreachable!("Native closing is only done on default controllers.")
        },
    }
}

/// Returns a boolean reflecting whether the stream has all data in memory.
/// Useful for native source integration only.
pub(crate) fn in_memory(&self) -> bool {
    match &self.controller {
        ControllerType::Default(slot) => slot
            .get()
            .expect("Stream should have controller.")
            .in_memory(),
        ControllerType::Byte(_) => unreachable!(
            "Checking if source is in memory for a stream with a non-default controller"
        ),
    }
}

/// Return bytes for synchronous use, if the stream has all data in memory.
/// Useful for native source integration only.
///
/// # Panics
///
/// Panics if the stream has no controller, or has a byte controller.
pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
    match &self.controller {
        ControllerType::Default(slot) => slot
            .get()
            .expect("Stream should have controller.")
            .get_in_memory_bytes(),
        ControllerType::Byte(_) => {
            unreachable!("Getting in-memory bytes for a stream with a non-default controller")
        },
    }
}

/// Acquires a reader and locks the stream,
/// must be done before `read_a_chunk`.
/// Native call to the spec's AcquireReadableStreamDefaultReader.
///
/// # Errors
///
/// Propagates any error raised while setting up the reader.
pub(crate) fn acquire_default_reader(
    &self,
    can_gc: CanGc,
) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
    // Let reader be a new ReadableStreamDefaultReader.
    let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);

    // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
    reader.set_up(self, &self.global(), can_gc)?;

    // Return reader.
    Ok(reader)
}

/// Acquires a BYOB reader for the stream
/// (the spec's AcquireReadableStreamBYOBReader).
///
/// # Errors
///
/// Propagates any error raised while setting up the reader.
pub(crate) fn acquire_byob_reader(
    &self,
    can_gc: CanGc,
) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
    // Let reader be a new ReadableStreamBYOBReader.
    let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);

    // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
    reader.set_up(self, &self.global(), can_gc)?;

    // Return reader.
    Ok(reader)
}

/// Returns the stream's default controller.
///
/// # Panics
///
/// Panics if the stream has no controller, or has a byte controller.
pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
    match &self.controller {
        ControllerType::Default(slot) => slot.get().expect("Stream should have controller."),
        ControllerType::Byte(_) => unreachable!(
            "Getting default controller for a stream with a non-default controller"
        ),
    }
}

/// Read a chunk from the stream,
/// must be called after `start_reading`,
/// and before `stop_reading`.
/// Native call to /// pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc { match self.reader { ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { unreachable!( "Attempt to read stream chunk without having first acquired a reader." ); }; reader.Read(can_gc) }, ReaderType::BYOB(_) => { unreachable!("Native reading of a chunk can only be done with a default reader.") }, } } /// Releases the lock on the reader, /// must be done after `start_reading`. /// Native call to /// pub(crate) fn stop_reading(&self) { match self.reader { ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { unreachable!("Attempt to stop reading without having first acquired a reader."); }; reader.release().expect("Reader release cannot fail."); }, ReaderType::BYOB(_) => { unreachable!("Native stop reading can only be done with a default reader.") }, } } /// pub(crate) fn is_locked(&self) -> bool { match self.reader { ReaderType::Default(ref reader) => reader.get().is_some(), ReaderType::BYOB(ref reader) => reader.get().is_some(), } } pub(crate) fn is_disturbed(&self) -> bool { self.disturbed.get() } pub(crate) fn set_is_disturbed(&self, disturbed: bool) { self.disturbed.set(disturbed); } pub(crate) fn is_closed(&self) -> bool { self.state.get() == ReadableStreamState::Closed } pub(crate) fn is_errored(&self) -> bool { self.state.get() == ReadableStreamState::Errored } pub(crate) fn is_readable(&self) -> bool { self.state.get() == ReadableStreamState::Readable } pub(crate) fn has_default_reader(&self) -> bool { match self.reader { ReaderType::Default(ref reader) => reader.get().is_some(), ReaderType::BYOB(_) => false, } } pub(crate) fn has_byte_controller(&self) -> bool { matches!(self.controller, ControllerType::Byte(_)) } /// pub(crate) fn get_num_read_requests(&self) -> usize { assert!(self.has_default_reader()); match self.reader { ReaderType::Default(ref reader) => { let reader = reader .get() .expect("Stream must have a reader when get num 
read requests is called into."); reader.get_num_read_requests() }, ReaderType::BYOB(_) => unreachable!( "Stream must have a default reader when get num read requests is called into." ), } } /// #[cfg_attr(crown, allow(crown::unrooted_must_root))] pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool) { // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true. assert!(self.has_default_reader()); match self.reader { ReaderType::Default(ref reader) => { // step 2 - Let reader be stream.[[reader]]. let reader = reader .get() .expect("Stream must have a reader when a read request is fulfilled."); // step 3 - Assert: reader.[[readRequests]] is not empty. assert_ne!(reader.get_num_read_requests(), 0); // step 4 & 5 // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]]. let request = reader.remove_read_request(); if done { // step 6 - If done is true, perform readRequest’s close steps. request.close_steps(); } else { // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk. let result = RootedTraceableBox::new(Heap::default()); result.set(*chunk); request.chunk_steps(result); } }, ReaderType::BYOB(_) => unreachable!( "Stream must have a default reader when fulfill read requests is called into." ), } } /// pub(crate) fn close(&self) { // Assert: stream.[[state]] is "readable". assert!(self.is_readable()); // Set stream.[[state]] to "closed". self.state.set(ReadableStreamState::Closed); // Let reader be stream.[[reader]]. match self.reader { ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { // If reader is undefined, return. return; }; // step 5 & 6 reader.close(); }, ReaderType::BYOB(ref _reader) => {}, } } /// #[allow(unsafe_code)] pub(crate) fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc { // Set stream.[[disturbed]] to true. self.disturbed.set(true); // If stream.[[state]] is "closed", return a promise resolved with undefined. 
if self.is_closed() { let promise = Promise::new(&self.reflector_.global(), can_gc); promise.resolve_native(&()); return promise; } // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]]. if self.is_errored() { let promise = Promise::new(&self.reflector_.global(), can_gc); unsafe { let cx = GlobalScope::get_cx(); rooted!(in(*cx) let mut rval = UndefinedValue()); self.stored_error.to_jsval(*cx, rval.handle_mut()); promise.reject_native(&rval.handle()); return promise; } } // Perform ! ReadableStreamClose(stream). self.close(); // If reader is not undefined and reader implements ReadableStreamBYOBReader, match self.reader { ReaderType::BYOB(ref reader) => { if let Some(reader) = reader.get() { // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel reader.close(); } }, ReaderType::Default(ref _reader) => {}, } // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason). let source_cancel_promise = match self.controller { ControllerType::Default(ref controller) => controller .get() .expect("Stream should have controller.") .perform_cancel_steps(reason, can_gc), ControllerType::Byte(_) => { todo!() }, }; // Create a new promise, // and setup a handler in order to react to the fulfillment of sourceCancelPromise. let global = self.reflector_.global(); let result_promise = Promise::new(&global, can_gc); let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler { result: result_promise.clone(), }); let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler { result: result_promise.clone(), }); let handler = PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler)); let realm = enter_realm(&*global); let comp = InRealm::Entered(&realm); source_cancel_promise.append_native_handler(&handler, comp, can_gc); // Return the result of reacting to sourceCancelPromise // with a fulfillment step that returns undefined. 
result_promise } #[cfg_attr(crown, allow(crown::unrooted_must_root))] pub(crate) fn set_reader(&self, new_reader: Option) { match (&self.reader, new_reader) { (ReaderType::Default(ref reader), Some(ReaderType::Default(new_reader))) => { reader.set(new_reader.get().as_deref()); }, (ReaderType::BYOB(ref reader), Some(ReaderType::BYOB(new_reader))) => { reader.set(new_reader.get().as_deref()); }, (ReaderType::Default(ref reader), None) => { reader.set(None); }, (ReaderType::BYOB(ref reader), None) => { reader.set(None); }, (_, _) => { unreachable!("Setting a mismatched reader type is not allowed."); }, } } /// #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn default_tee( &self, clone_for_branch_2: bool, can_gc: CanGc, ) -> Fallible>> { // Assert: stream implements ReadableStream. // Assert: cloneForBranch2 is a boolean. let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2)); // Let reader be ? AcquireReadableStreamDefaultReader(stream). let reader = self.acquire_default_reader(can_gc)?; self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some( &reader, ))))); // Let reading be false. let reading = Rc::new(Cell::new(false)); // Let readAgain be false. let read_again = Rc::new(Cell::new(false)); // Let canceled1 be false. let canceled_1 = Rc::new(Cell::new(false)); // Let canceled2 be false. let canceled_2 = Rc::new(Cell::new(false)); // Let reason1 be undefined. let reason_1 = Rc::new(Heap::boxed(UndefinedValue())); // Let reason2 be undefined. let reason_2 = Rc::new(Heap::boxed(UndefinedValue())); // Let cancelPromise be a new promise. 
let cancel_promise = Promise::new(&self.reflector_.global(), can_gc); let tee_source_1 = DefaultTeeUnderlyingSource::new( &reader, self, reading.clone(), read_again.clone(), canceled_1.clone(), canceled_2.clone(), clone_for_branch_2.clone(), reason_1.clone(), reason_2.clone(), cancel_promise.clone(), TeeCancelAlgorithm::Cancel1Algorithm, can_gc, ); let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1)); let tee_source_2 = DefaultTeeUnderlyingSource::new( &reader, self, reading, read_again, canceled_1.clone(), canceled_2.clone(), clone_for_branch_2, reason_1, reason_2, cancel_promise.clone(), TeeCancelAlgorithm::Cancel2Algorithm, can_gc, ); let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2)); // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm). let branch_1 = create_readable_stream( &self.reflector_.global(), underlying_source_type_branch_1, QueuingStrategy::empty(), can_gc, ); tee_source_1.set_branch_1(&branch_1); tee_source_2.set_branch_1(&branch_1); // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm). let branch_2 = create_readable_stream( &self.reflector_.global(), underlying_source_type_branch_2, QueuingStrategy::empty(), can_gc, ); tee_source_1.set_branch_2(&branch_2); tee_source_2.set_branch_2(&branch_2); // Upon rejection of reader.[[closedPromise]] with reason r, reader.append_native_handler_to_closed_promise( &branch_1, &branch_2, canceled_1, canceled_2, cancel_promise, can_gc, ); // Return « branch_1, branch_2 ». Ok(vec![branch_1, branch_2]) } /// fn tee( &self, clone_for_branch_2: bool, can_gc: CanGc, ) -> Fallible>> { // Assert: stream implements ReadableStream. // Assert: cloneForBranch2 is a boolean. match self.controller { ControllerType::Default(ref _controller) => { // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2). 
self.default_tee(clone_for_branch_2, can_gc) }, ControllerType::Byte(ref _controller) => { // If stream.[[controller]] implements ReadableByteStreamController, // return ? ReadableByteStreamTee(stream). todo!() }, } } } impl ReadableStreamMethods for ReadableStream { /// fn Constructor( cx: SafeJSContext, global: &GlobalScope, proto: Option, can_gc: CanGc, underlying_source: Option<*mut JSObject>, strategy: &QueuingStrategy, ) -> Fallible> { // If underlyingSource is missing, set it to null. rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut())); // Let underlyingSourceDict be underlyingSource, // converted to an IDL value of type UnderlyingSource. let underlying_source_dict = if !underlying_source_obj.is_null() { rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get())); match JsUnderlyingSource::new(cx, obj_val.handle()) { Ok(ConversionResult::Success(val)) => val, Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())), _ => { return Err(Error::JSFailed); }, } } else { JsUnderlyingSource::empty() }; // Perform ! InitializeReadableStream(this). let stream = if underlying_source_dict.type_.is_some() { ReadableStream::new_with_proto( global, proto, ControllerType::Byte(MutNullableDom::new(None)), can_gc, ) } else { ReadableStream::new_with_proto( global, proto, ControllerType::Default(MutNullableDom::new(None)), can_gc, ) }; if underlying_source_dict.type_.is_some() { // TODO: If underlyingSourceDict["type"] is "bytes" return Err(Error::Type("Bytes streams not implemented".to_string())); } else { // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1). let high_water_mark = extract_high_water_mark(strategy, 1.0)?; // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy). 
let size_algorithm = extract_size_algorithm(strategy); let controller = ReadableStreamDefaultController::new( global, UnderlyingSourceType::Js(underlying_source_dict, Heap::default()), high_water_mark, size_algorithm, can_gc, ); // Note: this must be done before `setup`, // otherwise `thisOb` is null in the start callback. controller.set_underlying_source_this_object(underlying_source_obj.handle()); // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource controller.setup(stream.clone(), can_gc)?; }; Ok(stream) } /// fn Locked(&self) -> bool { self.is_locked() } /// fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { if self.is_locked() { // If ! IsReadableStreamLocked(this) is true, // return a promise rejected with a TypeError exception. let promise = Promise::new(&self.global(), can_gc); promise.reject_error(Error::Type("stream is not locked".to_owned())); promise } else { // Return ! ReadableStreamCancel(this, reason). self.cancel(reason, can_gc) } } /// fn GetReader( &self, options: &ReadableStreamGetReaderOptions, can_gc: CanGc, ) -> Fallible { // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this). if options.mode.is_none() { return Ok(ReadableStreamReader::ReadableStreamDefaultReader( self.acquire_default_reader(can_gc)?, )); } // 2. Assert: options["mode"] is "byob". assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob); // 3. Return ? AcquireReadableStreamBYOBReader(this). Ok(ReadableStreamReader::ReadableStreamBYOBReader( self.acquire_byob_reader(can_gc)?, )) } /// fn Tee(&self, can_gc: CanGc) -> Fallible>> { // Return ? ReadableStreamTee(this, false). self.tee(false, can_gc) } } #[allow(unsafe_code)] /// Get the `done` property of an object that a read promise resolved to. 
pub(crate) fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result { if !v.is_object() { return Err(Error::Type("Unknown format for done property.".to_string())); } unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut done = UndefinedValue()); match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut()) { Ok(true) => match bool::from_jsval(*cx, done.handle(), ()) { Ok(ConversionResult::Success(val)) => Ok(val), Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), _ => Err(Error::Type("Unknown format for done property.".to_string())), }, Ok(false) => Err(Error::Type("Promise has no done property.".to_string())), Err(()) => Err(Error::JSFailed), } } } #[allow(unsafe_code)] /// Get the `value` property of an object that a read promise resolved to. pub(crate) fn get_read_promise_bytes( cx: SafeJSContext, v: &SafeHandleValue, ) -> Result, Error> { if !v.is_object() { return Err(Error::Type( "Unknown format for for bytes read.".to_string(), )); } unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut bytes = UndefinedValue()); match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut()) { Ok(true) => { match Vec::::from_jsval(*cx, bytes.handle(), ConversionBehavior::EnforceRange) { Ok(ConversionResult::Success(val)) => Ok(val), Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), _ => Err(Error::Type("Unknown format for bytes read.".to_string())), } }, Ok(false) => Err(Error::Type("Promise has no value property.".to_string())), Err(()) => Err(Error::JSFailed), } } }