/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::cell::Cell; use std::ptr::{self, NonNull}; use std::rc::Rc; use dom_struct::dom_struct; use js::jsapi::JSObject; use js::jsval::{ObjectValue, UndefinedValue}; use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; use crate::dom::bindings::cell::DomRefCell; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ ReadableStreamGetReaderOptions, ReadableStreamMethods, }; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods; use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult}; use crate::dom::bindings::error::Error; use crate::dom::bindings::import::module::Fallible; use crate::dom::bindings::import::module::UnionTypes::{ ReadableStreamDefaultControllerOrReadableByteStreamController as Controller, ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader, }; use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::bindings::utils::get_dictionary_property; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::readablebytestreamcontroller::ReadableByteStreamController; use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController; use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader}; use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; use crate::js::conversions::FromJSValConvertible; use crate::realms::InRealm; use crate::script_runtime::JSContext as SafeJSContext; /// #[derive(Default, JSTraceable, MallocSizeOf)] pub enum ReadableStreamState { #[default] Readable, Closed, Errored, } /// #[derive(JSTraceable, MallocSizeOf)] #[crown::unrooted_must_root_lint::must_root] pub enum ControllerType { /// Byte(Dom), /// Default(Dom), } /// #[derive(JSTraceable, MallocSizeOf)] #[crown::unrooted_must_root_lint::must_root] pub enum ReaderType { /// BYOB(MutNullableDom), /// Default(MutNullableDom), } /// #[dom_struct] pub struct ReadableStream { reflector_: Reflector, /// controller: ControllerType, /// stored_error: DomRefCell>, /// disturbed: Cell, /// reader: ReaderType, /// state: DomRefCell, } impl ReadableStream { /// #[allow(non_snake_case)] pub fn Constructor( cx: SafeJSContext, global: &GlobalScope, _proto: Option, underlying_source: Option<*mut JSObject>, _strategy: &QueuingStrategy, ) -> Fallible> { // Step 1 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut())); // Step 2 let underlying_source_dict = if !underlying_source_obj.is_null() { rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get())); match JsUnderlyingSource::new(cx, obj_val.handle()) { Ok(ConversionResult::Success(val)) => val, Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())), _ => { return Err(Error::Type( "Unknown format for underlying source.".to_string(), )) }, } } else { JsUnderlyingSource::empty() }; let controller = if underlying_source_dict.type_.is_some() { // TODO: byte controller. todo!() } else { ReadableStreamDefaultController::new( global, UnderlyingSourceType::Js(underlying_source_dict), ) }; Ok(ReadableStream::new( global, Controller::ReadableStreamDefaultController(controller), )) } #[allow(crown::unrooted_must_root)] fn new_inherited(controller: Controller) -> ReadableStream { let reader = match &controller { Controller::ReadableStreamDefaultController(_) => { ReaderType::Default(MutNullableDom::new(None)) }, Controller::ReadableByteStreamController(_) => { ReaderType::BYOB(MutNullableDom::new(None)) }, }; ReadableStream { reflector_: Reflector::new(), controller: match controller { Controller::ReadableStreamDefaultController(root) => { ControllerType::Default(Dom::from_ref(&*root)) }, Controller::ReadableByteStreamController(root) => { ControllerType::Byte(Dom::from_ref(&*root)) }, }, stored_error: DomRefCell::new(None), disturbed: Default::default(), reader: reader, state: DomRefCell::new(ReadableStreamState::Readable), } } fn new(global: &GlobalScope, controller: Controller) -> DomRoot { reflect_dom_object(Box::new(ReadableStream::new_inherited(controller)), global) } /// Used from RustCodegen.py /// TODO: remove here and its use in codegen. #[allow(unsafe_code)] pub unsafe fn from_js( _cx: SafeJSContext, _obj: *mut JSObject, _realm: InRealm, ) -> Result, ()> { Err(()) } /// Build a stream backed by a Rust source that has already been read into memory. pub fn new_from_bytes(global: &GlobalScope, bytes: Vec) -> DomRoot { let stream = ReadableStream::new_with_external_underlying_source( global, UnderlyingSourceType::Memory(bytes.len()), ); stream.enqueue_native(bytes); stream.close_native(); stream } /// Build a stream backed by a Rust underlying source. /// Note: external sources are always paired with a default controller. #[allow(unsafe_code)] pub fn new_with_external_underlying_source( global: &GlobalScope, source: UnderlyingSourceType, ) -> DomRoot { assert!(source.is_native()); let controller = ReadableStreamDefaultController::new(global, source); let stream = ReadableStream::new( global, Controller::ReadableStreamDefaultController(controller.clone()), ); controller.set_stream(&stream); stream } /// Call into the pull steps of the controller, /// as part of /// pub fn perform_pull_steps(&self, read_request: ReadRequest) { match self.controller { ControllerType::Default(ref controller) => controller.perform_pull_steps(read_request), _ => todo!(), } } /// pub fn add_read_request(&self, read_request: ReadRequest) { match self.reader { ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { panic!("Attempt to read stream chunk without having acquired a reader."); }; reader.add_read_request(read_request); }, _ => unreachable!("Adding a read request can only be done on a default reader."), } } /// Get a pointer to the underlying JS object. /// TODO: remove, /// by using at call point the `ReadableStream` directly instead of a JSObject. pub fn get_js_stream(&self) -> NonNull { NonNull::new(*self.reflector().get_jsobject()) .expect("Couldn't get a non-null pointer to JS stream object.") } /// Endpoint to enqueue chunks directly from Rust. /// Note: in other use cases this call happens via the controller. pub fn enqueue_native(&self, bytes: Vec) { match self.controller { ControllerType::Default(ref controller) => controller.enqueue_native(bytes), _ => unreachable!( "Enqueueing chunk to a stream from Rust on other than default controller" ), } } /// /// Note: in other use cases this call happens via the controller. pub fn error_native(&self, _error: Error) { *self.state.borrow_mut() = ReadableStreamState::Errored; match self.controller { ControllerType::Default(ref controller) => controller.error(), _ => unreachable!("Native closing a stream with a non-default controller"), } } /// /// Note: in other use cases this call happens via the controller. pub fn close_native(&self) { match self.controller { ControllerType::Default(ref controller) => controller.close(), _ => unreachable!("Native closing a stream with a non-default controller"), } } /// Returns a boolean reflecting whether the stream has all data in memory. /// Useful for native source integration only. pub fn in_memory(&self) -> bool { match self.controller { ControllerType::Default(ref controller) => controller.in_memory(), _ => unreachable!( "Checking if source is in memory for a stream with a non-default controller" ), } } /// Return bytes for synchronous use, if the stream has all data in memory. /// Useful for native source integration only. pub fn get_in_memory_bytes(&self) -> Option> { match self.controller { ControllerType::Default(ref controller) => controller.get_in_memory_bytes(), _ => unreachable!("Getting in-memory bytes for a stream with a non-default controller"), } } /// Native call to /// /// TODO: restructure this on related methods so the caller gets a reader? pub fn start_reading(&self) -> Result<(), ()> { if self.is_locked() { return Err(()); } let global = self.global(); match self.reader { ReaderType::Default(ref reader) => { reader.set(Some(&*ReadableStreamDefaultReader::new(&*global, self))) }, _ => unreachable!("Native start reading can only be done on a default reader."), } Ok(()) } /// Native call to /// /// TODO: restructure this on related methods so the caller reads from a reader? pub fn read_a_chunk(&self) -> Rc { match self.reader { ReaderType::Default(ref reader) => { let Some(reader) = reader.get() else { panic!("Attempt to read stream chunk without having acquired a reader."); }; reader.Read() }, _ => unreachable!("Native reading a chunk can only be done on a default reader."), } } /// Native call to /// /// TODO: restructure this on related methods so the caller releases a reader? pub fn stop_reading(&self) { match self.reader { ReaderType::Default(ref reader) => { let Some(rooted_reader) = reader.get() else { panic!("Attempt to read stream chunk without having acquired a reader."); }; rooted_reader.ReleaseLock(); reader.set(None); }, _ => unreachable!("Native stop reading a chunk can only be done on a default reader."), } } pub fn is_locked(&self) -> bool { match self.reader { ReaderType::Default(ref reader) => reader.get().is_some(), ReaderType::BYOB(ref reader) => reader.get().is_some(), } } pub fn is_disturbed(&self) -> bool { self.disturbed.get() } pub fn is_closed(&self) -> bool { matches!(*self.state.borrow(), ReadableStreamState::Closed) } pub fn is_errored(&self) -> bool { matches!(*self.state.borrow(), ReadableStreamState::Errored) } pub fn is_readable(&self) -> bool { matches!(*self.state.borrow(), ReadableStreamState::Readable) } pub fn has_default_reader(&self) -> bool { match self.reader { ReaderType::Default(ref reader) => reader.get().is_some(), ReaderType::BYOB(_) => false, } } /// pub fn get_num_read_requests(&self) -> usize { assert!(self.has_default_reader()); match self.reader { ReaderType::Default(ref reader) => { let reader = reader .get() .expect("Stream must have a reader when get num read requests is called into."); reader.get_num_read_requests() }, _ => unreachable!( "Stream must have a default reader when get num read requests is called into." ), } } /// pub fn fulfill_read_request(&self, chunk: Vec, done: bool) { assert!(self.has_default_reader()); match self.reader { ReaderType::Default(ref reader) => { let reader = reader .get() .expect("Stream must have a reader when a read request is fulfilled."); let request = reader.remove_read_request(); if !done { request.chunk_steps(chunk); } else { request.close_steps(); } }, _ => unreachable!( "Stream must have a default reader when fulfill read requests is called into." ), } } } impl ReadableStreamMethods for ReadableStream { /// fn Locked(&self) -> bool { // TODO false } /// fn Cancel(&self, _cx: SafeJSContext, _reason: SafeHandleValue) -> Rc { // TODO Promise::new(&self.reflector_.global()) } /// fn GetReader( &self, _options: &ReadableStreamGetReaderOptions, ) -> Fallible { if self.is_locked() { return Err(Error::Type("Stream is locked".to_string())); } match self.reader { ReaderType::Default(ref reader) => { reader.set(Some(&*ReadableStreamDefaultReader::new( &*self.global(), self, ))); return Ok(ReadableStreamReader::ReadableStreamDefaultReader( reader.get().unwrap(), )); }, _ => todo!(), } } } #[allow(unsafe_code)] /// Get the `done` property of an object that a read promise resolved to. pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result { unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut done = UndefinedValue()); match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut()) { Ok(true) => match bool::from_jsval(*cx, done.handle(), ()) { Ok(ConversionResult::Success(val)) => Ok(val), Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), _ => Err(Error::Type("Unknown format for done property.".to_string())), }, Ok(false) => Err(Error::Type("Promise has no done property.".to_string())), Err(()) => Err(Error::JSFailed), } } } #[allow(unsafe_code)] /// Get the `value` property of an object that a read promise resolved to. pub fn get_read_promise_bytes(cx: SafeJSContext, v: &SafeHandleValue) -> Result, Error> { unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut bytes = UndefinedValue()); match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut()) { Ok(true) => { match Vec::::from_jsval(*cx, bytes.handle(), ConversionBehavior::EnforceRange) { Ok(ConversionResult::Success(val)) => Ok(val), Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), _ => Err(Error::Type("Unknown format for bytes read.".to_string())), } }, Ok(false) => Err(Error::Type("Promise has no value property.".to_string())), Err(()) => Err(Error::JSFailed), } } }