/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use std::cell::RefCell; use std::rc::Rc; use dom_struct::dom_struct; use js::jsapi::{ ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject, }; use js::jsval::UndefinedValue; use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; use super::bindings::cell::DomRefCell; use super::bindings::codegen::Bindings::TransformerBinding::{ Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback, }; use super::types::TransformStream; use crate::dom::bindings::callback::ExceptionHandling; use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods; use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible}; use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; use crate::realms::{InRealm, enter_realm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; impl js::gc::Rootable for TransformTransformPromiseRejection {} /// Reacting to transformPromise as part of /// #[derive(JSTraceable, MallocSizeOf)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] struct TransformTransformPromiseRejection { controller: Dom, } impl Callback for TransformTransformPromiseRejection { /// Reacting to transformPromise with the following fulfillment steps: #[allow(unsafe_code)] fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) { // Perform ! TransformStreamError(controller.[[stream]], r). self.controller .error(cx, &self.controller.global(), v, can_gc); // Throw r. // Note: this is done part of perform_transform(). } } /// #[dom_struct] pub struct TransformStreamDefaultController { reflector_: Reflector, /// #[ignore_malloc_size_of = "Rc is hard"] cancel: RefCell>>, /// #[ignore_malloc_size_of = "Rc is hard"] flush: RefCell>>, /// #[ignore_malloc_size_of = "Rc is hard"] transform: RefCell>>, /// The JS object used as `this` when invoking sink algorithms. #[ignore_malloc_size_of = "mozjs"] transform_obj: Heap<*mut JSObject>, /// stream: MutNullableDom, /// #[ignore_malloc_size_of = "Rc is hard"] finish_promise: DomRefCell>>, } impl TransformStreamDefaultController { #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn new_inherited(transformer: &Transformer) -> TransformStreamDefaultController { TransformStreamDefaultController { reflector_: Reflector::new(), cancel: RefCell::new(transformer.cancel.clone()), flush: RefCell::new(transformer.flush.clone()), transform: RefCell::new(transformer.transform.clone()), finish_promise: DomRefCell::new(None), stream: MutNullableDom::new(None), transform_obj: Default::default(), } } #[cfg_attr(crown, allow(crown::unrooted_must_root))] pub(crate) fn new( global: &GlobalScope, transformer: &Transformer, can_gc: CanGc, ) -> DomRoot { reflect_dom_object( Box::new(TransformStreamDefaultController::new_inherited(transformer)), global, can_gc, ) } /// Setting the JS object after the heap has settled down. pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) { self.transform_obj.set(*this_object); } pub(crate) fn set_stream(&self, stream: &TransformStream) { self.stream.set(Some(stream)); } pub(crate) fn get_finish_promise(&self) -> Option> { self.finish_promise.borrow().clone() } pub(crate) fn set_finish_promise(&self, promise: Rc) { *self.finish_promise.borrow_mut() = Some(promise); } /// pub(crate) fn transform_stream_default_controller_perform_transform( &self, cx: SafeJSContext, global: &GlobalScope, chunk: SafeHandleValue, can_gc: CanGc, ) -> Fallible> { // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk. let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?; // Return the result of reacting to transformPromise with the following rejection steps given the argument r: rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection { controller: Dom::from_ref(self), })); let handler = PromiseNativeHandler::new( global, None, reject_handler.take().map(|h| Box::new(h) as Box<_>), can_gc, ); let realm = enter_realm(global); let comp = InRealm::Entered(&realm); transform_promise.append_native_handler(&handler, comp, can_gc); Ok(transform_promise) } pub(crate) fn perform_transform( &self, cx: SafeJSContext, global: &GlobalScope, chunk: SafeHandleValue, can_gc: CanGc, ) -> Fallible> { // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which // takes an argument chunk and returns the result of invoking transformerDict["transform"] with argument list // « chunk, controller » and callback this value transformer. let algo = self.transform.borrow().clone(); let result = if let Some(transform) = algo { rooted!(in(*cx) let this_object = self.transform_obj.get()); let call_result = transform.Call_( &this_object.handle(), chunk, self, ExceptionHandling::Rethrow, can_gc, ); match call_result { Ok(p) => p, Err(e) => { let p = Promise::new(global, can_gc); p.reject_error(e, can_gc); p }, } } else { // Let transformAlgorithm be the following steps, taking a chunk argument: // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk). // If result is an abrupt completion, return a promise rejected with result.[[Value]]. let promise = if let Err(error) = self.enqueue(cx, global, chunk, can_gc) { rooted!(in(*cx) let mut error_val = UndefinedValue()); error.to_jsval(cx, global, error_val.handle_mut(), can_gc); Promise::new_rejected(global, cx, error_val.handle(), can_gc) } else { // Otherwise, return a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }; promise }; Ok(result) } pub(crate) fn perform_cancel( &self, cx: SafeJSContext, global: &GlobalScope, chunk: SafeHandleValue, can_gc: CanGc, ) -> Fallible> { // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason » // and callback this value transformer. let algo = self.cancel.borrow().clone(); let result = if let Some(cancel) = algo { rooted!(in(*cx) let this_object = self.transform_obj.get()); let call_result = cancel.Call_( &this_object.handle(), chunk, ExceptionHandling::Rethrow, can_gc, ); match call_result { Ok(p) => p, Err(e) => { let p = Promise::new(global, can_gc); p.reject_error(e, can_gc); p }, } } else { // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }; Ok(result) } pub(crate) fn perform_flush( &self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc, ) -> Fallible> { // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns the result of // invoking transformerDict["flush"] with argument list « controller » and callback this value transformer. let algo = self.flush.borrow().clone(); let result = if let Some(flush) = algo { rooted!(in(*cx) let this_object = self.transform_obj.get()); let call_result = flush.Call_( &this_object.handle(), self, ExceptionHandling::Rethrow, can_gc, ); match call_result { Ok(p) => p, Err(e) => { let p = Promise::new(global, can_gc); p.reject_error(e, can_gc); p }, } } else { // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }; Ok(result) } /// #[allow(unsafe_code)] pub(crate) fn enqueue( &self, cx: SafeJSContext, global: &GlobalScope, chunk: SafeHandleValue, can_gc: CanGc, ) -> Fallible<()> { // Let stream be controller.[[stream]]. let stream = self.stream.get().expect("stream is null"); // Let readableController be stream.[[readable]].[[controller]]. let readable = stream.get_readable(); let readable_controller = readable.get_default_controller(); // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) // is false, throw a TypeError exception. if !readable_controller.can_close_or_enqueue() { return Err(Error::Type( "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(), )); } // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk). // If enqueueResult is an abrupt completion, if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) { // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]). rooted!(in(*cx) let mut rooted_error = UndefinedValue()); error .clone() .to_jsval(cx, global, rooted_error.handle_mut(), can_gc); stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc); // Throw stream.[[readable]].[[storedError]]. unsafe { if !JS_IsExceptionPending(*cx) { rooted!(in(*cx) let mut stored_error = UndefinedValue()); readable.get_stored_error(stored_error.handle_mut()); JS_SetPendingException( *cx, stored_error.handle().into(), ExceptionStackBehavior::Capture, ); } } return Err(error); } // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController). let backpressure = readable_controller.has_backpressure(); // If backpressure is not stream.[[backpressure]], if backpressure != stream.get_backpressure() { // Assert: backpressure is true. assert!(backpressure); // Perform ! TransformStreamSetBackpressure(stream, true). stream.set_backpressure(global, true, can_gc); } Ok(()) } /// pub(crate) fn error( &self, cx: SafeJSContext, global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, ) { // Perform ! TransformStreamError(controller.[[stream]], e). self.stream .get() .expect("stream is undefined") .error(cx, global, reason, can_gc); } /// pub(crate) fn clear_algorithms(&self) { // Set controller.[[transformAlgorithm]] to undefined. self.transform.replace(None); // Set controller.[[flushAlgorithm]] to undefined. self.flush.replace(None); // Set controller.[[cancelAlgorithm]] to undefined. self.cancel.replace(None); } /// pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { // Let stream be controller.[[stream]]. let stream = self.stream.get().expect("stream is null"); // Let readableController be stream.[[readable]].[[controller]]. let readable = stream.get_readable(); let readable_controller = readable.get_default_controller(); // Perform ! ReadableStreamDefaultControllerClose(readableController). readable_controller.close(can_gc); // Let error be a TypeError exception indicating that the stream has been terminated. let error = Error::Type("stream has been terminated".to_owned()); // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error). rooted!(in(*cx) let mut rooted_error = UndefinedValue()); error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc); stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc); } } impl TransformStreamDefaultControllerMethods for TransformStreamDefaultController { /// fn GetDesiredSize(&self) -> Option { // Let readableController be this.[[stream]].[[readable]].[[controller]]. let readable_controller = self .stream .get() .expect("stream is null") .get_readable() .get_default_controller(); // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController). readable_controller.get_desired_size() } /// fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> { // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk). self.enqueue(cx, &self.global(), chunk, can_gc) } /// fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> { // Perform ? TransformStreamDefaultControllerError(this, e). self.error(cx, &self.global(), reason, can_gc); Ok(()) } /// fn Terminate(&self, can_gc: CanGc) -> Fallible<()> { // Perform ? TransformStreamDefaultControllerTerminate(this). self.terminate(GlobalScope::get_cx(), &self.global(), can_gc); Ok(()) } }