aboutsummaryrefslogtreecommitdiffstats
path: root/components/script/dom/readablestreamdefaultcontroller.rs
diff options
context:
space:
mode:
authorGregory Terzian <2792687+gterzian@users.noreply.github.com>2024-12-18 05:14:00 +0800
committerGitHub <noreply@github.com>2024-12-17 21:14:00 +0000
commit379bbb41dde5c46ff39cfc9027d7df49fae733b8 (patch)
treeb8224b9e9d088885fcb3dff405118d5ef932080f /components/script/dom/readablestreamdefaultcontroller.rs
parent026d3717177def1b77e8790f3f045feea66df872 (diff)
downloadservo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.tar.gz
servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.zip
Dom: Re-implement `ReadableStream` Part 1 : Default `Reader` and `Controller` (#34064)
* Re-implement readablestream: basics and default reader and controller --------- Co-authored-by: Jason Tsai <jason@pews.dev> Signed-off-by: Wu Wayne <yuweiwu@pm.me> Add remaining WebIDLs of ReadableStream (#32605) * Add Reader's WebIDL files * Add necessary methods in ReadableStream.webidl Signed-off-by: Wu Wayne <yuweiwu@pm.me> Create safe wrapper for JSFunctions (#32620) * Create safe wrapper for JSFunctions Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Add assert to check if the name ends in a null character Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Create macro to wrap unsafe extern "C" function calls Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Remove WRAPPER_FN Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Add macro example documentation Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Use C-string literals Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Ensure name is Cstr type Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Scope #[allow(unsafe_code)] Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> Signed-off-by: Wu Wayne <yuweiwu@pm.me> Start implementation of default controller and reader Start implementation of default controller and reader * implement basic internal slots, with todos Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * enum for controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * re-implement native controller methods Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add calling into pull algo Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * more details on chunk enqueuing Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add fulfill read request, clean-up warnings Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * read request and reader typing Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * allow for more than one non-native underlying source type Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add todo for should pull Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add underlying source dom struct container Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove rc around source type * add default controller init in stream constructor * setup source container with prototype of source dict * clean-up docs, dispatch of controller in pull algo call * turn off SM streams * remove prototype setting on underlying source container * fix read request promise resolving * tidy * clean-up js conversions in read req handlers * add queue with sizes concept * use dom in pull promise handlers * Demonstrate using dictionary as callback this object. * move value with size to a struct * fmt * put readable stream state in a cell * nits in expectations * remove allow unroot by passing read result directly to promise resolving * tidy * root default controller inside call_pull_if_needed --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Wu Wayne <yuweiwu@pm.me> ReadableStream: implement Cancel and Locked (#33136) * implement Locked * implement Cancel and close Signed-off-by: Wu Wayne <yuweiwu@pm.me> Add GetPromiseIsHandled and SetAnyPromiseIsHandled to Promise Signed-off-by: Taym <haddadi.taym@gmail.com> mach fmt Signed-off-by: Taym <haddadi.taym@gmail.com> Readablestream default controller: get desired size (#33497) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> stream: implement controller close (#33498) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> implement stream default controller error (#33503) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Readablestream default controller: enqueue (#33528) * Implement ReadableStreamDefaultControllerMethods::Enqueue Signed-off-by: Wu Wayne <yuweiwu@pm.me> * Add spec comments Signed-off-by: Wu Wayne <yuweiwu@pm.me> --------- Signed-off-by: Wu Wayne <yuweiwu@pm.me> readablestream default controller: fulfill read requests (#33542) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Fix extract_size_algorithm (#33561) Signed-off-by: Wu Wayne <yuweiwu@pm.me> Readablestream default controller: use strategy size (#33551) * readablestream default controller: use strategy size, fallible enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> docs * readablestream default controller: clear strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * prevent potential re-borrow panics when calling into the strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * document readablestream constructor Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Readablestream: impl default controller should pull, start algo (#33586) * implement should-pull algo for default controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add start algorithm setup for default controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> implement promise native handling for start and pull algorithms (#33603) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Implement ReadableStreamDefaultReader (#33160) * Implement ReadableStreamDefaultReader Make the stream mutable readable-stream-reader-generic-release Proper error types when releasing Closed Cancel Signed-off-by: Taym <haddadi.taym@gmail.com> * follow the spec more closely Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Implement ReadableStreamDefaultReader read (#34007) * Implement ReadableStreamDefaultReader read Signed-off-by: Taym <haddadi.taym@gmail.com> * Perform readRequest’s error steps with stream.stored_error Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Improve ReadableStreamDefaultReader close (#34014) * improve ReadableStreamDefaultReader close Signed-off-by: Taym <haddadi.taym@gmail.com> * remove resolve_closed_promise Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Use Rc<Box<[u8]>> for queue to optimize get_in_memory_bytes Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> * Improve read_a_chunk and stop_reading implemntation (#34077) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Implement ReadableStreamDefaultReader::Constructor (#34056) * Implement ReadableStreamDefaultReader::Constructor Signed-off-by: Taym <haddadi.taym@gmail.com> * make start_reading returns ReadableStreamDefaultReader Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix can_gc Signed-off-by: Taym <haddadi.taym@gmail.com> * Add canGc to ReadableStream::GetReader Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Readablestream fix CanGc (#34080) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * correct ReadableStream::error_native implementation and fix clippy warnings (#34088) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * turn assertion of stream present on controller on a early return with false (#34097) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix already mutably borrowed crash (#34105) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Refactor `get_in_memory_bytes` to return `Option<Vec<u8>> and avoid `unreachable!` panic (#34123) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Set ReadableStream ReadableStreamDefaultReader in start_reading (#34125) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix Unhandled rejection with value: object `TypeError: stream is not locked` (#34204) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix assert!(self.is_readable()) crash in ReadableStream::close (#34207) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix call to to_js_object in underlying source algos (#34098) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * do not assume presence of a stream when performing pull steps (#34244) * do not assume presence of a stream when performing pull steps Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add doc comments Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * gracefully handle failure of underlying source algorithms (#34243) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * ensure result of calling start algo is an object (#34245) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * return js failed error if underlying source constructor threw (#34246) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Use JSVal for ValueWithSize::value (#34259) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix release reader lock, (#34255) fix setting stream on controller in new, fix matching fallthrough, reduce visibility of controller error method Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * in stream cancel, reject promist if locked (#34271) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix UnderlyingSourceContainer::call_start_algorithm (#34277) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * implement controller cancel steps, fix stream cancel method (#34301) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix conditional in perform pull steps (#34324) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * set reader closed promise to one resolved with undefined if stream closed on init (#34321) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix init of stream and controller (#34323) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix reborrow in controller enqueue, and fix error and exception handling. (#34338) * fix re-borrow in controller enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * do not call to_jsval on JSFailed error in enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix error and exception handling in controller enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove TODO about correctness of stored error, since this was done as part of the switch to a js val. Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix incorrect "this" object in underlying source callbacks (#34368) * in controller close, throw type error if stream cannot be closed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * store original js object for underlying source, for use as this object in callbacks Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix conditional logic in enqueue to ensure pull is called into (#34375) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix bytelength queueing strategy (#34376) * fix handling of value that is not an object in bytelength queuing strategy Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * return type error if strategy size call fails, to prevent panic because no exception is pending Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * set correct default count queuing size strategy (#34389) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * use proto in stream constructor (#34441) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix edge cases in get_desired_size (#34440) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: fix algo and strategy calls error handling. (#34424) * fix error handling in cancel steps Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * in pull steps, reject promise if pull algo throws Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * if start algorithm fails, rethrow the error Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * when the strategy size fails, directly get the pending exception and use it to error the stream Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add error handling to enqueue value with size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * when enqueueing a value errors, ensure we error and stream with the same error used to throw an exception Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix native use of streams (#34468) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Implement readablestreamdefaulttee (#34405) * Implement readablestreamdefaulttee Signed-off-by: Taym <haddadi.taym@gmail.com> * Create UnderlyingSourceType::Tee each stream Signed-off-by: Taym <haddadi.taym@gmail.com> * Use Dom instead of DomRoot Signed-off-by: Taym <haddadi.taym@gmail.com> * Queue a microtask for readRequest chunk steps Signed-off-by: Taym <haddadi.taym@gmail.com> * fix create_readable_stream Signed-off-by: Taym <haddadi.taym@gmail.com> * Remove unnecessary Rc Signed-off-by: Taym <haddadi.taym@gmail.com> * Use correct doc link Signed-off-by: Taym <haddadi.taym@gmail.com> * Add #[allow(crown::unrooted_must_root)] Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix crash in ClosedPromiseRejectionHandler Signed-off-by: Taym <haddadi.taym@gmail.com> * reflect TeeReadRequest and TeeUnderlyingSource Signed-off-by: Taym <haddadi.taym@gmail.com> * fix can_gc Signed-off-by: Taym <haddadi.taym@gmail.com> * reflect tee source, and fix use of mutable dom for tee source and request Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * Fix typo that resolves multiple test failures in 'Tee' tests Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix readable-streams/tee.any.js test Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js (#34531) And fix crate::DomTypeHolder usage * Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js Signed-off-by: Taym <haddadi.taym@gmail.com> * make reader rooted in Constructor and acquire_default_reader Signed-off-by: Taym <haddadi.taym@gmail.com> * Remove spaces Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Streams: fetch stream chunks should be uint8 arrays (#34553) * fetch stream chunks should be uint8 arrays Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update wpt test for ReadableStream reimplementation (#34579) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix ignore_malloc_size_of in readablestream tee (#34578) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove incorrect use of handle array, fail test safely by giving only one reason (#34560) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update more wpt test for ReadableStream reimplementation (#34598) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix doc and rename Tee to DefaultTee (#34612) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix: Address review comments Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update response-stream-with-broken-then.any.js.ini test expectation Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix reflect_dom_object can_gc Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix compositeReason for DefaultTeeUnderlyingSource (#34627) * Fix compositeReason for DefaultTeeUnderlyingSource Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update test Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Last fixes stream (#34636) * remove now unsused from_js method of readable stream * fix documenation of error steps * return type error instread of panicking on a todo, when trying to construct a stream of type bytes Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * fix crown rooting related errors (#34662) Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Wu Wayne <yuweiwu@pm.me> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Diffstat (limited to 'components/script/dom/readablestreamdefaultcontroller.rs')
-rw-r--r--components/script/dom/readablestreamdefaultcontroller.rs878
1 files changed, 878 insertions, 0 deletions
diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs
new file mode 100644
index 00000000000..1496031d998
--- /dev/null
+++ b/components/script/dom/readablestreamdefaultcontroller.rs
@@ -0,0 +1,878 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::ptr;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{Heap, JSObject};
+use js::jsval::{JSVal, UndefinedValue};
+use js::rust::wrappers::JS_GetPendingException;
+use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
+use js::typedarray::Uint8;
+
+use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
+use crate::dom::bindings::buffer_source::create_buffer_source;
+use crate::dom::bindings::callback::ExceptionHandling;
+use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
+use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
+use crate::dom::bindings::import::module::{throw_dom_exception, Error, Fallible};
+use crate::dom::bindings::refcounted::Trusted;
+use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
+use crate::dom::bindings::root::{DomRoot, MutNullableDom};
+use crate::dom::bindings::trace::RootedTraceableBox;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::readablestream::ReadableStream;
+use crate::dom::readablestreamdefaultreader::ReadRequest;
+use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
+use crate::js::conversions::ToJSValConvertible;
+use crate::realms::{enter_realm, InRealm};
+use crate::script_runtime::{CanGc, JSContext, JSContext as SafeJSContext};
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct PullAlgorithmFulfillmentHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for PullAlgorithmFulfillmentHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ /// Upon fulfillment of pullPromise
+ fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Set controller.[[pulling]] to false.
+ controller.pulling.set(false);
+
+ // If controller.[[pullAgain]] is true,
+ if controller.pull_again.get() {
+ // Set controller.[[pullAgain]] to false.
+ controller.pull_again.set(false);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ controller.call_pull_if_needed(can_gc);
+ }
+ }
+}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct PullAlgorithmRejectionHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for PullAlgorithmRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ /// Upon rejection of pullPromise with reason e.
+ fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, e).
+ controller.error(v);
+ }
+}
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct StartAlgorithmFulfillmentHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmFulfillmentHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ /// Upon fulfillment of startPromise,
+ fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Set controller.[[started]] to true.
+ controller.started.set(true);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ controller.call_pull_if_needed(can_gc);
+ }
+}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct StartAlgorithmRejectionHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ /// Upon rejection of startPromise with reason r,
+ fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, r).
+ controller.error(v);
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#value-with-size>
+#[derive(JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub struct ValueWithSize {
+ value: Box<Heap<JSVal>>,
+ size: f64,
+}
+
+/// <https://streams.spec.whatwg.org/#value-with-size>
+#[derive(JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum EnqueuedValue {
+ /// A value enqueued from Rust.
+ Native(Box<[u8]>),
+ /// A Js value.
+ Js(ValueWithSize),
+}
+
+impl EnqueuedValue {
+ fn size(&self) -> f64 {
+ match self {
+ EnqueuedValue::Native(v) => v.len() as f64,
+ EnqueuedValue::Js(v) => v.size,
+ }
+ }
+
+ #[allow(unsafe_code)]
+ fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue) {
+ match self {
+ EnqueuedValue::Native(chunk) => {
+ rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
+ create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut())
+ .expect("failed to create buffer source for native chunk.");
+ unsafe { array_buffer_ptr.to_jsval(*cx, rval) };
+ },
+ EnqueuedValue::Js(value_with_size) => unsafe {
+ value_with_size.value.to_jsval(*cx, rval);
+ },
+ }
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#is-non-negative-number>
+fn is_non_negative_number(value: &EnqueuedValue) -> bool {
+ let value_with_size = match value {
+ EnqueuedValue::Native(_) => return true,
+ EnqueuedValue::Js(value_with_size) => value_with_size,
+ };
+
+ // If v is not a Number, return false.
+ // Checked as part of the WebIDL.
+
+ // If v is NaN, return false.
+ if value_with_size.size.is_nan() {
+ return false;
+ }
+
+ // If v < 0, return false.
+ if value_with_size.size.is_sign_negative() {
+ return false;
+ }
+
+ true
+}
+
+/// <https://streams.spec.whatwg.org/#queue-with-sizes>
+#[derive(Default, JSTraceable, MallocSizeOf)]
+#[crown::unrooted_must_root_lint::must_root]
+pub struct QueueWithSizes {
+ #[ignore_malloc_size_of = "EnqueuedValue::Js"]
+ queue: VecDeque<EnqueuedValue>,
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
+ total_size: f64,
+}
+
+impl QueueWithSizes {
+ /// <https://streams.spec.whatwg.org/#dequeue-value>
+ #[allow(crown::unrooted_must_root)]
+ fn dequeue_value(&mut self) -> EnqueuedValue {
+ let value = self
+ .queue
+ .pop_front()
+ .expect("Buffer cannot be empty when dequeue value is called into.");
+ self.total_size -= value.size();
+ value
+ }
+
+ /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
+ #[allow(crown::unrooted_must_root)]
+ fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
+ // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
+ if !is_non_negative_number(&value) {
+ return Err(Error::Range(
+ "The size of the enqueued chunk is not a non-negative number.".to_string(),
+ ));
+ }
+
+ // If size is +∞, throw a RangeError exception.
+ if value.size().is_infinite() {
+ return Err(Error::Range(
+ "The size of the enqueued chunk is infinite.".to_string(),
+ ));
+ }
+
+ self.total_size += value.size();
+ self.queue.push_back(value);
+
+ Ok(())
+ }
+
+ fn is_empty(&self) -> bool {
+ self.queue.is_empty()
+ }
+
+ /// Only used with native sources.
+ fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
+ self.queue
+ .iter()
+ .try_fold(Vec::new(), |mut acc, value| match value {
+ EnqueuedValue::Native(chunk) => {
+ acc.extend(chunk.iter().copied());
+ Some(acc)
+ },
+ _ => {
+ warn!("get_in_memory_bytes called on a controller with non-native source.");
+ None
+ },
+ })
+ }
+
+ /// <https://streams.spec.whatwg.org/#reset-queue>
+ fn reset(&mut self) {
+ self.queue.clear();
+ self.total_size = Default::default();
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
+#[dom_struct]
+pub struct ReadableStreamDefaultController {
+ reflector_: Reflector,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
+ queue: RefCell<QueueWithSizes>,
+
+ /// A mutable reference to the underlying source is used to implement these two
+ /// internal slots:
+ ///
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
+ underlying_source: MutNullableDom<UnderlyingSourceContainer>,
+
+ stream: MutNullableDom<ReadableStream>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
+ strategy_hwm: f64,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
+ #[ignore_malloc_size_of = "mozjs"]
+ strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
+ close_requested: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
+ started: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
+ pulling: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
+ pull_again: Cell<bool>,
+}
+
+impl ReadableStreamDefaultController {
+ #[allow(crown::unrooted_must_root)]
+ fn new_inherited(
+ global: &GlobalScope,
+ underlying_source_type: UnderlyingSourceType,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ can_gc: CanGc,
+ ) -> ReadableStreamDefaultController {
+ ReadableStreamDefaultController {
+ reflector_: Reflector::new(),
+ queue: RefCell::new(Default::default()),
+ stream: MutNullableDom::new(None),
+ underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
+ global,
+ underlying_source_type,
+ can_gc,
+ ))),
+ strategy_hwm,
+ strategy_size: RefCell::new(Some(strategy_size)),
+ close_requested: Default::default(),
+ started: Default::default(),
+ pulling: Default::default(),
+ pull_again: Default::default(),
+ }
+ }
+
+ #[allow(crown::unrooted_must_root)]
+ pub fn new(
+ global: &GlobalScope,
+ underlying_source: UnderlyingSourceType,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ can_gc: CanGc,
+ ) -> DomRoot<ReadableStreamDefaultController> {
+ reflect_dom_object(
+ Box::new(ReadableStreamDefaultController::new_inherited(
+ global,
+ underlying_source,
+ strategy_hwm,
+ strategy_size,
+ can_gc,
+ )),
+ global,
+ can_gc,
+ )
+ }
+
+ /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ #[allow(unsafe_code)]
+ pub fn setup(&self, stream: DomRoot<ReadableStream>, can_gc: CanGc) -> Result<(), Error> {
+ // Assert: stream.[[controller]] is undefined
+ stream.assert_no_controller();
+
+ // Set controller.[[stream]] to stream.
+ self.stream.set(Some(&stream));
+
+ let global = &*self.global();
+ let rooted_default_controller = DomRoot::from_ref(self);
+
+ // Perform ! ResetQueue(controller).
+ // Set controller.[[started]], controller.[[closeRequested]],
+ // controller.[[pullAgain]], and controller.[[pulling]] to false.
+ // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
+ // and controller.[[strategyHWM]] to highWaterMark.
+ // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
+ // and controller.[[strategyHWM]] to highWaterMark.
+ // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
+
+ // Note: the above steps are done in `new`.
+
+ // Set stream.[[controller]] to controller.
+ stream.set_default_controller(&rooted_default_controller);
+
+ if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
+ // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
+ let start_result = underlying_source
+ .call_start_algorithm(
+ Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
+ can_gc,
+ )
+ .unwrap_or_else(|| {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+
+ // Let startPromise be a promise resolved with startResult.
+ let start_promise = start_result?;
+
+ // Upon fulfillment of startPromise,
+ let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+
+ // Upon rejection of startPromise with reason r,
+ let rejection_handler = Box::new(StartAlgorithmRejectionHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let handler = PromiseNativeHandler::new(
+ global,
+ Some(fulfillment_handler),
+ Some(rejection_handler),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ start_promise.append_native_handler(&handler, comp, can_gc);
+ };
+
+ Ok(())
+ }
+
+ /// Setting the JS object after the heap has settled down.
+ pub fn set_underlying_source_this_object(&self, this_object: HandleObject) {
+ if let Some(underlying_source) = self.underlying_source.get() {
+ underlying_source.set_underlying_source_this_object(this_object);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#dequeue-value>
+ #[allow(crown::unrooted_must_root)]
+ fn dequeue_value(&self) -> EnqueuedValue {
+ let mut queue = self.queue.borrow_mut();
+ queue.dequeue_value()
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
+ fn should_call_pull(&self) -> bool {
+ // Let stream be controller.[[stream]].
+ // Note: the spec does not assert that stream is not undefined here,
+ // so we return false if it is.
+ let Some(stream) = self.stream.get() else {
+ debug!("`should_call_pull` called on a controller without a stream.");
+ return false;
+ };
+
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return false;
+ }
+
+ // If controller.[[started]] is false, return false.
+ if !self.started.get() {
+ return false;
+ }
+
+ // If ! IsReadableStreamLocked(stream) is true
+ // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ return true;
+ }
+
+ // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
+ // Assert: desiredSize is not null.
+ let desired_size = self.get_desired_size().expect("desiredSize is not null.");
+
+ if desired_size > 0. {
+ return true;
+ }
+
+ false
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ #[allow(unsafe_code)]
+ fn call_pull_if_needed(&self, can_gc: CanGc) {
+ if !self.should_call_pull() {
+ return;
+ }
+
+ // If controller.[[pulling]] is true,
+ if self.pulling.get() {
+ // Set controller.[[pullAgain]] to true.
+ self.pull_again.set(true);
+
+ return;
+ }
+
+ // Set controller.[[pulling]] to true.
+ self.pulling.set(true);
+
+ // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
+ // Continues into the resolve and reject handling of the native handler.
+ let global = self.global();
+ let rooted_default_controller = DomRoot::from_ref(self);
+ let controller =
+ Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
+
+ let Some(underlying_source) = self.underlying_source.get() else {
+ return;
+ };
+
+ let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let rejection_handler = Box::new(PullAlgorithmRejectionHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let handler =
+ PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));
+
+ let realm = enter_realm(&*global);
+ let comp = InRealm::Entered(&realm);
+ let result = underlying_source
+ .call_pull_algorithm(controller, can_gc)
+ .unwrap_or_else(|| {
+ let promise = Promise::new(&global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+ let promise = result.unwrap_or_else(|error| {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ // TODO: check if `self.global()` is the right globalscope.
+ unsafe {
+ error
+ .clone()
+ .to_jsval(*cx, &self.global(), rval.handle_mut())
+ };
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_native(&rval.handle());
+ promise
+ });
+ promise.append_native_handler(&handler, comp, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
+ #[allow(unsafe_code)]
+ pub fn perform_cancel_steps(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ // Perform ! ResetQueue(this).
+ self.queue.borrow_mut().reset();
+
+ let underlying_source = self
+ .underlying_source
+ .get()
+ .expect("Controller should have a source when the cancel steps are called into.");
+ let global = self.global();
+
+ // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
+ let result = underlying_source
+ .call_cancel_algorithm(reason, can_gc)
+ .unwrap_or_else(|| {
+ let promise = Promise::new(&global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+ let promise = result.unwrap_or_else(|error| {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ // TODO: check if `self.global()` is the right globalscope.
+ unsafe {
+ error
+ .clone()
+ .to_jsval(*cx, &self.global(), rval.handle_mut())
+ };
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_native(&rval.handle());
+ promise
+ });
+
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
+ self.clear_algorithms();
+
+ // Return result(the promise).
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
+ #[allow(crown::unrooted_must_root)]
+ pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
+ // Let stream be this.[[stream]].
+ // Note: the spec does not assert that there is a stream.
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // if queue contains bytes, perform chunk steps.
+ if !self.queue.borrow().is_empty() {
+ // Rooting: chunk will be tied to a rooted value
+ // before calling into a function that can GC.
+ let chunk = self.dequeue_value();
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ let result = RootedTraceableBox::new(Heap::default());
+ chunk.to_jsval(cx, rval.handle_mut());
+ result.set(*rval);
+
+ // If this.[[closeRequested]] is true and this.[[queue]] is empty
+ if self.close_requested.get() && self.queue.borrow().is_empty() {
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Perform ! ReadableStreamClose(stream).
+ stream.close();
+ } else {
+ // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
+ self.call_pull_if_needed(can_gc);
+ }
+ // Perform readRequest’s chunk steps, given chunk.
+ read_request.chunk_steps(result);
+ } else {
+ // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
+ stream.add_read_request(read_request);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
+ self.call_pull_if_needed(can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
+ pub fn perform_release_steps(&self) {
+ // step 1 - Return.
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
+ #[allow(unsafe_code)]
+ pub fn enqueue(
+ &self,
+ cx: SafeJSContext,
+ chunk: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Result<(), Error> {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return Ok(());
+ }
+
+ let stream = self
+ .stream
+ .get()
+ .expect("Controller must have a stream when a chunk is enqueued.");
+
+ // If ! IsReadableStreamLocked(stream) is true
+ // and ! ReadableStreamGetNumReadRequests(stream) > 0,
+ // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ stream.fulfill_read_request(chunk, false);
+ } else {
+ // Otherwise,
+ // Let result be the result of performing controller.[[strategySizeAlgorithm]],
+ // passing in chunk, and interpreting the result as a completion record.
+ // Note: the clone is necessary to prevent potential re-borrow panics.
+ let strategy_size = {
+ let reference = self.strategy_size.borrow();
+ reference.clone()
+ };
+ let size = if let Some(strategy_size) = strategy_size {
+ // Note: the Rethrow exception handling is necessary,
+ // otherwise returning JSFailed will panic because no exception is pending.
+ let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow);
+ match result {
+ // Let chunkSize be result.[[Value]].
+ Ok(size) => size,
+ Err(error) => {
+ // If result is an abrupt completion,
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
+ self.error(rval.handle());
+
+ // Return result.
+ // Note: we need to return a type error, because no exception is pending.
+ return Err(error);
+ },
+ }
+ } else {
+ 0.
+ };
+
+ {
+ // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
+ let res = {
+ let mut queue = self.queue.borrow_mut();
+ queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
+ value: Heap::boxed(chunk.get()),
+ size,
+ }))
+ };
+ if let Err(error) = res {
+ // If enqueueResult is an abrupt completion,
+
+ // First, throw the exception.
+ // Note: this must be done manually here,
+ // because `enqueue_value_with_size` does not call into JS.
+ throw_dom_exception(cx, &self.global(), error);
+
+ // Then, get a handle to the JS val for the exception,
+ // and use that to error the stream.
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
+ self.error(rval.handle());
+
+ // Return enqueueResult.
+ // Note: because we threw the exception above,
+ // there is a pending exception and we can return JSFailed.
+ return Err(Error::JSFailed);
+ }
+ }
+ }
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ self.call_pull_if_needed(can_gc);
+
+ Ok(())
+ }
+
+ /// Native call to
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
+ #[allow(crown::unrooted_must_root)]
+ pub fn enqueue_native(&self, chunk: Vec<u8>) {
+ let stream = self
+ .stream
+ .get()
+ .expect("Controller must have a stream when a chunk is enqueued.");
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ let enqueued_chunk = EnqueuedValue::Native(chunk.into_boxed_slice());
+ enqueued_chunk.to_jsval(cx, rval.handle_mut());
+ stream.fulfill_read_request(rval.handle(), false);
+ } else {
+ let mut queue = self.queue.borrow_mut();
+ queue
+ .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
+ .expect("Enqueuing a chunk from Rust should not fail.");
+ }
+ }
+
+ /// Does the stream have all data in memory?
+ pub fn in_memory(&self) -> bool {
+ let Some(underlying_source) = self.underlying_source.get() else {
+ return false;
+ };
+ underlying_source.in_memory()
+ }
+
+ /// Return bytes synchronously if the stream has all data in memory.
+ pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
+ let underlying_source = self.underlying_source.get()?;
+ if underlying_source.in_memory() {
+ return self.queue.borrow().get_in_memory_bytes();
+ }
+ None
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
+ fn clear_algorithms(&self) {
+ // Set controller.[[pullAlgorithm]] to undefined.
+ // Set controller.[[cancelAlgorithm]] to undefined.
+ self.underlying_source.set(None);
+
+ // Set controller.[[strategySizeAlgorithm]] to undefined.
+ *self.strategy_size.borrow_mut() = None;
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
+ pub fn close(&self) {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return;
+ }
+
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // Set controller.[[closeRequested]] to true.
+ self.close_requested.set(true);
+
+ if self.queue.borrow().is_empty() {
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Perform ! ReadableStreamClose(stream).
+ stream.close();
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
+ fn get_desired_size(&self) -> Option<f64> {
+ let stream = self.stream.get()?;
+
+ // If state is "errored", return null.
+ if stream.is_errored() {
+ return None;
+ }
+
+ // If state is "closed", return 0.
+ if stream.is_closed() {
+ return Some(0.0);
+ }
+
+ // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
+ let queue = self.queue.borrow();
+ let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
+ Some(desired_size.clamp(desired_size, self.strategy_hwm))
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
+ fn can_close_or_enqueue(&self) -> bool {
+ let Some(stream) = self.stream.get() else {
+ return false;
+ };
+
+ // If controller.[[closeRequested]] is false and state is "readable", return true.
+ if !self.close_requested.get() && stream.is_readable() {
+ return true;
+ }
+
+ // Otherwise, return false.
+ false
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
+ pub fn error(&self, e: SafeHandleValue) {
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // If stream.[[state]] is not "readable", return.
+ if !stream.is_readable() {
+ return;
+ }
+
+ // Perform ! ResetQueue(controller).
+ self.queue.borrow_mut().reset();
+
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ stream.error(e);
+ }
+}
+
+impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
+ for ReadableStreamDefaultController
+{
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
+ fn GetDesiredSize(&self) -> Option<f64> {
+ self.get_desired_size()
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
+ fn Close(&self) -> Fallible<()> {
+ if !self.can_close_or_enqueue() {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
+ // throw a TypeError exception.
+ return Err(Error::Type("Stream cannot be closed.".to_string()));
+ }
+
+ // Perform ! ReadableStreamDefaultControllerClose(this).
+ self.close();
+
+ Ok(())
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
+ fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
+ if !self.can_close_or_enqueue() {
+ return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
+ }
+
+ // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
+ self.enqueue(cx, chunk, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
+ fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue) -> Fallible<()> {
+ self.error(e);
+ Ok(())
+ }
+}