diff options
author | Gregory Terzian <2792687+gterzian@users.noreply.github.com> | 2024-12-18 05:14:00 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-17 21:14:00 +0000 |
commit | 379bbb41dde5c46ff39cfc9027d7df49fae733b8 (patch) | |
tree | b8224b9e9d088885fcb3dff405118d5ef932080f /components/script/dom/readablestreamdefaultcontroller.rs | |
parent | 026d3717177def1b77e8790f3f045feea66df872 (diff) | |
download | servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.tar.gz servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.zip |
Dom: Re-implement `ReadableStream` Part 1 : Default `Reader` and `Controller` (#34064)
* Re-implement readablestream: basics and default reader and controller
---------
Co-authored-by: Jason Tsai <jason@pews.dev>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Add remaining WebIDLs of ReadableStream (#32605)
* Add Reader's WebIDL files
* Add necessary methods in ReadableStream.webidl
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Create safe wrapper for JSFunctions (#32620)
* Create safe wrapper for JSFunctions
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Add assert to check if the name ends in a null character
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Create macro to wrap unsafe extern "C" function calls
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Remove WRAPPER_FN
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Add macro example documentation
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Use C-string literals
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Ensure name is Cstr type
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Scope #[allow(unsafe_code)]
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Start implementation of default controller and reader
Start implementation of default controller and reader
* implement basic internal slots, with todos
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* enum for controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* re-implement native controller methods
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add calling into pull algo
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* more details on chunk enqueuing
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add fulfill read request, clean-up warnings
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* read request and reader typing
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* allow for more than one non-native underlying source type
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add todo for should pull
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add underlying source dom struct container
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove rc around source type
* add default controller init in stream constructor
* setup source container with prototype of source dict
* clean-up docs, dispatch of controller in pull algo call
* turn off SM streams
* remove prototype setting on underlying source container
* fix read request promise resolving
* tidy
* clean-up js conversions in read req handlers
* add queue with sizes concept
* use dom in pull promise handlers
* Demonstrate using dictionary as callback this object.
* move value with size to a struct
* fmt
* put readable stream state in a cell
* nits in expectations
* remove allow unroot by passing read result directly to promise resolving
* tidy
* root default controller inside call_pull_if_needed
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
ReadableStream: implement Cancel and Locked (#33136)
* implement Locked
* implement Cancel and close
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Add GetPromiseIsHandled and SetAnyPromiseIsHandled to Promise
Signed-off-by: Taym <haddadi.taym@gmail.com>
mach fmt
Signed-off-by: Taym <haddadi.taym@gmail.com>
Readablestream default controller: get desired size (#33497)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
stream: implement controller close (#33498)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
implement stream default controller error (#33503)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Readablestream default controller: enqueue (#33528)
* Implement ReadableStreamDefaultControllerMethods::Enqueue
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
* Add spec comments
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
---------
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
readablestream default controller: fulfill read requests (#33542)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Fix extract_size_algorithm (#33561)
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Readablestream default controller: use strategy size (#33551)
* readablestream default controller: use strategy size, fallible enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
docs
* readablestream default controller: clear strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* prevent potential re-borrow panics when calling into the strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* document readablestream constructor
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Readablestream: impl default controller should pull, start algo (#33586)
* implement should-pull algo for default controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add start algorithm setup for default controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
implement promise native handling for start and pull algorithms (#33603)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Implement ReadableStreamDefaultReader (#33160)
* Implement ReadableStreamDefaultReader
Make the stream mutable
readable-stream-reader-generic-release
Proper error types when releasing
Closed
Cancel
Signed-off-by: Taym <haddadi.taym@gmail.com>
* follow the spec more closely
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Implement ReadableStreamDefaultReader read (#34007)
* Implement ReadableStreamDefaultReader read
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Perform readRequest’s error steps with stream.stored_error
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Improve ReadableStreamDefaultReader close (#34014)
* improve ReadableStreamDefaultReader close
Signed-off-by: Taym <haddadi.taym@gmail.com>
* remove resolve_closed_promise
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Use Rc<Box<[u8]>> for queue to optimize get_in_memory_bytes
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Improve read_a_chunk and stop_reading implemntation (#34077)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Implement ReadableStreamDefaultReader::Constructor (#34056)
* Implement ReadableStreamDefaultReader::Constructor
Signed-off-by: Taym <haddadi.taym@gmail.com>
* make start_reading returns ReadableStreamDefaultReader
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix can_gc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Add canGc to ReadableStream::GetReader
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Readablestream fix CanGc (#34080)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* correct ReadableStream::error_native implementation and fix clippy warnings (#34088)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* turn assertion of stream present on controller on a early return with false (#34097)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix already mutably borrowed crash (#34105)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Refactor `get_in_memory_bytes` to return `Option<Vec<u8>> and avoid `unreachable!` panic (#34123)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Set ReadableStream ReadableStreamDefaultReader in start_reading (#34125)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix Unhandled rejection with value: object `TypeError: stream is not locked` (#34204)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix assert!(self.is_readable()) crash in ReadableStream::close (#34207)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix call to to_js_object in underlying source algos (#34098)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* do not assume presence of a stream when performing pull steps (#34244)
* do not assume presence of a stream when performing pull steps
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add doc comments
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* gracefully handle failure of underlying source algorithms (#34243)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* ensure result of calling start algo is an object (#34245)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* return js failed error if underlying source constructor threw (#34246)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Use JSVal for ValueWithSize::value (#34259)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix release reader lock, (#34255)
fix setting stream on controller in new,
fix matching fallthrough,
reduce visibility of controller error method
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* in stream cancel, reject promist if locked (#34271)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix UnderlyingSourceContainer::call_start_algorithm (#34277)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* implement controller cancel steps, fix stream cancel method (#34301)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix conditional in perform pull steps (#34324)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* set reader closed promise to one resolved with undefined if stream closed on init (#34321)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix init of stream and controller (#34323)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix reborrow in controller enqueue, and fix error and exception handling. (#34338)
* fix re-borrow in controller enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* do not call to_jsval on JSFailed error in enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix error and exception handling in controller enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove TODO about correctness of stored error, since this was done as part of the switch to a js val.
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix incorrect "this" object in underlying source callbacks (#34368)
* in controller close, throw type error if stream cannot be closed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* store original js object for underlying source, for use as this object in callbacks
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix conditional logic in enqueue to ensure pull is called into (#34375)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix bytelength queueing strategy (#34376)
* fix handling of value that is not an object in bytelength queuing strategy
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* return type error if strategy size call fails, to prevent panic because no exception is pending
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* set correct default count queuing size strategy (#34389)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* use proto in stream constructor (#34441)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix edge cases in get_desired_size (#34440)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: fix algo and strategy calls error handling. (#34424)
* fix error handling in cancel steps
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* in pull steps, reject promise if pull algo throws
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* if start algorithm fails, rethrow the error
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* when the strategy size fails, directly get the pending exception and use it to error the stream
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add error handling to enqueue value with size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* when enqueueing a value errors, ensure we error and stream with the same error used to throw an exception
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix native use of streams (#34468)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Implement readablestreamdefaulttee (#34405)
* Implement readablestreamdefaulttee
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Create UnderlyingSourceType::Tee each stream
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Use Dom instead of DomRoot
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Queue a microtask for readRequest chunk steps
Signed-off-by: Taym <haddadi.taym@gmail.com>
* fix create_readable_stream
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Remove unnecessary Rc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Use correct doc link
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Add #[allow(crown::unrooted_must_root)]
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix crash in ClosedPromiseRejectionHandler
Signed-off-by: Taym <haddadi.taym@gmail.com>
* reflect TeeReadRequest and TeeUnderlyingSource
Signed-off-by: Taym <haddadi.taym@gmail.com>
* fix can_gc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* reflect tee source, and fix use of mutable dom for tee source and request
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* Fix typo that resolves multiple test failures in 'Tee' tests
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix readable-streams/tee.any.js test
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js (#34531)
And fix crate::DomTypeHolder usage
* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js
Signed-off-by: Taym <haddadi.taym@gmail.com>
* make reader rooted in Constructor and acquire_default_reader
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Remove spaces
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Streams: fetch stream chunks should be uint8 arrays (#34553)
* fetch stream chunks should be uint8 arrays
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix clippy
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update wpt test for ReadableStream reimplementation (#34579)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix ignore_malloc_size_of in readablestream tee (#34578)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Remove incorrect use of handle array, fail test safely by giving only one reason (#34560)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update more wpt test for ReadableStream reimplementation (#34598)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix doc and rename Tee to DefaultTee (#34612)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix: Address review comments
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update response-stream-with-broken-then.any.js.ini test expectation
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix reflect_dom_object can_gc
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix compositeReason for DefaultTeeUnderlyingSource (#34627)
* Fix compositeReason for DefaultTeeUnderlyingSource
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update test
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Last fixes stream (#34636)
* remove now unsused from_js method of readable stream
* fix documenation of error steps
* return type error instread of panicking on a todo, when trying to construct a stream of type bytes
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* fix crown rooting related errors (#34662)
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Wu Wayne <yuweiwu@pm.me>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Diffstat (limited to 'components/script/dom/readablestreamdefaultcontroller.rs')
-rw-r--r-- | components/script/dom/readablestreamdefaultcontroller.rs | 878 |
1 files changed, 878 insertions, 0 deletions
diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs new file mode 100644 index 00000000000..1496031d998 --- /dev/null +++ b/components/script/dom/readablestreamdefaultcontroller.rs @@ -0,0 +1,878 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::ptr; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{Heap, JSObject}; +use js::jsval::{JSVal, UndefinedValue}; +use js::rust::wrappers::JS_GetPendingException; +use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue}; +use js::typedarray::Uint8; + +use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; +use crate::dom::bindings::buffer_source::create_buffer_source; +use crate::dom::bindings::callback::ExceptionHandling; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods; +use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller; +use crate::dom::bindings::import::module::{throw_dom_exception, Error, Fallible}; +use crate::dom::bindings::refcounted::Trusted; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; +use crate::dom::bindings::root::{DomRoot, MutNullableDom}; +use crate::dom::bindings::trace::RootedTraceableBox; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::readablestream::ReadableStream; +use crate::dom::readablestreamdefaultreader::ReadRequest; +use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType}; +use crate::js::conversions::ToJSValConvertible; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::{CanGc, JSContext, JSContext as SafeJSContext}; + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct PullAlgorithmFulfillmentHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for PullAlgorithmFulfillmentHandler { + /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + /// Upon fulfillment of pullPromise + fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) { + let controller = self.controller.root(); + + // Set controller.[[pulling]] to false. + controller.pulling.set(false); + + // If controller.[[pullAgain]] is true, + if controller.pull_again.get() { + // Set controller.[[pullAgain]] to false. + controller.pull_again.set(false); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + controller.call_pull_if_needed(can_gc); + } + } +} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct PullAlgorithmRejectionHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for PullAlgorithmRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + /// Upon rejection of pullPromise with reason e. + fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) { + let controller = self.controller.root(); + + // Perform ! ReadableStreamDefaultControllerError(controller, e). + controller.error(v); + } +} + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct StartAlgorithmFulfillmentHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for StartAlgorithmFulfillmentHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + /// Upon fulfillment of startPromise, + fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) { + let controller = self.controller.root(); + + // Set controller.[[started]] to true. + controller.started.set(true); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + controller.call_pull_if_needed(can_gc); + } +} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct StartAlgorithmRejectionHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for StartAlgorithmRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + /// Upon rejection of startPromise with reason r, + fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) { + let controller = self.controller.root(); + + // Perform ! ReadableStreamDefaultControllerError(controller, r). + controller.error(v); + } +} + +/// <https://streams.spec.whatwg.org/#value-with-size> +#[derive(JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub struct ValueWithSize { + value: Box<Heap<JSVal>>, + size: f64, +} + +/// <https://streams.spec.whatwg.org/#value-with-size> +#[derive(JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub enum EnqueuedValue { + /// A value enqueued from Rust. + Native(Box<[u8]>), + /// A Js value. + Js(ValueWithSize), +} + +impl EnqueuedValue { + fn size(&self) -> f64 { + match self { + EnqueuedValue::Native(v) => v.len() as f64, + EnqueuedValue::Js(v) => v.size, + } + } + + #[allow(unsafe_code)] + fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue) { + match self { + EnqueuedValue::Native(chunk) => { + rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>()); + create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut()) + .expect("failed to create buffer source for native chunk."); + unsafe { array_buffer_ptr.to_jsval(*cx, rval) }; + }, + EnqueuedValue::Js(value_with_size) => unsafe { + value_with_size.value.to_jsval(*cx, rval); + }, + } + } +} + +/// <https://streams.spec.whatwg.org/#is-non-negative-number> +fn is_non_negative_number(value: &EnqueuedValue) -> bool { + let value_with_size = match value { + EnqueuedValue::Native(_) => return true, + EnqueuedValue::Js(value_with_size) => value_with_size, + }; + + // If v is not a Number, return false. + // Checked as part of the WebIDL. + + // If v is NaN, return false. + if value_with_size.size.is_nan() { + return false; + } + + // If v < 0, return false. + if value_with_size.size.is_sign_negative() { + return false; + } + + true +} + +/// <https://streams.spec.whatwg.org/#queue-with-sizes> +#[derive(Default, JSTraceable, MallocSizeOf)] +#[crown::unrooted_must_root_lint::must_root] +pub struct QueueWithSizes { + #[ignore_malloc_size_of = "EnqueuedValue::Js"] + queue: VecDeque<EnqueuedValue>, + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize> + total_size: f64, +} + +impl QueueWithSizes { + /// <https://streams.spec.whatwg.org/#dequeue-value> + #[allow(crown::unrooted_must_root)] + fn dequeue_value(&mut self) -> EnqueuedValue { + let value = self + .queue + .pop_front() + .expect("Buffer cannot be empty when dequeue value is called into."); + self.total_size -= value.size(); + value + } + + /// <https://streams.spec.whatwg.org/#enqueue-value-with-size> + #[allow(crown::unrooted_must_root)] + fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> { + // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception. + if !is_non_negative_number(&value) { + return Err(Error::Range( + "The size of the enqueued chunk is not a non-negative number.".to_string(), + )); + } + + // If size is +∞, throw a RangeError exception. + if value.size().is_infinite() { + return Err(Error::Range( + "The size of the enqueued chunk is infinite.".to_string(), + )); + } + + self.total_size += value.size(); + self.queue.push_back(value); + + Ok(()) + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + /// Only used with native sources. + fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { + self.queue + .iter() + .try_fold(Vec::new(), |mut acc, value| match value { + EnqueuedValue::Native(chunk) => { + acc.extend(chunk.iter().copied()); + Some(acc) + }, + _ => { + warn!("get_in_memory_bytes called on a controller with non-native source."); + None + }, + }) + } + + /// <https://streams.spec.whatwg.org/#reset-queue> + fn reset(&mut self) { + self.queue.clear(); + self.total_size = Default::default(); + } +} + +/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller> +#[dom_struct] +pub struct ReadableStreamDefaultController { + reflector_: Reflector, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue> + queue: RefCell<QueueWithSizes>, + + /// A mutable reference to the underlying source is used to implement these two + /// internal slots: + /// + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm> + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm> + underlying_source: MutNullableDom<UnderlyingSourceContainer>, + + stream: MutNullableDom<ReadableStream>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm> + strategy_hwm: f64, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm> + #[ignore_malloc_size_of = "mozjs"] + strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested> + close_requested: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started> + started: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling> + pulling: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain> + pull_again: Cell<bool>, +} + +impl ReadableStreamDefaultController { + #[allow(crown::unrooted_must_root)] + fn new_inherited( + global: &GlobalScope, + underlying_source_type: UnderlyingSourceType, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + can_gc: CanGc, + ) -> ReadableStreamDefaultController { + ReadableStreamDefaultController { + reflector_: Reflector::new(), + queue: RefCell::new(Default::default()), + stream: MutNullableDom::new(None), + underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new( + global, + underlying_source_type, + can_gc, + ))), + strategy_hwm, + strategy_size: RefCell::new(Some(strategy_size)), + close_requested: Default::default(), + started: Default::default(), + pulling: Default::default(), + pull_again: Default::default(), + } + } + + #[allow(crown::unrooted_must_root)] + pub fn new( + global: &GlobalScope, + underlying_source: UnderlyingSourceType, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + can_gc: CanGc, + ) -> DomRoot<ReadableStreamDefaultController> { + reflect_dom_object( + Box::new(ReadableStreamDefaultController::new_inherited( + global, + underlying_source, + strategy_hwm, + strategy_size, + can_gc, + )), + global, + can_gc, + ) + } + + /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + #[allow(unsafe_code)] + pub fn setup(&self, stream: DomRoot<ReadableStream>, can_gc: CanGc) -> Result<(), Error> { + // Assert: stream.[[controller]] is undefined + stream.assert_no_controller(); + + // Set controller.[[stream]] to stream. + self.stream.set(Some(&stream)); + + let global = &*self.global(); + let rooted_default_controller = DomRoot::from_ref(self); + + // Perform ! ResetQueue(controller). + // Set controller.[[started]], controller.[[closeRequested]], + // controller.[[pullAgain]], and controller.[[pulling]] to false. + // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm + // and controller.[[strategyHWM]] to highWaterMark. + // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm + // and controller.[[strategyHWM]] to highWaterMark. + // Set controller.[[cancelAlgorithm]] to cancelAlgorithm. + + // Note: the above steps are done in `new`. + + // Set stream.[[controller]] to controller. + stream.set_default_controller(&rooted_default_controller); + + if let Some(underlying_source) = rooted_default_controller.underlying_source.get() { + // Let startResult be the result of performing startAlgorithm. (This might throw an exception.) + let start_result = underlying_source + .call_start_algorithm( + Controller::ReadableStreamDefaultController(rooted_default_controller.clone()), + can_gc, + ) + .unwrap_or_else(|| { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + + // Let startPromise be a promise resolved with startResult. + let start_promise = start_result?; + + // Upon fulfillment of startPromise, + let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + + // Upon rejection of startPromise with reason r, + let rejection_handler = Box::new(StartAlgorithmRejectionHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let handler = PromiseNativeHandler::new( + global, + Some(fulfillment_handler), + Some(rejection_handler), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + start_promise.append_native_handler(&handler, comp, can_gc); + }; + + Ok(()) + } + + /// Setting the JS object after the heap has settled down. + pub fn set_underlying_source_this_object(&self, this_object: HandleObject) { + if let Some(underlying_source) = self.underlying_source.get() { + underlying_source.set_underlying_source_this_object(this_object); + } + } + + /// <https://streams.spec.whatwg.org/#dequeue-value> + #[allow(crown::unrooted_must_root)] + fn dequeue_value(&self) -> EnqueuedValue { + let mut queue = self.queue.borrow_mut(); + queue.dequeue_value() + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull> + fn should_call_pull(&self) -> bool { + // Let stream be controller.[[stream]]. + // Note: the spec does not assert that stream is not undefined here, + // so we return false if it is. + let Some(stream) = self.stream.get() else { + debug!("`should_call_pull` called on a controller without a stream."); + return false; + }; + + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return false; + } + + // If controller.[[started]] is false, return false. + if !self.started.get() { + return false; + } + + // If ! IsReadableStreamLocked(stream) is true + // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true. + if stream.is_locked() && stream.get_num_read_requests() > 0 { + return true; + } + + // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller). + // Assert: desiredSize is not null. + let desired_size = self.get_desired_size().expect("desiredSize is not null."); + + if desired_size > 0. { + return true; + } + + false + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + #[allow(unsafe_code)] + fn call_pull_if_needed(&self, can_gc: CanGc) { + if !self.should_call_pull() { + return; + } + + // If controller.[[pulling]] is true, + if self.pulling.get() { + // Set controller.[[pullAgain]] to true. + self.pull_again.set(true); + + return; + } + + // Set controller.[[pulling]] to true. + self.pulling.set(true); + + // Let pullPromise be the result of performing controller.[[pullAlgorithm]]. + // Continues into the resolve and reject handling of the native handler. + let global = self.global(); + let rooted_default_controller = DomRoot::from_ref(self); + let controller = + Controller::ReadableStreamDefaultController(rooted_default_controller.clone()); + + let Some(underlying_source) = self.underlying_source.get() else { + return; + }; + + let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let rejection_handler = Box::new(PullAlgorithmRejectionHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let handler = + PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler)); + + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); + let result = underlying_source + .call_pull_algorithm(controller, can_gc) + .unwrap_or_else(|| { + let promise = Promise::new(&global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + let promise = result.unwrap_or_else(|error| { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + // TODO: check if `self.global()` is the right globalscope. + unsafe { + error + .clone() + .to_jsval(*cx, &self.global(), rval.handle_mut()) + }; + let promise = Promise::new(&global, can_gc); + promise.reject_native(&rval.handle()); + promise + }); + promise.append_native_handler(&handler, comp, can_gc); + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel> + #[allow(unsafe_code)] + pub fn perform_cancel_steps(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + // Perform ! ResetQueue(this). + self.queue.borrow_mut().reset(); + + let underlying_source = self + .underlying_source + .get() + .expect("Controller should have a source when the cancel steps are called into."); + let global = self.global(); + + // Let result be the result of performing this.[[cancelAlgorithm]], passing reason. + let result = underlying_source + .call_cancel_algorithm(reason, can_gc) + .unwrap_or_else(|| { + let promise = Promise::new(&global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + let promise = result.unwrap_or_else(|error| { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + // TODO: check if `self.global()` is the right globalscope. + unsafe { + error + .clone() + .to_jsval(*cx, &self.global(), rval.handle_mut()) + }; + let promise = Promise::new(&global, can_gc); + promise.reject_native(&rval.handle()); + promise + }); + + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this). + self.clear_algorithms(); + + // Return result(the promise). + promise + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull> + #[allow(crown::unrooted_must_root)] + pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) { + // Let stream be this.[[stream]]. + // Note: the spec does not assert that there is a stream. + let Some(stream) = self.stream.get() else { + return; + }; + + // if queue contains bytes, perform chunk steps. + if !self.queue.borrow().is_empty() { + // Rooting: chunk will be tied to a rooted value + // before calling into a function that can GC. + let chunk = self.dequeue_value(); + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + let result = RootedTraceableBox::new(Heap::default()); + chunk.to_jsval(cx, rval.handle_mut()); + result.set(*rval); + + // If this.[[closeRequested]] is true and this.[[queue]] is empty + if self.close_requested.get() && self.queue.borrow().is_empty() { + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Perform ! ReadableStreamClose(stream). + stream.close(); + } else { + // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + self.call_pull_if_needed(can_gc); + } + // Perform readRequest’s chunk steps, given chunk. + read_request.chunk_steps(result); + } else { + // Perform ! ReadableStreamAddReadRequest(stream, readRequest). + stream.add_read_request(read_request); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + self.call_pull_if_needed(can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps> + pub fn perform_release_steps(&self) { + // step 1 - Return. + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> + #[allow(unsafe_code)] + pub fn enqueue( + &self, + cx: SafeJSContext, + chunk: SafeHandleValue, + can_gc: CanGc, + ) -> Result<(), Error> { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return Ok(()); + } + + let stream = self + .stream + .get() + .expect("Controller must have a stream when a chunk is enqueued."); + + // If ! IsReadableStreamLocked(stream) is true + // and ! ReadableStreamGetNumReadRequests(stream) > 0, + // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false). + if stream.is_locked() && stream.get_num_read_requests() > 0 { + stream.fulfill_read_request(chunk, false); + } else { + // Otherwise, + // Let result be the result of performing controller.[[strategySizeAlgorithm]], + // passing in chunk, and interpreting the result as a completion record. + // Note: the clone is necessary to prevent potential re-borrow panics. + let strategy_size = { + let reference = self.strategy_size.borrow(); + reference.clone() + }; + let size = if let Some(strategy_size) = strategy_size { + // Note: the Rethrow exception handling is necessary, + // otherwise returning JSFailed will panic because no exception is pending. + let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow); + match result { + // Let chunkSize be result.[[Value]]. + Ok(size) => size, + Err(error) => { + // If result is an abrupt completion, + rooted!(in(*cx) let mut rval = UndefinedValue()); + unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) }; + + // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]). + self.error(rval.handle()); + + // Return result. + // Note: we need to return a type error, because no exception is pending. + return Err(error); + }, + } + } else { + 0. + }; + + { + // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize). + let res = { + let mut queue = self.queue.borrow_mut(); + queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize { + value: Heap::boxed(chunk.get()), + size, + })) + }; + if let Err(error) = res { + // If enqueueResult is an abrupt completion, + + // First, throw the exception. + // Note: this must be done manually here, + // because `enqueue_value_with_size` does not call into JS. + throw_dom_exception(cx, &self.global(), error); + + // Then, get a handle to the JS val for the exception, + // and use that to error the stream. + rooted!(in(*cx) let mut rval = UndefinedValue()); + unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) }; + + // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]). + self.error(rval.handle()); + + // Return enqueueResult. + // Note: because we threw the exception above, + // there is a pending exception and we can return JSFailed. + return Err(Error::JSFailed); + } + } + } + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + self.call_pull_if_needed(can_gc); + + Ok(()) + } + + /// Native call to + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> + #[allow(crown::unrooted_must_root)] + pub fn enqueue_native(&self, chunk: Vec<u8>) { + let stream = self + .stream + .get() + .expect("Controller must have a stream when a chunk is enqueued."); + if stream.is_locked() && stream.get_num_read_requests() > 0 { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + let enqueued_chunk = EnqueuedValue::Native(chunk.into_boxed_slice()); + enqueued_chunk.to_jsval(cx, rval.handle_mut()); + stream.fulfill_read_request(rval.handle(), false); + } else { + let mut queue = self.queue.borrow_mut(); + queue + .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice())) + .expect("Enqueuing a chunk from Rust should not fail."); + } + } + + /// Does the stream have all data in memory? + pub fn in_memory(&self) -> bool { + let Some(underlying_source) = self.underlying_source.get() else { + return false; + }; + underlying_source.in_memory() + } + + /// Return bytes synchronously if the stream has all data in memory. + pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { + let underlying_source = self.underlying_source.get()?; + if underlying_source.in_memory() { + return self.queue.borrow().get_in_memory_bytes(); + } + None + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms> + fn clear_algorithms(&self) { + // Set controller.[[pullAlgorithm]] to undefined. + // Set controller.[[cancelAlgorithm]] to undefined. + self.underlying_source.set(None); + + // Set controller.[[strategySizeAlgorithm]] to undefined. + *self.strategy_size.borrow_mut() = None; + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close> + pub fn close(&self) { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return; + } + + let Some(stream) = self.stream.get() else { + return; + }; + + // Set controller.[[closeRequested]] to true. + self.close_requested.set(true); + + if self.queue.borrow().is_empty() { + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Perform ! ReadableStreamClose(stream). + stream.close(); + } + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size> + fn get_desired_size(&self) -> Option<f64> { + let stream = self.stream.get()?; + + // If state is "errored", return null. + if stream.is_errored() { + return None; + } + + // If state is "closed", return 0. + if stream.is_closed() { + return Some(0.0); + } + + // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]]. + let queue = self.queue.borrow(); + let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX); + Some(desired_size.clamp(desired_size, self.strategy_hwm)) + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue> + fn can_close_or_enqueue(&self) -> bool { + let Some(stream) = self.stream.get() else { + return false; + }; + + // If controller.[[closeRequested]] is false and state is "readable", return true. + if !self.close_requested.get() && stream.is_readable() { + return true; + } + + // Otherwise, return false. + false + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error> + pub fn error(&self, e: SafeHandleValue) { + let Some(stream) = self.stream.get() else { + return; + }; + + // If stream.[[state]] is not "readable", return. + if !stream.is_readable() { + return; + } + + // Perform ! ResetQueue(controller). + self.queue.borrow_mut().reset(); + + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + stream.error(e); + } +} + +impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder> + for ReadableStreamDefaultController +{ + /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size> + fn GetDesiredSize(&self) -> Option<f64> { + self.get_desired_size() + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-close> + fn Close(&self) -> Fallible<()> { + if !self.can_close_or_enqueue() { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, + // throw a TypeError exception. + return Err(Error::Type("Stream cannot be closed.".to_string())); + } + + // Perform ! ReadableStreamDefaultControllerClose(this). + self.close(); + + Ok(()) + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue> + fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception. + if !self.can_close_or_enqueue() { + return Err(Error::Type("Stream cannot be enqueued to.".to_string())); + } + + // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk). + self.enqueue(cx, chunk, can_gc) + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-error> + fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue) -> Fallible<()> { + self.error(e); + Ok(()) + } +} |