diff options
author | Gregory Terzian <2792687+gterzian@users.noreply.github.com> | 2024-12-18 05:14:00 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-17 21:14:00 +0000 |
commit | 379bbb41dde5c46ff39cfc9027d7df49fae733b8 (patch) | |
tree | b8224b9e9d088885fcb3dff405118d5ef932080f /components | |
parent | 026d3717177def1b77e8790f3f045feea66df872 (diff) | |
download | servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.tar.gz servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.zip |
Dom: Re-implement `ReadableStream` Part 1 : Default `Reader` and `Controller` (#34064)
* Re-implement readablestream: basics and default reader and controller
---------
Co-authored-by: Jason Tsai <jason@pews.dev>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Add remaining WebIDLs of ReadableStream (#32605)
* Add Reader's WebIDL files
* Add necessary methods in ReadableStream.webidl
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Create safe wrapper for JSFunctions (#32620)
* Create safe wrapper for JSFunctions
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Add assert to check if the name ends in a null character
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Create macro to wrap unsafe extern "C" function calls
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Remove WRAPPER_FN
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Add macro example documentation
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Use C-string literals
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Ensure name is Cstr type
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
* Scope #[allow(unsafe_code)]
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Start implementation of default controller and reader
Start implementation of default controller and reader
* implement basic internal slots, with todos
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* enum for controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* re-implement native controller methods
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add calling into pull algo
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* more details on chunk enqueuing
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add fulfill read request, clean-up warnings
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* read request and reader typing
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* allow for more than one non-native underlying source type
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add todo for should pull
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add underlying source dom struct container
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove rc around source type
* add default controller init in stream constructor
* setup source container with prototype of source dict
* clean-up docs, dispatch of controller in pull algo call
* turn off SM streams
* remove prototype setting on underlying source container
* fix read request promise resolving
* tidy
* clean-up js conversions in read req handlers
* add queue with sizes concept
* use dom in pull promise handlers
* Demonstrate using dictionary as callback this object.
* move value with size to a struct
* fmt
* put readable stream state in a cell
* nits in expectations
* remove allow unroot by passing read result directly to promise resolving
* tidy
* root default controller inside call_pull_if_needed
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
ReadableStream: implement Cancel and Locked (#33136)
* implement Locked
* implement Cancel and close
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Add GetPromiseIsHandled and SetAnyPromiseIsHandled to Promise
Signed-off-by: Taym <haddadi.taym@gmail.com>
mach fmt
Signed-off-by: Taym <haddadi.taym@gmail.com>
Readablestream default controller: get desired size (#33497)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
stream: implement controller close (#33498)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
implement stream default controller error (#33503)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Readablestream default controller: enqueue (#33528)
* Implement ReadableStreamDefaultControllerMethods::Enqueue
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
* Add spec comments
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
---------
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
readablestream default controller: fulfill read requests (#33542)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Fix extract_size_algorithm (#33561)
Signed-off-by: Wu Wayne <yuweiwu@pm.me>
Readablestream default controller: use strategy size (#33551)
* readablestream default controller: use strategy size, fallible enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
docs
* readablestream default controller: clear strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* prevent potential re-borrow panics when calling into the strategy size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* document readablestream constructor
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Readablestream: impl default controller should pull, start algo (#33586)
* implement should-pull algo for default controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add start algorithm setup for default controller
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
implement promise native handling for start and pull algorithms (#33603)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Implement ReadableStreamDefaultReader (#33160)
* Implement ReadableStreamDefaultReader
Make the stream mutable
readable-stream-reader-generic-release
Proper error types when releasing
Closed
Cancel
Signed-off-by: Taym <haddadi.taym@gmail.com>
* follow the spec more closely
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Implement ReadableStreamDefaultReader read (#34007)
* Implement ReadableStreamDefaultReader read
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Perform readRequest’s error steps with stream.stored_error
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Improve ReadableStreamDefaultReader close (#34014)
* improve ReadableStreamDefaultReader close
Signed-off-by: Taym <haddadi.taym@gmail.com>
* remove resolve_closed_promise
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Use Rc<Box<[u8]>> for queue to optimize get_in_memory_bytes
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Improve read_a_chunk and stop_reading implemntation (#34077)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Implement ReadableStreamDefaultReader::Constructor (#34056)
* Implement ReadableStreamDefaultReader::Constructor
Signed-off-by: Taym <haddadi.taym@gmail.com>
* make start_reading returns ReadableStreamDefaultReader
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix can_gc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Add canGc to ReadableStream::GetReader
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Readablestream fix CanGc (#34080)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* correct ReadableStream::error_native implementation and fix clippy warnings (#34088)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* turn assertion of stream present on controller on a early return with false (#34097)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix already mutably borrowed crash (#34105)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Refactor `get_in_memory_bytes` to return `Option<Vec<u8>> and avoid `unreachable!` panic (#34123)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Set ReadableStream ReadableStreamDefaultReader in start_reading (#34125)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix Unhandled rejection with value: object `TypeError: stream is not locked` (#34204)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix assert!(self.is_readable()) crash in ReadableStream::close (#34207)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix call to to_js_object in underlying source algos (#34098)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* do not assume presence of a stream when performing pull steps (#34244)
* do not assume presence of a stream when performing pull steps
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add doc comments
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* gracefully handle failure of underlying source algorithms (#34243)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* ensure result of calling start algo is an object (#34245)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* return js failed error if underlying source constructor threw (#34246)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Use JSVal for ValueWithSize::value (#34259)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix release reader lock, (#34255)
fix setting stream on controller in new,
fix matching fallthrough,
reduce visibility of controller error method
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* in stream cancel, reject promist if locked (#34271)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix UnderlyingSourceContainer::call_start_algorithm (#34277)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* implement controller cancel steps, fix stream cancel method (#34301)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix conditional in perform pull steps (#34324)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* set reader closed promise to one resolved with undefined if stream closed on init (#34321)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix init of stream and controller (#34323)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix reborrow in controller enqueue, and fix error and exception handling. (#34338)
* fix re-borrow in controller enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* do not call to_jsval on JSFailed error in enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix error and exception handling in controller enqueue
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* remove TODO about correctness of stored error, since this was done as part of the switch to a js val.
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix incorrect "this" object in underlying source callbacks (#34368)
* in controller close, throw type error if stream cannot be closed
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* store original js object for underlying source, for use as this object in callbacks
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix conditional logic in enqueue to ensure pull is called into (#34375)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: Fix bytelength queueing strategy (#34376)
* fix handling of value that is not an object in bytelength queuing strategy
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* return type error if strategy size call fails, to prevent panic because no exception is pending
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* set correct default count queuing size strategy (#34389)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* use proto in stream constructor (#34441)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix edge cases in get_desired_size (#34440)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Stream: fix algo and strategy calls error handling. (#34424)
* fix error handling in cancel steps
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* in pull steps, reject promise if pull algo throws
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* if start algorithm fails, rethrow the error
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* when the strategy size fails, directly get the pending exception and use it to error the stream
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* add error handling to enqueue value with size
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* when enqueueing a value errors, ensure we error and stream with the same error used to throw an exception
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix native use of streams (#34468)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Implement readablestreamdefaulttee (#34405)
* Implement readablestreamdefaulttee
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Create UnderlyingSourceType::Tee each stream
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Use Dom instead of DomRoot
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Queue a microtask for readRequest chunk steps
Signed-off-by: Taym <haddadi.taym@gmail.com>
* fix create_readable_stream
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Remove unnecessary Rc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Use correct doc link
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Add #[allow(crown::unrooted_must_root)]
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix crash in ClosedPromiseRejectionHandler
Signed-off-by: Taym <haddadi.taym@gmail.com>
* reflect TeeReadRequest and TeeUnderlyingSource
Signed-off-by: Taym <haddadi.taym@gmail.com>
* fix can_gc
Signed-off-by: Taym <haddadi.taym@gmail.com>
* reflect tee source, and fix use of mutable dom for tee source and request
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* Fix typo that resolves multiple test failures in 'Tee' tests
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Fix readable-streams/tee.any.js test
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js (#34531)
And fix crate::DomTypeHolder usage
* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js
Signed-off-by: Taym <haddadi.taym@gmail.com>
* make reader rooted in Constructor and acquire_default_reader
Signed-off-by: Taym <haddadi.taym@gmail.com>
* Remove spaces
Signed-off-by: Taym <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Streams: fetch stream chunks should be uint8 arrays (#34553)
* fetch stream chunks should be uint8 arrays
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
* fix clippy
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update wpt test for ReadableStream reimplementation (#34579)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix ignore_malloc_size_of in readablestream tee (#34578)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Remove incorrect use of handle array, fail test safely by giving only one reason (#34560)
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update more wpt test for ReadableStream reimplementation (#34598)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix doc and rename Tee to DefaultTee (#34612)
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix: Address review comments
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update response-stream-with-broken-then.any.js.ini test expectation
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* fix reflect_dom_object can_gc
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Fix compositeReason for DefaultTeeUnderlyingSource (#34627)
* Fix compositeReason for DefaultTeeUnderlyingSource
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Update test
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
---------
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
* Last fixes stream (#34636)
* remove now unsused from_js method of readable stream
* fix documenation of error steps
* return type error instread of panicking on a todo, when trying to construct a stream of type bytes
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
* fix crown rooting related errors (#34662)
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
---------
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Wu Wayne <yuweiwu@pm.me>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Diffstat (limited to 'components')
36 files changed, 3728 insertions, 412 deletions
diff --git a/components/script/Cargo.toml b/components/script/Cargo.toml index 6a9a5bf9a9c..98aec5908ec 100644 --- a/components/script/Cargo.toml +++ b/components/script/Cargo.toml @@ -72,7 +72,7 @@ image = { workspace = true } indexmap = { workspace = true } ipc-channel = { workspace = true } itertools = { workspace = true } -js = { package = "mozjs", git = "https://github.com/servo/mozjs", features = ["streams", "crown"] } +js = { package = "mozjs", git = "https://github.com/servo/mozjs", features = ["crown"] } jstraceable_derive = { path = "../jstraceable_derive" } keyboard-types = { workspace = true } libc = { workspace = true } diff --git a/components/script/body.rs b/components/script/body.rs index 6d7dea4c0ad..17693550554 100644 --- a/components/script/body.rs +++ b/components/script/body.rs @@ -187,7 +187,8 @@ impl TransmitBodyConnectHandler { // TODO: Step 2, If body is null. // Step 3, get a reader for stream. - rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream."); + rooted_stream.acquire_default_reader(CanGc::note()) + .expect("Couldn't acquire a reader for the body stream."); // Note: this algorithm continues when the first chunk is requested by `net`. }), @@ -242,7 +243,7 @@ impl TransmitBodyConnectHandler { let global = rooted_stream.global(); // Step 4, the result of reading a chunk from body’s stream with reader. - let promise = rooted_stream.read_a_chunk(); + let promise = rooted_stream.read_a_chunk(CanGc::note()); // Step 5, the parallel steps waiting for and handling the result of the read promise, // are a combination of the promise native handler here, @@ -692,7 +693,7 @@ impl Callback for ConsumeBodyPromiseHandler { let global = stream.global(); // Run the above step again. - let read_promise = stream.read_a_chunk(); + let read_promise = stream.read_a_chunk(can_gc); let promise_handler = Box::new(ConsumeBodyPromiseHandler { result_promise: self.result_promise.clone(), @@ -763,7 +764,7 @@ fn consume_body_with_promise<T: BodyMixin + DomObject>( }; // Step 3. - if stream.start_reading().is_err() { + if stream.acquire_default_reader(can_gc).is_err() { return promise.reject_error(Error::Type( "The response's stream is disturbed or locked".to_string(), )); @@ -774,7 +775,7 @@ fn consume_body_with_promise<T: BodyMixin + DomObject>( // Step 1 of // https://fetch.spec.whatwg.org/#concept-read-all-bytes-from-readablestream - let read_promise = stream.read_a_chunk(); + let read_promise = stream.read_a_chunk(can_gc); let promise_handler = Box::new(ConsumeBodyPromiseHandler { result_promise: promise.clone(), diff --git a/components/script/dom/bindings/callback.rs b/components/script/dom/bindings/callback.rs index 374627c16da..bb4e59ba75c 100644 --- a/components/script/dom/bindings/callback.rs +++ b/components/script/dom/bindings/callback.rs @@ -15,7 +15,7 @@ use js::jsapi::{ }; use js::jsval::{JSVal, ObjectValue, UndefinedValue}; use js::rust::wrappers::{JS_GetProperty, JS_WrapObject}; -use js::rust::{MutableHandleObject, Runtime}; +use js::rust::{HandleObject, MutableHandleObject, Runtime}; use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods; use crate::dom::bindings::error::{report_pending_exception, Error, Fallible}; @@ -205,9 +205,29 @@ impl CallbackInterface { } } +pub trait ThisReflector { + fn jsobject(&self) -> *mut JSObject; +} + +impl<T: DomObject> ThisReflector for T { + fn jsobject(&self) -> *mut JSObject { + self.reflector().get_jsobject().get() + } +} + +impl<'a> ThisReflector for HandleObject<'a> { + fn jsobject(&self) -> *mut JSObject { + self.get() + } +} + /// Wraps the reflector for `p` into the realm of `cx`. -pub fn wrap_call_this_object<T: DomObject>(cx: JSContext, p: &T, mut rval: MutableHandleObject) { - rval.set(p.reflector().get_jsobject().get()); +pub fn wrap_call_this_object<T: ThisReflector>( + cx: JSContext, + p: &T, + mut rval: MutableHandleObject, +) { + rval.set(p.jsobject()); assert!(!rval.get().is_null()); unsafe { diff --git a/components/script/dom/bindings/codegen/Bindings.conf b/components/script/dom/bindings/codegen/Bindings.conf index f4e2c9e2dfb..d0635ead024 100644 --- a/components/script/dom/bindings/codegen/Bindings.conf +++ b/components/script/dom/bindings/codegen/Bindings.conf @@ -538,6 +538,22 @@ DOMInterfaces = { 'canGc': ['SimulateDeviceConnection', 'DisconnectAllDevices'], }, +'ReadableStream': { + 'canGc': ['GetReader', 'Cancel', 'Tee'], +}, + +"ReadableStreamDefaultController": { + "canGc": ["Enqueue"] +}, + +"ReadableStreamBYOBReader": { + "canGc": ["Read", "Closed", "Cancel"] +}, + +"ReadableStreamDefaultReader": { + "canGc": ["Read", "Cancel"] +}, + } Dictionaries = { diff --git a/components/script/dom/bindings/codegen/CodegenRust.py b/components/script/dom/bindings/codegen/CodegenRust.py index e25ed68d3e8..a0f926b4c50 100644 --- a/components/script/dom/bindings/codegen/CodegenRust.py +++ b/components/script/dom/bindings/codegen/CodegenRust.py @@ -933,6 +933,7 @@ def getJSToNativeConversionInfo(type, descriptorProvider, failureCode=None, """ { use crate::realms::{AlreadyInRealm, InRealm}; + use crate::dom::readablestream::ReadableStream; let in_realm_proof = AlreadyInRealm::assert_for_cx(cx); match ReadableStream::from_js(cx, $${val}.get().to_object(), InRealm::Already(&in_realm_proof)) { Ok(val) => val, @@ -949,7 +950,7 @@ def getJSToNativeConversionInfo(type, descriptorProvider, failureCode=None, templateBody = wrapObjectTemplate(templateBody, "None", isDefinitelyObject, type, failureCode) - declType = CGGeneric("DomRoot<ReadableStream>") + declType = CGGeneric("DomRoot<dom::readablestream::ReadableStream>") return handleOptional(templateBody, declType, handleDefault("None")) @@ -7718,7 +7719,7 @@ class CGCallback(CGClass): f"unsafe {{ self.{method.name}({', '.join(argnamesWithoutThis)}) }}") return [ClassMethod(f'{method.name}_', method.returnType, args, bodyInHeader=True, - templateArgs=["T: DomObject"], + templateArgs=["T: ThisReflector"], body=bodyWithThis, visibility='pub'), ClassMethod(f'{method.name}__', method.returnType, argsWithoutThis, diff --git a/components/script/dom/bindings/codegen/run.py b/components/script/dom/bindings/codegen/run.py index 2dee39814b2..400d022a167 100644 --- a/components/script/dom/bindings/codegen/run.py +++ b/components/script/dom/bindings/codegen/run.py @@ -30,7 +30,7 @@ def main(): from Configuration import Configuration from CodegenRust import CGBindingRoot - parser = WebIDL.Parser(make_dir(os.path.join(out_dir, "cache"))) + parser = WebIDL.Parser(make_dir(os.path.join(out_dir, "cache")), use_builtin_readable_stream=False) webidls = [name for name in os.listdir(webidls_dir) if name.endswith(".webidl")] for webidl in webidls: filename = os.path.join(webidls_dir, webidl) diff --git a/components/script/dom/bindings/function.rs b/components/script/dom/bindings/function.rs new file mode 100644 index 00000000000..86f4903b07d --- /dev/null +++ b/components/script/dom/bindings/function.rs @@ -0,0 +1,50 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/// Defines a macro `native_fn!` to create a JavaScript function from a Rust function pointer. +/// # Example +/// ``` +/// let js_function: Rc<Function> = native_fn!(my_rust_function, c"myFunction", 2, 0); +/// ``` +#[macro_export] +macro_rules! native_fn { + ($call:expr, $name:expr, $nargs:expr, $flags:expr) => {{ + let cx = $crate::dom::types::GlobalScope::get_cx(); + let fun_obj = $crate::native_raw_obj_fn!(cx, $call, $name, $nargs, $flags); + #[allow(unsafe_code)] + unsafe { + Function::new(cx, fun_obj) + } + }}; +} + +/// Defines a macro `native_raw_obj_fn!` to create a raw JavaScript function object. +/// # Example +/// ``` +/// let raw_function_obj: *mut JSObject = native_raw_obj_fn!(cx, my_rust_function, c"myFunction", 2, 0); +/// ``` +#[macro_export] +macro_rules! native_raw_obj_fn { + ($cx:expr, $call:expr, $name:expr, $nargs:expr, $flags:expr) => {{ + #[allow(unsafe_code)] + #[allow(clippy::macro_metavars_in_unsafe)] + unsafe extern "C" fn wrapper(cx: *mut JSContext, argc: u32, vp: *mut JSVal) -> bool { + $call(cx, argc, vp) + } + #[allow(unsafe_code)] + #[allow(clippy::macro_metavars_in_unsafe)] + unsafe { + let name: &std::ffi::CStr = $name; + let raw_fun = $crate::dom::bindings::import::module::jsapi::JS_NewFunction( + *$cx, + Some(wrapper), + $nargs, + $flags, + name.as_ptr() as *const std::ffi::c_char, + ); + assert!(!raw_fun.is_null()); + $crate::dom::bindings::import::module::jsapi::JS_GetFunctionObject(raw_fun) + } + }}; +} diff --git a/components/script/dom/bindings/import.rs b/components/script/dom/bindings/import.rs index 0d9f0d70fdf..c4586b43905 100644 --- a/components/script/dom/bindings/import.rs +++ b/components/script/dom/bindings/import.rs @@ -19,7 +19,7 @@ pub mod base { pub use crate::dom::bindings::callback::{ wrap_call_this_object, CallSetup, CallbackContainer, CallbackFunction, CallbackInterface, - CallbackObject, ExceptionHandling, + CallbackObject, ExceptionHandling, ThisReflector, }; pub use crate::dom::bindings::codegen::Bindings::AudioNodeBinding::{ ChannelCountMode, ChannelCountModeValues, ChannelInterpretation, diff --git a/components/script/dom/bindings/interface.rs b/components/script/dom/bindings/interface.rs index ce2f7e275b6..bf2fc6faa87 100644 --- a/components/script/dom/bindings/interface.rs +++ b/components/script/dom/bindings/interface.rs @@ -146,7 +146,6 @@ pub unsafe fn create_global_object( let mut options = RealmOptions::default(); options.creationOptions_.traceGlobal_ = Some(trace); options.creationOptions_.sharedMemoryAndAtomics_ = false; - options.creationOptions_.streams_ = true; select_compartment(cx, &mut options); let principal = ServoJSPrincipals::new(origin); diff --git a/components/script/dom/bindings/mod.rs b/components/script/dom/bindings/mod.rs index a6666b70d90..633763f1ba6 100644 --- a/components/script/dom/bindings/mod.rs +++ b/components/script/dom/bindings/mod.rs @@ -143,6 +143,7 @@ pub mod conversions; pub mod error; pub mod finalize; pub mod frozenarray; +pub mod function; pub mod guard; pub mod import; pub mod inheritance; diff --git a/components/script/dom/bytelengthqueuingstrategy.rs b/components/script/dom/bytelengthqueuingstrategy.rs new file mode 100644 index 00000000000..55b8e4e8815 --- /dev/null +++ b/components/script/dom/bytelengthqueuingstrategy.rs @@ -0,0 +1,113 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::gc::{HandleValue, MutableHandleValue}; +use js::jsapi::{CallArgs, JSContext}; +use js::jsval::JSVal; +use js::rust::HandleObject; + +use super::bindings::codegen::Bindings::FunctionBinding::Function; +use super::bindings::codegen::Bindings::QueuingStrategyBinding::{ + ByteLengthQueuingStrategyMethods, QueuingStrategyInit, +}; +use super::bindings::import::module::{DomObject, DomRoot, Fallible, Reflector}; +use super::bindings::reflector::reflect_dom_object_with_proto; +use super::types::GlobalScope; +use crate::dom::bindings::import::module::get_dictionary_property; +use crate::native_fn; +use crate::script_runtime::CanGc; + +#[dom_struct] +pub struct ByteLengthQueuingStrategy { + reflector_: Reflector, + high_water_mark: f64, +} + +impl ByteLengthQueuingStrategy { + pub fn new_inherited(init: f64) -> Self { + Self { + reflector_: Reflector::new(), + high_water_mark: init, + } + } + + pub fn new( + global: &GlobalScope, + proto: Option<HandleObject>, + init: f64, + can_gc: CanGc, + ) -> DomRoot<Self> { + reflect_dom_object_with_proto(Box::new(Self::new_inherited(init)), global, proto, can_gc) + } +} + +impl ByteLengthQueuingStrategyMethods<crate::DomTypeHolder> for ByteLengthQueuingStrategy { + /// <https://streams.spec.whatwg.org/#blqs-constructor> + fn Constructor( + global: &GlobalScope, + proto: Option<HandleObject>, + can_gc: CanGc, + init: &QueuingStrategyInit, + ) -> DomRoot<Self> { + Self::new(global, proto, init.highWaterMark, can_gc) + } + /// <https://streams.spec.whatwg.org/#blqs-high-water-mark> + fn HighWaterMark(&self) -> f64 { + self.high_water_mark + } + + /// <https://streams.spec.whatwg.org/#blqs-size> + fn GetSize(&self) -> Fallible<Rc<Function>> { + let global = self.reflector_.global(); + // Return this's relevant global object's byte length queuing strategy + // size function. + if let Some(fun) = global.get_byte_length_queuing_strategy_size() { + return Ok(fun); + } + + // Step 1. Let steps be the following steps, given chunk + // Note: See ByteLengthQueuingStrategySize instead. + + // Step 2. Let F be !CreateBuiltinFunction(steps, 1, "size", « », + // globalObject’s relevant Realm). + let fun = native_fn!(byte_length_queuing_strategy_size, c"size", 1, 0); + // Step 3. Set globalObject’s byte length queuing strategy size function to + // a Function that represents a reference to F, + // with callback context equal to globalObject’s relevant settings object. + global.set_byte_length_queuing_strategy_size(fun.clone()); + Ok(fun) + } +} + +/// <https://streams.spec.whatwg.org/#byte-length-queuing-strategy-size-function> +#[allow(unsafe_code)] +pub unsafe fn byte_length_queuing_strategy_size( + cx: *mut JSContext, + argc: u32, + vp: *mut JSVal, +) -> bool { + let args = CallArgs::from_vp(vp, argc); + + // Step 1.1: Return ? GetV(chunk, "byteLength"). + let val = HandleValue::from_raw(args.get(0)); + + // https://tc39.es/ecma262/multipage/abstract-operations.html#sec-getv + // Let O be ? ToObject(V). + if !val.is_object() { + return false; + } + rooted!(in(cx) let object = val.to_object()); + + // Return ? O.[[Get]](P, V). + get_dictionary_property( + cx, + object.handle(), + "byteLength", + MutableHandleValue::from_raw(args.rval()), + ) + .unwrap_or(false) +} diff --git a/components/script/dom/countqueuingstrategy.rs b/components/script/dom/countqueuingstrategy.rs new file mode 100644 index 00000000000..73552c57b5b --- /dev/null +++ b/components/script/dom/countqueuingstrategy.rs @@ -0,0 +1,127 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{CallArgs, JSContext}; +use js::jsval::{Int32Value, JSVal}; +use js::rust::HandleObject; + +use super::bindings::codegen::Bindings::FunctionBinding::Function; +use super::bindings::codegen::Bindings::QueuingStrategyBinding::{ + CountQueuingStrategyMethods, QueuingStrategy, QueuingStrategyInit, QueuingStrategySize, +}; +use super::bindings::import::module::{DomObject, DomRoot, Error, Fallible, Reflector}; +use super::bindings::reflector::reflect_dom_object_with_proto; +use super::types::GlobalScope; +use crate::script_runtime::CanGc; +use crate::{native_fn, native_raw_obj_fn}; + +#[dom_struct] +pub struct CountQueuingStrategy { + reflector_: Reflector, + high_water_mark: f64, +} + +impl CountQueuingStrategy { + pub fn new_inherited(init: f64) -> Self { + Self { + reflector_: Reflector::new(), + high_water_mark: init, + } + } + + pub fn new( + global: &GlobalScope, + proto: Option<HandleObject>, + init: f64, + can_gc: CanGc, + ) -> DomRoot<Self> { + reflect_dom_object_with_proto(Box::new(Self::new_inherited(init)), global, proto, can_gc) + } +} + +impl CountQueuingStrategyMethods<crate::DomTypeHolder> for CountQueuingStrategy { + /// <https://streams.spec.whatwg.org/#cqs-constructor> + fn Constructor( + global: &GlobalScope, + proto: Option<HandleObject>, + can_gc: CanGc, + init: &QueuingStrategyInit, + ) -> DomRoot<Self> { + Self::new(global, proto, init.highWaterMark, can_gc) + } + + /// <https://streams.spec.whatwg.org/#cqs-high-water-mark> + fn HighWaterMark(&self) -> f64 { + self.high_water_mark + } + + /// <https://streams.spec.whatwg.org/#cqs-size> + fn GetSize(&self) -> Fallible<Rc<Function>> { + let global = self.reflector_.global(); + // Return this's relevant global object's count queuing strategy + // size function. + if let Some(fun) = global.get_count_queuing_strategy_size() { + return Ok(fun); + } + + // Step 1. Let steps be the following steps, given chunk + // Note: See ByteLengthQueuingStrategySize instead. + + // Step 2. Let F be !CreateBuiltinFunction(steps, 1, "size", « », + // globalObject’s relevant Realm). + let fun = native_fn!(count_queuing_strategy_size, c"size", 0, 0); + // Step 3. Set globalObject’s count queuing strategy size function to + // a Function that represents a reference to F, + // with callback context equal to globalObject’s relevant settings object. + global.set_count_queuing_strategy_size(fun.clone()); + Ok(fun) + } +} + +/// <https://streams.spec.whatwg.org/#count-queuing-strategy-size-function> +#[allow(unsafe_code)] +pub unsafe fn count_queuing_strategy_size(_cx: *mut JSContext, argc: u32, vp: *mut JSVal) -> bool { + let args = CallArgs::from_vp(vp, argc); + // Step 1.1. Return 1. + args.rval().set(Int32Value(1)); + true +} + +/// Extract the high water mark from a QueuingStrategy. +/// If the high water mark is not set, return the default value. +/// +/// <https://streams.spec.whatwg.org/#validate-and-normalize-high-water-mark> +pub fn extract_high_water_mark(strategy: &QueuingStrategy, default_hwm: f64) -> Result<f64, Error> { + if strategy.highWaterMark.is_none() { + return Ok(default_hwm); + } + + let high_water_mark = strategy.highWaterMark.unwrap(); + if high_water_mark.is_nan() || high_water_mark < 0.0 { + return Err(Error::Range( + "High water mark must be a non-negative number.".to_string(), + )); + } + + Ok(high_water_mark) +} + +/// Extract the size algorithm from a QueuingStrategy. +/// If the size algorithm is not set, return a fallback function which always returns 1. +/// +/// <https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function> +pub fn extract_size_algorithm(strategy: &QueuingStrategy) -> Rc<QueuingStrategySize> { + if strategy.size.is_none() { + let cx = GlobalScope::get_cx(); + let fun_obj = native_raw_obj_fn!(cx, count_queuing_strategy_size, c"size", 0, 0); + #[allow(unsafe_code)] + unsafe { + return QueuingStrategySize::new(cx, fun_obj); + }; + } + strategy.size.as_ref().unwrap().clone() +} diff --git a/components/script/dom/defaultteereadrequest.rs b/components/script/dom/defaultteereadrequest.rs new file mode 100644 index 00000000000..dfe5b22d045 --- /dev/null +++ b/components/script/dom/defaultteereadrequest.rs @@ -0,0 +1,238 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::cell::Cell; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::Heap; +use js::jsval::{JSVal, UndefinedValue}; +use js::rust::HandleValue as SafeHandleValue; + +use super::bindings::reflector::reflect_dom_object; +use super::bindings::root::DomRoot; +use super::bindings::structuredclone; +use crate::dom::bindings::reflector::{DomObject, Reflector}; +use crate::dom::bindings::root::Dom; +use crate::dom::bindings::trace::RootedTraceableBox; +use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::readablestream::ReadableStream; +use crate::microtask::Microtask; +use crate::script_runtime::CanGc; + +#[derive(JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +pub struct DefaultTeeReadRequestMicrotask { + #[ignore_malloc_size_of = "mozjs"] + chunk: Box<Heap<JSVal>>, + tee_read_request: Dom<DefaultTeeReadRequest>, +} + +impl DefaultTeeReadRequestMicrotask { + pub fn microtask_chunk_steps(&self, can_gc: CanGc) { + self.tee_read_request.chunk_steps(&self.chunk, can_gc) + } +} + +#[dom_struct] +/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2> +pub struct DefaultTeeReadRequest { + reflector_: Reflector, + stream: Dom<ReadableStream>, + branch_1: Dom<ReadableStream>, + branch_2: Dom<ReadableStream>, + #[ignore_malloc_size_of = "Rc"] + reading: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + read_again: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + canceled_1: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + canceled_2: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + clone_for_branch_2: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + cancel_promise: Rc<Promise>, + tee_underlying_source: Dom<DefaultTeeUnderlyingSource>, +} +impl DefaultTeeReadRequest { + #[allow(clippy::too_many_arguments)] + #[allow(crown::unrooted_must_root)] + pub fn new( + stream: &ReadableStream, + branch_1: &ReadableStream, + branch_2: &ReadableStream, + reading: Rc<Cell<bool>>, + read_again: Rc<Cell<bool>>, + canceled_1: Rc<Cell<bool>>, + canceled_2: Rc<Cell<bool>>, + clone_for_branch_2: Rc<Cell<bool>>, + cancel_promise: Rc<Promise>, + tee_underlying_source: &DefaultTeeUnderlyingSource, + can_gc: CanGc, + ) -> DomRoot<Self> { + reflect_dom_object( + Box::new(DefaultTeeReadRequest { + reflector_: Reflector::new(), + stream: Dom::from_ref(stream), + branch_1: Dom::from_ref(branch_1), + branch_2: Dom::from_ref(branch_2), + reading, + read_again, + canceled_1, + canceled_2, + clone_for_branch_2, + cancel_promise, + tee_underlying_source: Dom::from_ref(tee_underlying_source), + }), + &*stream.global(), + can_gc, + ) + } + /// Call into cancel of the stream, + /// <https://streams.spec.whatwg.org/#readable-stream-cancel> + pub fn stream_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) { + self.stream.cancel(reason, can_gc); + } + /// Enqueue a microtask to perform the chunk steps + /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2> + pub fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) { + // Queue a microtask to perform the following steps: + let tee_read_request_chunk = DefaultTeeReadRequestMicrotask { + chunk: Heap::boxed(*chunk.handle()), + tee_read_request: Dom::from_ref(self), + }; + let global = self.stream.global(); + let microtask_queue = global.microtask_queue(); + let cx = GlobalScope::get_cx(); + microtask_queue.enqueue( + Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk), + cx, + ); + } + /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2> + #[allow(unsafe_code)] + #[allow(clippy::borrowed_box)] + pub fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) { + // Set readAgain to false. + self.read_again.set(false); + // Let chunk1 and chunk2 be chunk. + let chunk1 = chunk; + let chunk2 = chunk; + // If canceled_2 is false and cloneForBranch2 is true, + if !self.canceled_2.get() && self.clone_for_branch_2.get() { + let cx = GlobalScope::get_cx(); + // Let cloneResult be StructuredClone(chunk2). + rooted!(in(*cx) let mut clone_result = UndefinedValue()); + let data = structuredclone::write( + cx, + unsafe { SafeHandleValue::from_raw(chunk2.handle()) }, + None, + ) + .unwrap(); + // If cloneResult is an abrupt completion, + if structuredclone::read(&self.stream.global(), data, clone_result.handle_mut()) + .is_err() + { + // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]). + self.readable_stream_default_controller_error( + &self.branch_1, + clone_result.handle(), + ); + + // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]). + self.readable_stream_default_controller_error( + &self.branch_2, + clone_result.handle(), + ); + // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). + self.stream_cancel(clone_result.handle(), can_gc); + // Return. + return; + } else { + // Otherwise, set chunk2 to cloneResult.[[Value]]. + chunk2.set(*clone_result); + } + } + // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1). + if !self.canceled_1.get() { + self.readable_stream_default_controller_enqueue( + &self.branch_1, + unsafe { SafeHandleValue::from_raw(chunk1.handle()) }, + can_gc, + ); + } + // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2). + if !self.canceled_2.get() { + self.readable_stream_default_controller_enqueue( + &self.branch_2, + unsafe { SafeHandleValue::from_raw(chunk2.handle()) }, + can_gc, + ); + } + // Set reading to false. + self.reading.set(false); + // If readAgain is true, perform pullAlgorithm. + if self.read_again.get() { + self.pull_algorithm(can_gc); + } + } + /// <https://streams.spec.whatwg.org/#read-request-close-steps> + pub fn close_steps(&self) { + // Set reading to false. + self.reading.set(false); + // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]). + if !self.canceled_1.get() { + self.readable_stream_default_controller_close(&self.branch_1); + } + // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]). + if !self.canceled_2.get() { + self.readable_stream_default_controller_close(&self.branch_2); + } + // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined. + if !self.canceled_1.get() || !self.canceled_2.get() { + self.cancel_promise.resolve_native(&()); + } + } + /// <https://streams.spec.whatwg.org/#read-request-error-steps> + pub fn error_steps(&self) { + // Set reading to false. + self.reading.set(false); + } + /// Call into enqueue of the default controller of a stream, + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> + fn readable_stream_default_controller_enqueue( + &self, + stream: &ReadableStream, + chunk: SafeHandleValue, + can_gc: CanGc, + ) { + stream + .get_default_controller() + .enqueue(GlobalScope::get_cx(), chunk, can_gc) + .expect("enqueue failed for stream controller in DefaultTeeReadRequest"); + } + + /// Call into close of the default controller of a stream, + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close> + fn readable_stream_default_controller_close(&self, stream: &ReadableStream) { + stream.get_default_controller().close(); + } + + /// Call into error of the default controller of stream, + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error> + fn readable_stream_default_controller_error( + &self, + stream: &ReadableStream, + error: SafeHandleValue, + ) { + stream.get_default_controller().error(error); + } + + pub fn pull_algorithm(&self, can_gc: CanGc) { + self.tee_underlying_source.pull_algorithm(can_gc); + } +} diff --git a/components/script/dom/defaultteeunderlyingsource.rs b/components/script/dom/defaultteeunderlyingsource.rs new file mode 100644 index 00000000000..902f0986eb8 --- /dev/null +++ b/components/script/dom/defaultteeunderlyingsource.rs @@ -0,0 +1,221 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::cell::Cell; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value}; +use js::jsval::{ObjectValue, UndefinedValue}; +use js::rust::HandleValue as SafeHandleValue; + +use super::bindings::root::{DomRoot, MutNullableDom}; +use super::types::{ReadableStream, ReadableStreamDefaultReader}; +use crate::dom::bindings::import::module::Error; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; +use crate::dom::bindings::root::Dom; +use crate::dom::defaultteereadrequest::DefaultTeeReadRequest; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::readablestreamdefaultreader::ReadRequest; +use crate::script_runtime::CanGc; + +#[derive(JSTraceable, MallocSizeOf)] +pub enum TeeCancelAlgorithm { + Cancel1Algorithm, + Cancel2Algorithm, +} + +#[dom_struct] +/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee> +pub struct DefaultTeeUnderlyingSource { + reflector_: Reflector, + reader: Dom<ReadableStreamDefaultReader>, + stream: Dom<ReadableStream>, + branch_1: MutNullableDom<ReadableStream>, + branch_2: MutNullableDom<ReadableStream>, + #[ignore_malloc_size_of = "Rc"] + reading: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + read_again: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + canceled_1: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + canceled_2: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + clone_for_branch_2: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + #[allow(clippy::redundant_allocation)] + reason_1: Rc<Box<Heap<Value>>>, + #[ignore_malloc_size_of = "Rc"] + #[allow(clippy::redundant_allocation)] + reason_2: Rc<Box<Heap<Value>>>, + #[ignore_malloc_size_of = "Rc"] + cancel_promise: Rc<Promise>, + tee_cancel_algorithm: TeeCancelAlgorithm, +} + +impl DefaultTeeUnderlyingSource { + #[allow(clippy::too_many_arguments)] + #[allow(clippy::redundant_allocation)] + #[allow(crown::unrooted_must_root)] + pub fn new( + reader: &ReadableStreamDefaultReader, + stream: &ReadableStream, + reading: Rc<Cell<bool>>, + read_again: Rc<Cell<bool>>, + canceled_1: Rc<Cell<bool>>, + canceled_2: Rc<Cell<bool>>, + clone_for_branch_2: Rc<Cell<bool>>, + reason_1: Rc<Box<Heap<Value>>>, + reason_2: Rc<Box<Heap<Value>>>, + cancel_promise: Rc<Promise>, + tee_cancel_algorithm: TeeCancelAlgorithm, + can_gc: CanGc, + ) -> DomRoot<DefaultTeeUnderlyingSource> { + reflect_dom_object( + Box::new(DefaultTeeUnderlyingSource { + reflector_: Reflector::new(), + reader: Dom::from_ref(reader), + stream: Dom::from_ref(stream), + branch_1: MutNullableDom::new(None), + branch_2: MutNullableDom::new(None), + reading, + read_again, + canceled_1, + canceled_2, + clone_for_branch_2, + reason_1, + reason_2, + cancel_promise, + tee_cancel_algorithm, + }), + &*stream.global(), + can_gc, + ) + } + + pub fn set_branch_1(&self, stream: &ReadableStream) { + self.branch_1.set(Some(stream)); + } + + pub fn set_branch_2(&self, stream: &ReadableStream) { + self.branch_2.set(Some(stream)); + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee> + /// Let pullAlgorithm be the following steps: + #[allow(crown::unrooted_must_root)] + pub fn pull_algorithm(&self, can_gc: CanGc) -> Option<Result<Rc<Promise>, Error>> { + // If reading is true, + if self.reading.get() { + // Set readAgain to true. + self.read_again.set(true); + // Return a promise resolved with undefined. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + return Some(Promise::new_resolved( + &self.stream.global(), + cx, + rval.handle(), + )); + } + + // Set reading to true. + self.reading.set(true); + + // Let readRequest be a read request with the following items: + let tee_read_request = DefaultTeeReadRequest::new( + &self.stream, + &self.branch_1.get().expect("Branch 1 should be set."), + &self.branch_2.get().expect("Branch 2 should be set."), + self.reading.clone(), + self.read_again.clone(), + self.canceled_1.clone(), + self.canceled_2.clone(), + self.clone_for_branch_2.clone(), + self.cancel_promise.clone(), + self, + can_gc, + ); + + // Rooting: the tee read request is rooted above. + let read_request = ReadRequest::DefaultTee { + tee_read_request: Dom::from_ref(&tee_read_request), + }; + + // Perform ! ReadableStreamDefaultReaderRead(reader, readRequest). + self.reader.read(&read_request, can_gc); + + // Return a promise resolved with undefined. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + Some(Promise::new_resolved( + &self.stream.global(), + GlobalScope::get_cx(), + rval.handle(), + )) + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee> + /// Let cancel1Algorithm be the following steps, taking a reason argument + /// and + /// Let cancel2Algorithm be the following steps, taking a reason argument + #[allow(unsafe_code)] + pub fn cancel_algorithm( + &self, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Option<Result<Rc<Promise>, Error>> { + match self.tee_cancel_algorithm { + TeeCancelAlgorithm::Cancel1Algorithm => { + // Set canceled_1 to true. + self.canceled_1.set(true); + + // Set reason_1 to reason. + self.reason_1.set(reason.get()); + + // If canceled_2 is true, + if self.canceled_2.get() { + self.resolve_cancel_promise(can_gc); + } + // Return cancelPromise. + Some(Ok(self.cancel_promise.clone())) + }, + TeeCancelAlgorithm::Cancel2Algorithm => { + // Set canceled_2 to true. + self.canceled_2.set(true); + + // Set reason_2 to reason. + self.reason_2.set(reason.get()); + + // If canceled_1 is true, + if self.canceled_1.get() { + self.resolve_cancel_promise(can_gc); + } + // Return cancelPromise. + Some(Ok(self.cancel_promise.clone())) + }, + } + } + + #[allow(unsafe_code)] + fn resolve_cancel_promise(&self, can_gc: CanGc) { + // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »). + let cx = GlobalScope::get_cx(); + rooted_vec!(let mut reasons_values); + reasons_values.push(self.reason_1.get()); + reasons_values.push(self.reason_2.get()); + + let reasons_values_array = HandleValueArray::from(&reasons_values); + rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) }); + rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get())); + + // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). + let cancel_result = self.stream.cancel(reasons_value.handle(), can_gc); + + // Resolve cancelPromise with cancelResult. + self.cancel_promise.resolve_native(&cancel_result); + } +} diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs index 0b4c390cf1d..722bda2d453 100644 --- a/components/script/dom/globalscope.rs +++ b/components/script/dom/globalscope.rs @@ -3,7 +3,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use std::borrow::Cow; -use std::cell::Cell; +use std::cell::{Cell, OnceCell}; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::ops::Index; @@ -73,6 +73,7 @@ use super::bindings::trace::{HashMapTracedValues, RootedTraceableBox}; use crate::dom::bindings::cell::{DomRefCell, RefMut}; use crate::dom::bindings::codegen::Bindings::BroadcastChannelBinding::BroadcastChannelMethods; use crate::dom::bindings::codegen::Bindings::EventSourceBinding::EventSource_Binding::EventSourceMethods; +use crate::dom::bindings::codegen::Bindings::FunctionBinding::Function; use crate::dom::bindings::codegen::Bindings::ImageBitmapBinding::{ ImageBitmapOptions, ImageBitmapSource, }; @@ -92,6 +93,7 @@ use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::bindings::settings_stack::{entry_global, incumbent_global, AutoEntryScript}; use crate::dom::bindings::str::DOMString; use crate::dom::bindings::structuredclone; +use crate::dom::bindings::trace::CustomTraceable; use crate::dom::bindings::weakref::{DOMTracker, WeakRef}; use crate::dom::blob::Blob; use crate::dom::broadcastchannel::BroadcastChannel; @@ -114,9 +116,10 @@ use crate::dom::paintworkletglobalscope::PaintWorkletGlobalScope; use crate::dom::performance::Performance; use crate::dom::performanceobserver::VALID_ENTRY_TYPES; use crate::dom::promise::Promise; -use crate::dom::readablestream::{ExternalUnderlyingSource, ReadableStream}; +use crate::dom::readablestream::ReadableStream; use crate::dom::serviceworker::ServiceWorker; use crate::dom::serviceworkerregistration::ServiceWorkerRegistration; +use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; #[cfg(feature = "webgpu")] use crate::dom::webgpu::gpudevice::GPUDevice; #[cfg(feature = "webgpu")] @@ -367,6 +370,20 @@ pub struct GlobalScope { /// Directory to store unminified scripts for this window if unminify-js /// opt is enabled. unminified_js_dir: Option<String>, + + /// The byte length queuing strategy size function that will be initialized once + /// `size` getter of `ByteLengthQueuingStrategy` is called. + /// + /// <https://streams.spec.whatwg.org/#byte-length-queuing-strategy-size-function> + #[ignore_malloc_size_of = "Rc<T> is hard"] + byte_length_queuing_strategy_size_function: OnceCell<Rc<Function>>, + + /// The count queuing strategy size function that will be initialized once + /// `size` getter of `CountQueuingStrategy` is called. + /// + /// <https://streams.spec.whatwg.org/#count-queuing-strategy-size-function> + #[ignore_malloc_size_of = "Rc<T> is hard"] + count_queuing_strategy_size_function: OnceCell<Rc<Function>>, } /// A wrapper for glue-code between the ipc router and the event-loop. @@ -629,10 +646,10 @@ impl MessageListener { } /// Callback used to enqueue file chunks to streams as part of FileListener. -fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>, can_gc: CanGc) { +fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>) { match bytes { Ok(b) => { - stream.enqueue_native(b, can_gc); + stream.enqueue_native(b); }, Err(e) => { stream.error_native(e); @@ -642,7 +659,7 @@ fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>, can /// Callback used to close streams as part of FileListener. fn stream_handle_eof(stream: &ReadableStream) { - stream.close_native(); + stream.controller_close_native(); } impl FileListener { @@ -655,7 +672,7 @@ impl FileListener { let task = task!(enqueue_stream_chunk: move || { let stream = trusted.root(); - stream_handle_incoming(&stream, Ok(blob_buf.bytes), CanGc::note()); + stream_handle_incoming(&stream, Ok(blob_buf.bytes)); }); let _ = self @@ -679,7 +696,7 @@ impl FileListener { let task = task!(enqueue_stream_chunk: move || { let stream = trusted.root(); - stream_handle_incoming(&stream, Ok(bytes_in), CanGc::note()); + stream_handle_incoming(&stream, Ok(bytes_in)); }); let _ = self @@ -745,7 +762,7 @@ impl FileListener { let _ = self.task_source.queue_with_canceller( task!(error_stream: move || { let stream = trusted_stream.root(); - stream_handle_incoming(&stream, error, CanGc::note()); + stream_handle_incoming(&stream, error); }), &self.task_canceller, ); @@ -820,6 +837,8 @@ impl GlobalScope { dynamic_modules: DomRefCell::new(DynamicModuleList::new()), inherited_secure_context, unminified_js_dir: unminify_js.then(|| unminified_path("unminified-js")), + byte_length_queuing_strategy_size_function: OnceCell::new(), + count_queuing_strategy_size_function: OnceCell::new(), } } @@ -2016,7 +2035,8 @@ impl GlobalScope { let stream = ReadableStream::new_with_external_underlying_source( self, - ExternalUnderlyingSource::Blob(size), + UnderlyingSourceType::Blob(size), + can_gc, ); let recv = self.send_msg(file_id); @@ -3447,6 +3467,36 @@ impl GlobalScope { pub fn unminified_js_dir(&self) -> Option<String> { self.unminified_js_dir.clone() } + + pub(crate) fn set_byte_length_queuing_strategy_size(&self, function: Rc<Function>) { + if self + .byte_length_queuing_strategy_size_function + .set(function) + .is_err() + { + warn!("byte length queuing strategy size function is set twice."); + }; + } + + pub(crate) fn get_byte_length_queuing_strategy_size(&self) -> Option<Rc<Function>> { + self.byte_length_queuing_strategy_size_function + .get() + .cloned() + } + + pub(crate) fn set_count_queuing_strategy_size(&self, function: Rc<Function>) { + if self + .count_queuing_strategy_size_function + .set(function) + .is_err() + { + warn!("count queuing strategy size function is set twice."); + }; + } + + pub(crate) fn get_count_queuing_strategy_size(&self) -> Option<Rc<Function>> { + self.count_queuing_strategy_size_function.get().cloned() + } } /// Returns the Rust global scope from a JS global object. diff --git a/components/script/dom/mod.rs b/components/script/dom/mod.rs index 8157728af84..8fe9647d3e9 100644 --- a/components/script/dom/mod.rs +++ b/components/script/dom/mod.rs @@ -243,6 +243,7 @@ pub mod bluetoothremotegattserver; pub mod bluetoothremotegattservice; pub mod bluetoothuuid; pub mod broadcastchannel; +pub mod bytelengthqueuingstrategy; pub mod canvasgradient; pub mod canvaspattern; pub mod canvasrenderingcontext2d; @@ -256,6 +257,7 @@ pub mod comment; pub mod compositionevent; pub mod console; pub mod constantsourcenode; +pub mod countqueuingstrategy; mod create; pub mod crypto; pub mod cryptokey; @@ -283,6 +285,8 @@ pub mod datatransfer; pub mod datatransferitem; pub mod datatransferitemlist; pub mod dedicatedworkerglobalscope; +pub mod defaultteereadrequest; +pub mod defaultteeunderlyingsource; pub mod dissimilaroriginlocation; pub mod dissimilaroriginwindow; pub mod document; @@ -475,7 +479,12 @@ pub mod promiserejectionevent; pub mod radionodelist; pub mod range; pub mod raredata; +pub mod readablebytestreamcontroller; pub mod readablestream; +pub mod readablestreambyobreader; +pub mod readablestreambyobrequest; +pub mod readablestreamdefaultcontroller; +pub mod readablestreamdefaultreader; pub mod request; pub mod resizeobserver; pub mod resizeobserverentry; @@ -540,6 +549,7 @@ pub mod trackevent; pub mod transitionevent; pub mod treewalker; pub mod uievent; +pub mod underlyingsourcecontainer; pub mod url; pub mod urlhelper; pub mod urlsearchparams; diff --git a/components/script/dom/promise.rs b/components/script/dom/promise.rs index 6e1a8fdfb63..b58caf9e825 100644 --- a/components/script/dom/promise.rs +++ b/components/script/dom/promise.rs @@ -24,9 +24,9 @@ use js::jsapi::{ }; use js::jsval::{Int32Value, JSVal, ObjectValue, UndefinedValue}; use js::rust::wrappers::{ - AddPromiseReactions, CallOriginalPromiseReject, CallOriginalPromiseResolve, GetPromiseState, - IsPromiseObject, NewPromiseObject, RejectPromise, ResolvePromise, - SetPromiseUserInputEventHandlingState, + AddPromiseReactions, CallOriginalPromiseReject, CallOriginalPromiseResolve, + GetPromiseIsHandled, GetPromiseState, IsPromiseObject, NewPromiseObject, RejectPromise, + ResolvePromise, SetAnyPromiseIsHandled, SetPromiseUserInputEventHandlingState, }; use js::rust::{HandleObject, HandleValue, MutableHandleObject, Runtime}; @@ -285,6 +285,17 @@ impl Promise { assert!(ok); } } + + #[allow(unsafe_code)] + pub fn get_promise_is_handled(&self) -> bool { + unsafe { GetPromiseIsHandled(self.reflector().get_jsobject()) } + } + + #[allow(unsafe_code)] + pub fn set_promise_is_handled(&self) -> bool { + let cx = GlobalScope::get_cx(); + unsafe { SetAnyPromiseIsHandled(*cx, self.reflector().get_jsobject()) } + } } #[allow(unsafe_code)] diff --git a/components/script/dom/readablebytestreamcontroller.rs b/components/script/dom/readablebytestreamcontroller.rs new file mode 100644 index 00000000000..058fce5ad3c --- /dev/null +++ b/components/script/dom/readablebytestreamcontroller.rs @@ -0,0 +1,64 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use dom_struct::dom_struct; +use js::rust::HandleValue as SafeHandleValue; + +use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods; +use crate::dom::bindings::import::module::{Error, Fallible}; +use crate::dom::bindings::reflector::Reflector; +use crate::dom::bindings::root::{DomRoot, MutNullableDom}; +use crate::dom::readablestream::ReadableStream; +use crate::script_runtime::JSContext as SafeJSContext; + +/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller> +#[dom_struct] +pub struct ReadableByteStreamController { + reflector_: Reflector, + stream: MutNullableDom<ReadableStream>, +} + +impl ReadableByteStreamController { + pub fn set_stream(&self, stream: &ReadableStream) { + self.stream.set(Some(stream)) + } +} + +impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController { + /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request> + fn GetByobRequest( + &self, + ) -> Fallible<Option<DomRoot<super::readablestreambyobrequest::ReadableStreamBYOBRequest>>> + { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size> + fn GetDesiredSize(&self) -> Option<f64> { + // TODO + None + } + + /// <https://streams.spec.whatwg.org/#rbs-controller-close> + fn Close(&self) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue> + fn Enqueue( + &self, + _chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>, + ) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#rbs-controller-error> + fn Error(&self, _cx: SafeJSContext, _e: SafeHandleValue) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } +} diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index 2c748391d5b..413b3f09743 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -2,103 +2,236 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::cell::{Cell, RefCell}; -use std::os::raw::c_void; +use std::cell::Cell; use std::ptr::{self, NonNull}; use std::rc::Rc; -use std::slice; use dom_struct::dom_struct; -use js::glue::{ - CreateReadableStreamUnderlyingSource, DeleteReadableStreamUnderlyingSource, - ReadableStreamUnderlyingSourceTraps, +use js::conversions::ToJSValConvertible; +use js::jsapi::{Heap, JSObject}; +use js::jsval::{JSVal, ObjectValue, UndefinedValue}; +use js::rust::{ + HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, + MutableHandleValue as SafeMutableHandleValue, }; -use js::jsapi::{ - AutoRequireNoGC, HandleObject, HandleValue, Heap, IsReadableStream, JSContext, JSObject, - JS_GetArrayBufferViewData, NewReadableExternalSourceStreamObject, ReadableStreamClose, - ReadableStreamDefaultReaderRead, ReadableStreamError, ReadableStreamGetReader, - ReadableStreamIsDisturbed, ReadableStreamIsLocked, ReadableStreamIsReadable, - ReadableStreamReaderMode, ReadableStreamReaderReleaseLock, ReadableStreamUnderlyingSource, - ReadableStreamUpdateDataAvailableFromSource, UnwrapReadableStream, -}; -use js::jsval::{JSVal, UndefinedValue}; -use js::rust::{HandleValue as SafeHandleValue, IntoHandle}; +use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; +use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ + ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, +}; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods; +use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult}; use crate::dom::bindings::error::Error; -use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; -use crate::dom::bindings::root::DomRoot; -use crate::dom::bindings::settings_stack::{AutoEntryScript, AutoIncumbentScript}; +use crate::dom::bindings::import::module::Fallible; +use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader; +use crate::dom::bindings::reflector::{DomObject, Reflector, reflect_dom_object, reflect_dom_object_with_proto}; +use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom}; +use crate::dom::bindings::trace::RootedTraceableBox; use crate::dom::bindings::utils::get_dictionary_property; +use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; +use crate::dom::readablebytestreamcontroller::ReadableByteStreamController; +use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; +use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController; +use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader}; +use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm; +use crate::dom::types::DefaultTeeUnderlyingSource; +use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; use crate::js::conversions::FromJSValConvertible; use crate::realms::{enter_realm, InRealm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; + +/// The fulfillment handler for the reacting to sourceCancelPromise part of +/// <https://streams.spec.whatwg.org/#readable-stream-cancel>. +#[derive(Clone, JSTraceable, MallocSizeOf)] +struct SourceCancelPromiseFulfillmentHandler { + #[ignore_malloc_size_of = "Rc are hard"] + result: Rc<Promise>, +} + +impl Callback for SourceCancelPromiseFulfillmentHandler { + /// The fulfillment handler for the reacting to sourceCancelPromise part of + /// <https://streams.spec.whatwg.org/#readable-stream-cancel>. + /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled> + fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { + self.result.resolve_native(&()); + } +} + +/// The rejection handler for the reacting to sourceCancelPromise part of +/// <https://streams.spec.whatwg.org/#readable-stream-cancel>. +#[derive(Clone, JSTraceable, MallocSizeOf)] +struct SourceCancelPromiseRejectionHandler { + #[ignore_malloc_size_of = "Rc are hard"] + result: Rc<Promise>, +} + +impl Callback for SourceCancelPromiseRejectionHandler { + /// The rejection handler for the reacting to sourceCancelPromise part of + /// <https://streams.spec.whatwg.org/#readable-stream-cancel>. + /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled> + fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { + self.result.reject_native(&v); + } +} + +/// <https://streams.spec.whatwg.org/#readablestream-state> +#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)] +pub enum ReadableStreamState { + #[default] + Readable, + Closed, + Errored, +} -static UNDERLYING_SOURCE_TRAPS: ReadableStreamUnderlyingSourceTraps = - ReadableStreamUnderlyingSourceTraps { - requestData: Some(request_data), - writeIntoReadRequestBuffer: Some(write_into_read_request_buffer), - cancel: Some(cancel), - onClosed: Some(close), - onErrored: Some(error), - finalize: Some(finalize), - }; +/// <https://streams.spec.whatwg.org/#readablestream-controller> +#[derive(JSTraceable, MallocSizeOf)] +#[crown::unrooted_must_root_lint::must_root] +pub enum ControllerType { + /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller> + Byte(MutNullableDom<ReadableByteStreamController>), + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller> + Default(MutNullableDom<ReadableStreamDefaultController>), +} + +/// <https://streams.spec.whatwg.org/#readablestream-readerr> +#[derive(JSTraceable, MallocSizeOf)] +#[crown::unrooted_must_root_lint::must_root] +pub enum ReaderType { + /// <https://streams.spec.whatwg.org/#readablestreambyobreader> + #[allow(clippy::upper_case_acronyms)] + BYOB(MutNullableDom<ReadableStreamBYOBReader>), + /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader> + Default(MutNullableDom<ReadableStreamDefaultReader>), +} + +/// <https://streams.spec.whatwg.org/#create-readable-stream> +#[allow(crown::unrooted_must_root)] +fn create_readable_stream( + global: &GlobalScope, + underlying_source_type: UnderlyingSourceType, + queuing_strategy: QueuingStrategy, + can_gc: CanGc, +) -> DomRoot<ReadableStream> { + // If highWaterMark was not passed, set it to 1. + let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0); + + // If sizeAlgorithm was not passed, set it to an algorithm that returns 1. + let size_algorithm = queuing_strategy + .size + .unwrap_or(extract_size_algorithm(&QueuingStrategy::empty())); + + // Assert: ! IsNonNegativeNumber(highWaterMark) is true. + assert!(high_water_mark >= 0.0); + + // Let stream be a new ReadableStream. + // Perform ! InitializeReadableStream(stream). + let stream = ReadableStream::new_with_proto( + global, + None, + ControllerType::Default(MutNullableDom::new(None)), + can_gc, + ); + // Let controller be a new ReadableStreamDefaultController. + let controller = ReadableStreamDefaultController::new( + global, + underlying_source_type, + high_water_mark, + size_algorithm, + can_gc, + ); + + // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, + // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm). + controller + .setup(stream.clone(), can_gc) + .expect("Setup of default controller cannot fail"); + + // Return stream. + stream +} + +/// <https://streams.spec.whatwg.org/#rs-class> #[dom_struct] pub struct ReadableStream { reflector_: Reflector, - #[ignore_malloc_size_of = "SM handles JS values"] - js_stream: Heap<*mut JSObject>, - #[ignore_malloc_size_of = "SM handles JS values"] - js_reader: Heap<*mut JSObject>, - has_reader: Cell<bool>, - #[ignore_malloc_size_of = "Rc is hard"] - external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, + + /// <https://streams.spec.whatwg.org/#readablestream-controller> + /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`, + /// because it is never unset once set. + controller: ControllerType, + + /// <https://streams.spec.whatwg.org/#readablestream-storederror> + #[ignore_malloc_size_of = "mozjs"] + stored_error: Heap<JSVal>, + + /// <https://streams.spec.whatwg.org/#readablestream-disturbed> + disturbed: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestream-reader> + reader: ReaderType, + + /// <https://streams.spec.whatwg.org/#readablestream-state> + state: Cell<ReadableStreamState>, } impl ReadableStream { - fn new_inherited( - external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, - ) -> ReadableStream { + #[allow(crown::unrooted_must_root)] + /// <https://streams.spec.whatwg.org/#initialize-readable-stream> + fn new_inherited(controller: ControllerType) -> ReadableStream { + let reader = match &controller { + ControllerType::Default(_) => ReaderType::Default(MutNullableDom::new(None)), + ControllerType::Byte(_) => ReaderType::BYOB(MutNullableDom::new(None)), + }; ReadableStream { reflector_: Reflector::new(), - js_stream: Heap::default(), - js_reader: Heap::default(), - has_reader: Default::default(), - external_underlying_source, + controller, + stored_error: Heap::default(), + disturbed: Default::default(), + reader, + state: Cell::new(Default::default()), } } - fn new( + #[allow(crown::unrooted_must_root)] + fn new_with_proto( global: &GlobalScope, - external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, + proto: Option<SafeHandleObject>, + controller: ControllerType, + can_gc: CanGc, ) -> DomRoot<ReadableStream> { - reflect_dom_object( - Box::new(ReadableStream::new_inherited(external_underlying_source)), + reflect_dom_object_with_proto( + Box::new(ReadableStream::new_inherited(controller)), global, - CanGc::note(), + proto, + can_gc, ) } - /// Used from RustCodegen.py - #[allow(unsafe_code)] - pub unsafe fn from_js( - cx: SafeJSContext, - obj: *mut JSObject, - realm: InRealm, - ) -> Result<DomRoot<ReadableStream>, ()> { - if !IsReadableStream(obj) { - return Err(()); + /// Used as part of + /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + pub fn set_default_controller(&self, controller: &ReadableStreamDefaultController) { + match self.controller { + ControllerType::Default(ref ctrl) => ctrl.set(Some(controller)), + ControllerType::Byte(_) => { + unreachable!("set_default_controller called in setup of default controller.") + }, } + } - let global = GlobalScope::from_safe_context(cx, realm); - - let stream = ReadableStream::new(&global, None); - stream.js_stream.set(UnwrapReadableStream(obj)); - - Ok(stream) + /// Used as part of + /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + pub fn assert_no_controller(&self) { + let has_no_controller = match self.controller { + ControllerType::Default(ref ctrl) => ctrl.get().is_none(), + ControllerType::Byte(ref ctrl) => ctrl.get().is_none(), + }; + assert!(has_no_controller); } /// Build a stream backed by a Rust source that has already been read into memory. @@ -109,426 +242,690 @@ impl ReadableStream { ) -> DomRoot<ReadableStream> { let stream = ReadableStream::new_with_external_underlying_source( global, - ExternalUnderlyingSource::Memory(bytes.len()), + UnderlyingSourceType::Memory(bytes.len()), + can_gc, ); - stream.enqueue_native(bytes, can_gc); - stream.close_native(); + stream.enqueue_native(bytes); + stream.controller_close_native(); stream } /// Build a stream backed by a Rust underlying source. - #[allow(unsafe_code)] + /// Note: external sources are always paired with a default controller. + #[allow(crown::unrooted_must_root)] pub fn new_with_external_underlying_source( global: &GlobalScope, - source: ExternalUnderlyingSource, + source: UnderlyingSourceType, + can_gc: CanGc, ) -> DomRoot<ReadableStream> { - let _ar = enter_realm(global); - let _ais = AutoIncumbentScript::new(global); - let cx = GlobalScope::get_cx(); + assert!(source.is_native()); + let stream = ReadableStream::new_with_proto( + global, + None, + ControllerType::Default(MutNullableDom::new(None)), + can_gc, + ); + let controller = ReadableStreamDefaultController::new( + global, + source, + 1.0, + extract_size_algorithm(&QueuingStrategy::empty()), + can_gc, + ); + controller + .setup(stream.clone(), can_gc) + .expect("Setup of controller with external underlying source cannot fail"); + stream + } - let source = Rc::new(ExternalUnderlyingSourceController::new(source)); + /// Call into the release steps of the controller, + pub fn perform_release_steps(&self) { + match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .perform_release_steps(), + ControllerType::Byte(_) => todo!(), + } + } - let stream = ReadableStream::new(global, Some(source.clone())); + /// Call into the pull steps of the controller, + /// as part of + /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read> + pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) { + match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .perform_pull_steps(read_request, can_gc), + ControllerType::Byte(_) => todo!(), + } + } - unsafe { - let js_wrapper = CreateReadableStreamUnderlyingSource( - &UNDERLYING_SOURCE_TRAPS, - &*source as *const _ as *const c_void, - ); + /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request> + pub fn add_read_request(&self, read_request: &ReadRequest) { + match self.reader { + // Assert: stream.[[reader]] implements ReadableStreamDefaultReader. + ReaderType::Default(ref reader) => { + let Some(reader) = reader.get() else { + panic!("Attempt to add a read request without having first acquired a reader."); + }; - rooted!(in(*cx) - let js_stream = NewReadableExternalSourceStreamObject( - *cx, - js_wrapper, - ptr::null_mut(), - HandleObject::null(), - ) - ); + // Assert: stream.[[state]] is "readable". + assert!(self.is_readable()); - stream.js_stream.set(UnwrapReadableStream(js_stream.get())); + // Append readRequest to stream.[[reader]].[[readRequests]]. + reader.add_read_request(read_request); + }, + ReaderType::BYOB(_) => { + unreachable!("Adding a read request can only be done on a default reader.") + }, } - - stream } /// Get a pointer to the underlying JS object. + /// TODO: remove, + /// by using at call point the `ReadableStream` directly instead of a JSObject. pub fn get_js_stream(&self) -> NonNull<JSObject> { - NonNull::new(self.js_stream.get()) + NonNull::new(*self.reflector().get_jsobject()) .expect("Couldn't get a non-null pointer to JS stream object.") } + /// Endpoint to enqueue chunks directly from Rust. + /// Note: in other use cases this call happens via the controller. + pub fn enqueue_native(&self, bytes: Vec<u8>) { + match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .enqueue_native(bytes), + _ => unreachable!( + "Enqueueing chunk to a stream from Rust on other than default controller" + ), + } + } - #[allow(unsafe_code)] - pub fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) { - let global = self.global(); - let _ar = enter_realm(&*global); - let cx = GlobalScope::get_cx(); - - let handle = unsafe { self.js_stream.handle() }; + /// <https://streams.spec.whatwg.org/#readable-stream-error> + pub fn error(&self, e: SafeHandleValue) { + // Assert: stream.[[state]] is "readable". + assert!(self.is_readable()); + // Set stream.[[state]] to "errored". + self.state.set(ReadableStreamState::Errored); + // Set stream.[[storedError]] to e. + self.stored_error.set(e.get()); + + // Let reader be stream.[[reader]]. + match self.reader { + ReaderType::Default(ref reader) => { + let Some(reader) = reader.get() else { + // If reader is undefined, return. + return; + }; + reader.error(e); + }, + // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e). + _ => todo!(), + } + } - self.external_underlying_source - .as_ref() - .expect("No external source to enqueue bytes.") - .enqueue_chunk(cx, handle, bytes, can_gc); + /// <https://streams.spec.whatwg.org/#readablestream-storederror> + pub fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) { + handle_mut.set(self.stored_error.get()); } + /// <https://streams.spec.whatwg.org/#readable-stream-error> + /// Note: in other use cases this call happens via the controller. #[allow(unsafe_code)] pub fn error_native(&self, error: Error) { - let global = self.global(); - let _ar = enter_realm(&*global); let cx = GlobalScope::get_cx(); - - unsafe { - rooted!(in(*cx) let mut js_error = UndefinedValue()); - error.to_jsval(*cx, &global, js_error.handle_mut()); - ReadableStreamError( - *cx, - self.js_stream.handle(), - js_error.handle().into_handle(), - ); - } + rooted!(in(*cx) let mut error_val = UndefinedValue()); + unsafe { error.to_jsval(*cx, &self.global(), error_val.handle_mut()) }; + self.error(error_val.handle()); } - #[allow(unsafe_code)] - pub fn close_native(&self) { - let global = self.global(); - let _ar = enter_realm(&*global); - let cx = GlobalScope::get_cx(); - - let handle = unsafe { self.js_stream.handle() }; - - self.external_underlying_source - .as_ref() - .expect("No external source to close.") - .close(cx, handle); + /// Call into the controller's `Close` method. + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close> + pub fn controller_close_native(&self) { + match self.controller { + ControllerType::Default(ref controller) => { + let _ = controller + .get() + .expect("Stream should have controller.") + .Close(); + }, + ControllerType::Byte(_) => { + unreachable!("Native closing is only done on default controllers.") + }, + } } - /// Does the stream have all data in memory? + /// Returns a boolean reflecting whether the stream has all data in memory. + /// Useful for native source integration only. pub fn in_memory(&self) -> bool { - self.external_underlying_source - .as_ref() - .map(|source| source.in_memory()) - .unwrap_or(false) + match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .in_memory(), + ControllerType::Byte(_) => unreachable!( + "Checking if source is in memory for a stream with a non-default controller" + ), + } } /// Return bytes for synchronous use, if the stream has all data in memory. + /// Useful for native source integration only. pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { - self.external_underlying_source - .as_ref() - .and_then(|source| source.get_in_memory_bytes()) + match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .get_in_memory_bytes(), + ControllerType::Byte(_) => { + unreachable!("Getting in-memory bytes for a stream with a non-default controller") + }, + } } /// Acquires a reader and locks the stream, /// must be done before `read_a_chunk`. - #[allow(unsafe_code)] - pub fn start_reading(&self) -> Result<(), ()> { - if self.is_locked() || self.is_disturbed() { - return Err(()); - } + /// Native call to + /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader> + pub fn acquire_default_reader( + &self, + can_gc: CanGc, + ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> { + // Let reader be a new ReadableStreamDefaultReader. + let reader = reflect_dom_object( + Box::new(ReadableStreamDefaultReader::new_inherited( + &self.global(), + can_gc, + )), + &*self.global(), + can_gc, + ); - let global = self.global(); - let _ar = enter_realm(&*global); - let cx = GlobalScope::get_cx(); + // Perform ? SetUpReadableStreamDefaultReader(reader, stream). + reader.set_up(self, &self.global(), can_gc)?; - unsafe { - rooted!(in(*cx) let reader = ReadableStreamGetReader( - *cx, - self.js_stream.handle(), - ReadableStreamReaderMode::Default, - )); + // Return reader. + Ok(reader) + } - // Note: the stream is locked to the reader. - self.js_reader.set(reader.get()); + pub fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> { + match self.controller { + ControllerType::Default(ref controller) => { + controller.get().expect("Stream should have controller.") + }, + ControllerType::Byte(_) => unreachable!( + "Getting default controller for a stream with a non-default controller" + ), } - - self.has_reader.set(true); - Ok(()) } /// Read a chunk from the stream, /// must be called after `start_reading`, /// and before `stop_reading`. - #[allow(unsafe_code)] - pub fn read_a_chunk(&self) -> Rc<Promise> { - if !self.has_reader.get() { - panic!("Attempt to read stream chunk without having acquired a reader."); - } - - let global = self.global(); - let _ar = enter_realm(&*global); - let _aes = AutoEntryScript::new(&global); - - let cx = GlobalScope::get_cx(); - - unsafe { - rooted!(in(*cx) let promise_obj = ReadableStreamDefaultReaderRead( - *cx, - self.js_reader.handle(), - )); - Promise::new_with_js_promise(promise_obj.handle(), cx) + /// Native call to + /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read> + pub fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> { + match self.reader { + ReaderType::Default(ref reader) => { + let Some(reader) = reader.get() else { + unreachable!( + "Attempt to read stream chunk without having first acquired a reader." + ); + }; + reader.Read(can_gc) + }, + ReaderType::BYOB(_) => { + unreachable!("Native reading of a chunk can only be done with a default reader.") + }, } } /// Releases the lock on the reader, /// must be done after `start_reading`. - #[allow(unsafe_code)] + /// Native call to + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease> pub fn stop_reading(&self) { - if !self.has_reader.get() { - panic!("ReadableStream::stop_reading called on a readerless stream."); - } - - self.has_reader.set(false); - - let global = self.global(); - let _ar = enter_realm(&*global); - let cx = GlobalScope::get_cx(); - - unsafe { - ReadableStreamReaderReleaseLock(*cx, self.js_reader.handle()); - // Note: is this the way to nullify the Heap? - self.js_reader.set(ptr::null_mut()); + match self.reader { + ReaderType::Default(ref reader) => { + let Some(reader) = reader.get() else { + unreachable!("Attempt to stop reading without having first acquired a reader."); + }; + reader.release(); + }, + ReaderType::BYOB(_) => { + unreachable!("Native stop reading can only be done with a default reader.") + }, } } - #[allow(unsafe_code)] + /// <https://streams.spec.whatwg.org/#is-readable-stream-locked> pub fn is_locked(&self) -> bool { - // If we natively took a reader, we're locked. - if self.has_reader.get() { - return true; + match self.reader { + ReaderType::Default(ref reader) => reader.get().is_some(), + ReaderType::BYOB(ref reader) => reader.get().is_some(), } - - // Otherwise, still double-check that script didn't lock the stream. - let cx = GlobalScope::get_cx(); - let mut locked_or_disturbed = false; - - unsafe { - ReadableStreamIsLocked(*cx, self.js_stream.handle(), &mut locked_or_disturbed); - } - - locked_or_disturbed } - #[allow(unsafe_code)] pub fn is_disturbed(&self) -> bool { - // Check that script didn't disturb the stream. - let cx = GlobalScope::get_cx(); - let mut locked_or_disturbed = false; - - unsafe { - ReadableStreamIsDisturbed(*cx, self.js_stream.handle(), &mut locked_or_disturbed); - } - - locked_or_disturbed + self.disturbed.get() } -} -#[allow(unsafe_code)] -unsafe extern "C" fn request_data( - source: *const c_void, - cx: *mut JSContext, - stream: HandleObject, - desired_size: usize, -) { - let source = &*(source as *const ExternalUnderlyingSourceController); - source.pull( - SafeJSContext::from_ptr(cx), - stream, - desired_size, - CanGc::note(), - ); -} - -#[allow(unsafe_code)] -unsafe extern "C" fn write_into_read_request_buffer( - source: *const c_void, - _cx: *mut JSContext, - _stream: HandleObject, - chunk: HandleObject, - length: usize, - bytes_written: *mut usize, -) { - let source = &*(source as *const ExternalUnderlyingSourceController); - let mut is_shared_memory = false; - let buffer = JS_GetArrayBufferViewData( - *chunk, - &mut is_shared_memory, - &AutoRequireNoGC { _address: 0 }, - ); - assert!(!is_shared_memory); - let slice = slice::from_raw_parts_mut(buffer as *mut u8, length); - source.write_into_buffer(slice); - - // Currently we're always able to completely fulfill the write request. - *bytes_written = length; -} - -#[allow(unsafe_code)] -unsafe extern "C" fn cancel( - _source: *const c_void, - _cx: *mut JSContext, - _stream: HandleObject, - _reason: HandleValue, - _resolve_to: *mut JSVal, -) { -} - -#[allow(unsafe_code)] -unsafe extern "C" fn close(_source: *const c_void, _cx: *mut JSContext, _stream: HandleObject) {} + pub fn set_is_disturbed(&self, disturbed: bool) { + self.disturbed.set(disturbed); + } -#[allow(unsafe_code)] -unsafe extern "C" fn error( - _source: *const c_void, - _cx: *mut JSContext, - _stream: HandleObject, - _reason: HandleValue, -) { -} + pub fn is_closed(&self) -> bool { + self.state.get() == ReadableStreamState::Closed + } -#[allow(unsafe_code)] -unsafe extern "C" fn finalize(source: *mut ReadableStreamUnderlyingSource) { - DeleteReadableStreamUnderlyingSource(source); -} + pub fn is_errored(&self) -> bool { + self.state.get() == ReadableStreamState::Errored + } -pub enum ExternalUnderlyingSource { - /// Facilitate partial integration with sources - /// that are currently read into memory. - Memory(usize), - /// A blob as underlying source, with a known total size. - Blob(usize), - /// A fetch response as underlying source. - FetchResponse, -} + pub fn is_readable(&self) -> bool { + self.state.get() == ReadableStreamState::Readable + } -#[derive(JSTraceable, MallocSizeOf)] -struct ExternalUnderlyingSourceController { - /// Loosely matches the underlying queue, - /// <https://streams.spec.whatwg.org/#internal-queues> - buffer: RefCell<Vec<u8>>, - /// Has the stream been closed by native code? - closed: Cell<bool>, - /// Does this stream contains all it's data in memory? - in_memory: Cell<bool>, -} + pub fn has_default_reader(&self) -> bool { + match self.reader { + ReaderType::Default(ref reader) => reader.get().is_some(), + ReaderType::BYOB(_) => false, + } + } -impl ExternalUnderlyingSourceController { - fn new(source: ExternalUnderlyingSource) -> ExternalUnderlyingSourceController { - let (buffer, in_mem) = match source { - ExternalUnderlyingSource::Blob(size) => (Vec::with_capacity(size), false), - ExternalUnderlyingSource::Memory(size) => (Vec::with_capacity(size), true), - ExternalUnderlyingSource::FetchResponse => (vec![], false), - }; - ExternalUnderlyingSourceController { - buffer: RefCell::new(buffer), - closed: Cell::new(false), - in_memory: Cell::new(in_mem), + /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests> + pub fn get_num_read_requests(&self) -> usize { + assert!(self.has_default_reader()); + match self.reader { + ReaderType::Default(ref reader) => { + let reader = reader + .get() + .expect("Stream must have a reader when get num read requests is called into."); + reader.get_num_read_requests() + }, + ReaderType::BYOB(_) => unreachable!( + "Stream must have a default reader when get num read requests is called into." + ), } } - /// Does the stream have all data in memory? - pub fn in_memory(&self) -> bool { - self.in_memory.get() + /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request> + #[allow(crown::unrooted_must_root)] + pub fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool) { + // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true. + assert!(self.has_default_reader()); + match self.reader { + ReaderType::Default(ref reader) => { + // step 2 - Let reader be stream.[[reader]]. + let reader = reader + .get() + .expect("Stream must have a reader when a read request is fulfilled."); + // step 3 - Assert: reader.[[readRequests]] is not empty. + assert_ne!(reader.get_num_read_requests(), 0); + // step 4 & 5 + // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]]. + let request = reader.remove_read_request(); + + if done { + // step 6 - If done is true, perform readRequest’s close steps. + request.close_steps(); + } else { + // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk. + let result = RootedTraceableBox::new(Heap::default()); + result.set(*chunk); + request.chunk_steps(result); + } + }, + ReaderType::BYOB(_) => unreachable!( + "Stream must have a default reader when fulfill read requests is called into." + ), + } } - /// Return bytes synchronously if the stream has all data in memory. - pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { - if self.in_memory.get() { - return Some(self.buffer.borrow().clone()); + /// <https://streams.spec.whatwg.org/#readable-stream-close> + pub fn close(&self) { + // Assert: stream.[[state]] is "readable". + assert!(self.is_readable()); + // Set stream.[[state]] to "closed". + self.state.set(ReadableStreamState::Closed); + // Let reader be stream.[[reader]]. + match self.reader { + ReaderType::Default(ref reader) => { + let Some(reader) = reader.get() else { + // If reader is undefined, return. + return; + }; + // step 5 & 6 + reader.close(); + }, + ReaderType::BYOB(ref _reader) => todo!(), } - None } - /// Signal available bytes if the stream is currently readable. - /// The apparently unused CanGc argument represents that the JS API calls like - /// `ReadableStreamUpdateDataAvailableFromSource` can trigger a GC. + /// <https://streams.spec.whatwg.org/#readable-stream-cancel> #[allow(unsafe_code)] - fn maybe_signal_available_bytes( - &self, - cx: SafeJSContext, - stream: HandleObject, - available: usize, - _can_gc: CanGc, - ) { - if available == 0 { - return; + pub fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + // Set stream.[[disturbed]] to true. + self.disturbed.set(true); + + // If stream.[[state]] is "closed", return a promise resolved with undefined. + if self.is_closed() { + let promise = Promise::new(&self.reflector_.global(), can_gc); + promise.resolve_native(&()); + return promise; } - unsafe { - let mut readable = false; - if !ReadableStreamIsReadable(*cx, stream, &mut readable) { - return; - } - if readable { - ReadableStreamUpdateDataAvailableFromSource(*cx, stream, available as u32); + // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]]. + if self.is_errored() { + let promise = Promise::new(&self.reflector_.global(), can_gc); + unsafe { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + self.stored_error.to_jsval(*cx, rval.handle_mut()); + promise.reject_native(&rval.handle()); + return promise; } } + // Perform ! ReadableStreamClose(stream). + self.close(); + // step 5, 6, 7, 8 + // TODO: run the bytes reader steps. + + // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason). + let source_cancel_promise = match self.controller { + ControllerType::Default(ref controller) => controller + .get() + .expect("Stream should have controller.") + .perform_cancel_steps(reason, can_gc), + ControllerType::Byte(_) => { + todo!() + }, + }; + + // Create a new promise, + // and setup a handler in order to react to the fulfillment of sourceCancelPromise. + let global = self.reflector_.global(); + let result_promise = Promise::new(&global, can_gc); + let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler { + result: result_promise.clone(), + }); + let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler { + result: result_promise.clone(), + }); + let handler = + PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler)); + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); + source_cancel_promise.append_native_handler(&handler, comp, can_gc); + + // Return the result of reacting to sourceCancelPromise + // with a fulfillment step that returns undefined. + result_promise } - /// Close a currently readable js stream. - #[allow(unsafe_code)] - fn maybe_close_js_stream(&self, cx: SafeJSContext, stream: HandleObject) { - unsafe { - let mut readable = false; - if !ReadableStreamIsReadable(*cx, stream, &mut readable) { - return; - } - if readable { - ReadableStreamClose(*cx, stream); - } + pub fn set_reader(&self, new_reader: Option<&ReadableStreamDefaultReader>) { + match self.reader { + ReaderType::Default(ref reader) => { + reader.set(new_reader); + }, + ReaderType::BYOB(_) => { + unreachable!("Setting a reader can only be done on a default reader.") + }, } } + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee> + #[allow(crown::unrooted_must_root)] + fn default_tee( + &self, + clone_for_branch_2: bool, + can_gc: CanGc, + ) -> Fallible<Vec<DomRoot<ReadableStream>>> { + // Assert: stream implements ReadableStream. + + // Assert: cloneForBranch2 is a boolean. + let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2)); + + // Let reader be ? AcquireReadableStreamDefaultReader(stream). + let reader = self.acquire_default_reader(can_gc)?; + self.set_reader(Some(&reader)); + + // Let reading be false. + let reading = Rc::new(Cell::new(false)); + // Let readAgain be false. + let read_again = Rc::new(Cell::new(false)); + // Let canceled1 be false. + let canceled_1 = Rc::new(Cell::new(false)); + // Let canceled2 be false. + let canceled_2 = Rc::new(Cell::new(false)); + + // Let reason1 be undefined. + let reason_1 = Rc::new(Heap::boxed(UndefinedValue())); + // Let reason2 be undefined. + let reason_2 = Rc::new(Heap::boxed(UndefinedValue())); + // Let cancelPromise be a new promise. + let cancel_promise = Promise::new(&self.reflector_.global(), can_gc); + + let tee_source_1 = DefaultTeeUnderlyingSource::new( + &reader, + self, + reading.clone(), + read_again.clone(), + canceled_1.clone(), + canceled_2.clone(), + clone_for_branch_2.clone(), + reason_1.clone(), + reason_2.clone(), + cancel_promise.clone(), + TeeCancelAlgorithm::Cancel1Algorithm, + can_gc, + ); + + let underlying_source_type_branch_1 = + UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1)); + + let tee_source_2 = DefaultTeeUnderlyingSource::new( + &reader, + self, + reading, + read_again, + canceled_1.clone(), + canceled_2.clone(), + clone_for_branch_2, + reason_1, + reason_2, + cancel_promise.clone(), + TeeCancelAlgorithm::Cancel2Algorithm, + can_gc, + ); + + let underlying_source_type_branch_2 = + UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2)); + + // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm). + let branch_1 = create_readable_stream( + &self.reflector_.global(), + underlying_source_type_branch_1, + QueuingStrategy::empty(), + can_gc, + ); + tee_source_1.set_branch_1(&branch_1); + tee_source_2.set_branch_1(&branch_1); + + // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm). + let branch_2 = create_readable_stream( + &self.reflector_.global(), + underlying_source_type_branch_2, + QueuingStrategy::empty(), + can_gc, + ); + tee_source_1.set_branch_2(&branch_2); + tee_source_2.set_branch_2(&branch_2); + + // Upon rejection of reader.[[closedPromise]] with reason r, + reader.append_native_handler_to_closed_promise( + &branch_1, + &branch_2, + canceled_1, + canceled_2, + cancel_promise, + can_gc, + ); - fn close(&self, cx: SafeJSContext, stream: HandleObject) { - self.closed.set(true); - self.maybe_close_js_stream(cx, stream); + // Return « branch_1, branch_2 ». + Ok(vec![branch_1, branch_2]) } - fn enqueue_chunk( + /// <https://streams.spec.whatwg.org/#readable-stream-tee> + fn tee( &self, + clone_for_branch_2: bool, + can_gc: CanGc, + ) -> Fallible<Vec<DomRoot<ReadableStream>>> { + // Assert: stream implements ReadableStream. + // Assert: cloneForBranch2 is a boolean. + + match self.controller { + ControllerType::Default(ref _controller) => { + // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2). + self.default_tee(clone_for_branch_2, can_gc) + }, + ControllerType::Byte(ref _controller) => { + // If stream.[[controller]] implements ReadableByteStreamController, + // return ? ReadableByteStreamTee(stream). + todo!() + }, + } + } +} + +impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream { + /// <https://streams.spec.whatwg.org/#rs-constructor> + fn Constructor( cx: SafeJSContext, - stream: HandleObject, - mut chunk: Vec<u8>, + global: &GlobalScope, + proto: Option<SafeHandleObject>, can_gc: CanGc, - ) { - let available = { - let mut buffer = self.buffer.borrow_mut(); - buffer.append(&mut chunk); - buffer.len() + underlying_source: Option<*mut JSObject>, + strategy: &QueuingStrategy, + ) -> Fallible<DomRoot<Self>> { + // If underlyingSource is missing, set it to null. + rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut())); + // Let underlyingSourceDict be underlyingSource, + // converted to an IDL value of type UnderlyingSource. + let underlying_source_dict = if !underlying_source_obj.is_null() { + rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get())); + match JsUnderlyingSource::new(cx, obj_val.handle()) { + Ok(ConversionResult::Success(val)) => val, + Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())), + _ => { + return Err(Error::JSFailed); + }, + } + } else { + JsUnderlyingSource::empty() }; - self.maybe_signal_available_bytes(cx, stream, available, can_gc); - } - #[allow(unsafe_code)] - fn pull(&self, cx: SafeJSContext, stream: HandleObject, _desired_size: usize, can_gc: CanGc) { - // Note: for pull sources, - // this would be the time to ask for a chunk. + // Perform ! InitializeReadableStream(this). + let stream = if underlying_source_dict.type_.is_some() { + ReadableStream::new_with_proto( + global, + proto, + ControllerType::Byte(MutNullableDom::new(None)), + can_gc, + ) + } else { + ReadableStream::new_with_proto( + global, + proto, + ControllerType::Default(MutNullableDom::new(None)), + can_gc, + ) + }; - if self.closed.get() { - return self.maybe_close_js_stream(cx, stream); - } + if underlying_source_dict.type_.is_some() { + // TODO: If underlyingSourceDict["type"] is "bytes" + return Err(Error::Type("Bytes streams not implemented".to_string())); + } else { + // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1). + let high_water_mark = extract_high_water_mark(strategy, 1.0)?; + + // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy). + let size_algorithm = extract_size_algorithm(strategy); + + let controller = ReadableStreamDefaultController::new( + global, + UnderlyingSourceType::Js(underlying_source_dict, Heap::default()), + high_water_mark, + size_algorithm, + can_gc, + ); + + // Note: this must be done before `setup`, + // otherwise `thisOb` is null in the start callback. + controller.set_underlying_source_this_object(underlying_source_obj.handle()); - let available = { - let buffer = self.buffer.borrow(); - buffer.len() + // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource + controller.setup(stream.clone(), can_gc)?; }; - self.maybe_signal_available_bytes(cx, stream, available, can_gc); + Ok(stream) } - fn get_chunk_with_length(&self, length: usize) -> Vec<u8> { - let mut buffer = self.buffer.borrow_mut(); - let buffer_len = buffer.len(); - assert!(buffer_len >= length); - buffer.split_off(buffer_len - length) + /// <https://streams.spec.whatwg.org/#rs-locked> + fn Locked(&self) -> bool { + self.is_locked() } - fn write_into_buffer(&self, dest: &mut [u8]) { - let length = dest.len(); - let chunk = self.get_chunk_with_length(length); - dest.copy_from_slice(chunk.as_slice()); + /// <https://streams.spec.whatwg.org/#rs-cancel> + fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + if self.is_locked() { + // If ! IsReadableStreamLocked(this) is true, + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&self.reflector_.global(), can_gc); + promise.reject_error(Error::Type("stream is not locked".to_owned())); + promise + } else { + // Return ! ReadableStreamCancel(this, reason). + self.cancel(reason, can_gc) + } + } + + /// <https://streams.spec.whatwg.org/#rs-get-reader> + fn GetReader( + &self, + options: &ReadableStreamGetReaderOptions, + can_gc: CanGc, + ) -> Fallible<ReadableStreamReader> { + // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this). + if options.mode.is_none() { + return Ok(ReadableStreamReader::ReadableStreamDefaultReader( + self.acquire_default_reader(can_gc)?, + )); + } + // 2. Assert: options["mode"] is "byob". + assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob); + + // 3. Return ? AcquireReadableStreamBYOBReader(this). + Err(Error::Type( + "AcquireReadableStreamBYOBReader is not implemented".to_owned(), + )) + } + + /// <https://streams.spec.whatwg.org/#rs-tee> + fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> { + // Return ? ReadableStreamTee(this, false). + self.tee(false, can_gc) } } #[allow(unsafe_code)] /// Get the `done` property of an object that a read promise resolved to. pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<bool, Error> { + if !v.is_object() { + return Err(Error::Type("Unknown format for done property.".to_string())); + } unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut done = UndefinedValue()); @@ -547,6 +944,11 @@ pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<b #[allow(unsafe_code)] /// Get the `value` property of an object that a read promise resolved to. pub fn get_read_promise_bytes(cx: SafeJSContext, v: &SafeHandleValue) -> Result<Vec<u8>, Error> { + if !v.is_object() { + return Err(Error::Type( + "Unknown format for for bytes read.".to_string(), + )); + } unsafe { rooted!(in(*cx) let object = v.to_object()); rooted!(in(*cx) let mut bytes = UndefinedValue()); diff --git a/components/script/dom/readablestreambyobreader.rs b/components/script/dom/readablestreambyobreader.rs new file mode 100644 index 00000000000..556fe15ea54 --- /dev/null +++ b/components/script/dom/readablestreambyobreader.rs @@ -0,0 +1,81 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![allow(dead_code)] + +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::gc::CustomAutoRooterGuard; +use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; +use js::typedarray::ArrayBufferView; + +use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::import::module::Fallible; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; +use crate::dom::bindings::root::DomRoot; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::readablestream::ReadableStream; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; + +/// <https://streams.spec.whatwg.org/#readablestreambyobreader> +#[dom_struct] +pub struct ReadableStreamBYOBReader { + reflector_: Reflector, +} + +impl ReadableStreamBYOBReader { + fn new_inherited() -> ReadableStreamBYOBReader { + ReadableStreamBYOBReader { + reflector_: Reflector::new(), + } + } + + fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> { + reflect_dom_object( + Box::new(ReadableStreamBYOBReader::new_inherited()), + global, + can_gc, + ) + } +} + +impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader { + /// <https://streams.spec.whatwg.org/#byob-reader-constructor> + fn Constructor( + _global: &GlobalScope, + _proto: Option<SafeHandleObject>, + _can_gc: CanGc, + _stream: &ReadableStream, + ) -> Fallible<DomRoot<Self>> { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#byob-reader-read> + fn Read(&self, _view: CustomAutoRooterGuard<ArrayBufferView>, can_gc: CanGc) -> Rc<Promise> { + // TODO + Promise::new(&self.reflector_.global(), can_gc) + } + + /// <https://streams.spec.whatwg.org/#byob-reader-release-lock> + fn ReleaseLock(&self) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#generic-reader-closed> + fn Closed(&self, can_gc: CanGc) -> Rc<Promise> { + // TODO + Promise::new(&self.reflector_.global(), can_gc) + } + + /// <https://streams.spec.whatwg.org/#generic-reader-cancel> + fn Cancel(&self, _cx: SafeJSContext, _reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + // TODO + Promise::new(&self.reflector_.global(), can_gc) + } +} diff --git a/components/script/dom/readablestreambyobrequest.rs b/components/script/dom/readablestreambyobrequest.rs new file mode 100644 index 00000000000..e2f7199d170 --- /dev/null +++ b/components/script/dom/readablestreambyobrequest.rs @@ -0,0 +1,39 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use dom_struct::dom_struct; + +use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBRequestBinding::ReadableStreamBYOBRequestMethods; +use crate::dom::bindings::import::module::{Error, Fallible}; +use crate::dom::bindings::reflector::Reflector; +use crate::script_runtime::JSContext as SafeJSContext; + +/// <https://streams.spec.whatwg.org/#readablestreambyobrequest> +#[dom_struct] +pub struct ReadableStreamBYOBRequest { + reflector_: Reflector, +} + +impl ReadableStreamBYOBRequestMethods<crate::DomTypeHolder> for ReadableStreamBYOBRequest { + /// <https://streams.spec.whatwg.org/#rs-byob-request-view> + fn GetView(&self, _cx: SafeJSContext) -> Option<js::typedarray::ArrayBufferView> { + // TODO + None + } + + /// <https://streams.spec.whatwg.org/#rs-byob-request-respond> + fn Respond(&self, _bytes_written: u64) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } + + /// <https://streams.spec.whatwg.org/#rs-byob-request-respond-with-new-view> + fn RespondWithNewView( + &self, + _view: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>, + ) -> Fallible<()> { + // TODO + Err(Error::NotFound) + } +} diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs new file mode 100644 index 00000000000..1496031d998 --- /dev/null +++ b/components/script/dom/readablestreamdefaultcontroller.rs @@ -0,0 +1,878 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::ptr; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{Heap, JSObject}; +use js::jsval::{JSVal, UndefinedValue}; +use js::rust::wrappers::JS_GetPendingException; +use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue}; +use js::typedarray::Uint8; + +use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; +use crate::dom::bindings::buffer_source::create_buffer_source; +use crate::dom::bindings::callback::ExceptionHandling; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods; +use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller; +use crate::dom::bindings::import::module::{throw_dom_exception, Error, Fallible}; +use crate::dom::bindings::refcounted::Trusted; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; +use crate::dom::bindings::root::{DomRoot, MutNullableDom}; +use crate::dom::bindings::trace::RootedTraceableBox; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::readablestream::ReadableStream; +use crate::dom::readablestreamdefaultreader::ReadRequest; +use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType}; +use crate::js::conversions::ToJSValConvertible; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::{CanGc, JSContext, JSContext as SafeJSContext}; + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct PullAlgorithmFulfillmentHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for PullAlgorithmFulfillmentHandler { + /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + /// Upon fulfillment of pullPromise + fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) { + let controller = self.controller.root(); + + // Set controller.[[pulling]] to false. + controller.pulling.set(false); + + // If controller.[[pullAgain]] is true, + if controller.pull_again.get() { + // Set controller.[[pullAgain]] to false. + controller.pull_again.set(false); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + controller.call_pull_if_needed(can_gc); + } + } +} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct PullAlgorithmRejectionHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for PullAlgorithmRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + /// Upon rejection of pullPromise with reason e. + fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) { + let controller = self.controller.root(); + + // Perform ! ReadableStreamDefaultControllerError(controller, e). + controller.error(v); + } +} + +/// The fulfillment handler for +/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct StartAlgorithmFulfillmentHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for StartAlgorithmFulfillmentHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + /// Upon fulfillment of startPromise, + fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) { + let controller = self.controller.root(); + + // Set controller.[[started]] to true. + controller.started.set(true); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + controller.call_pull_if_needed(can_gc); + } +} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[allow(crown::unrooted_must_root)] +struct StartAlgorithmRejectionHandler { + #[ignore_malloc_size_of = "Trusted are hard"] + controller: Trusted<ReadableStreamDefaultController>, +} + +impl Callback for StartAlgorithmRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + /// Upon rejection of startPromise with reason r, + fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) { + let controller = self.controller.root(); + + // Perform ! ReadableStreamDefaultControllerError(controller, r). + controller.error(v); + } +} + +/// <https://streams.spec.whatwg.org/#value-with-size> +#[derive(JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub struct ValueWithSize { + value: Box<Heap<JSVal>>, + size: f64, +} + +/// <https://streams.spec.whatwg.org/#value-with-size> +#[derive(JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub enum EnqueuedValue { + /// A value enqueued from Rust. + Native(Box<[u8]>), + /// A Js value. + Js(ValueWithSize), +} + +impl EnqueuedValue { + fn size(&self) -> f64 { + match self { + EnqueuedValue::Native(v) => v.len() as f64, + EnqueuedValue::Js(v) => v.size, + } + } + + #[allow(unsafe_code)] + fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue) { + match self { + EnqueuedValue::Native(chunk) => { + rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>()); + create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut()) + .expect("failed to create buffer source for native chunk."); + unsafe { array_buffer_ptr.to_jsval(*cx, rval) }; + }, + EnqueuedValue::Js(value_with_size) => unsafe { + value_with_size.value.to_jsval(*cx, rval); + }, + } + } +} + +/// <https://streams.spec.whatwg.org/#is-non-negative-number> +fn is_non_negative_number(value: &EnqueuedValue) -> bool { + let value_with_size = match value { + EnqueuedValue::Native(_) => return true, + EnqueuedValue::Js(value_with_size) => value_with_size, + }; + + // If v is not a Number, return false. + // Checked as part of the WebIDL. + + // If v is NaN, return false. + if value_with_size.size.is_nan() { + return false; + } + + // If v < 0, return false. + if value_with_size.size.is_sign_negative() { + return false; + } + + true +} + +/// <https://streams.spec.whatwg.org/#queue-with-sizes> +#[derive(Default, JSTraceable, MallocSizeOf)] +#[crown::unrooted_must_root_lint::must_root] +pub struct QueueWithSizes { + #[ignore_malloc_size_of = "EnqueuedValue::Js"] + queue: VecDeque<EnqueuedValue>, + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize> + total_size: f64, +} + +impl QueueWithSizes { + /// <https://streams.spec.whatwg.org/#dequeue-value> + #[allow(crown::unrooted_must_root)] + fn dequeue_value(&mut self) -> EnqueuedValue { + let value = self + .queue + .pop_front() + .expect("Buffer cannot be empty when dequeue value is called into."); + self.total_size -= value.size(); + value + } + + /// <https://streams.spec.whatwg.org/#enqueue-value-with-size> + #[allow(crown::unrooted_must_root)] + fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> { + // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception. + if !is_non_negative_number(&value) { + return Err(Error::Range( + "The size of the enqueued chunk is not a non-negative number.".to_string(), + )); + } + + // If size is +∞, throw a RangeError exception. + if value.size().is_infinite() { + return Err(Error::Range( + "The size of the enqueued chunk is infinite.".to_string(), + )); + } + + self.total_size += value.size(); + self.queue.push_back(value); + + Ok(()) + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + /// Only used with native sources. + fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { + self.queue + .iter() + .try_fold(Vec::new(), |mut acc, value| match value { + EnqueuedValue::Native(chunk) => { + acc.extend(chunk.iter().copied()); + Some(acc) + }, + _ => { + warn!("get_in_memory_bytes called on a controller with non-native source."); + None + }, + }) + } + + /// <https://streams.spec.whatwg.org/#reset-queue> + fn reset(&mut self) { + self.queue.clear(); + self.total_size = Default::default(); + } +} + +/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller> +#[dom_struct] +pub struct ReadableStreamDefaultController { + reflector_: Reflector, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue> + queue: RefCell<QueueWithSizes>, + + /// A mutable reference to the underlying source is used to implement these two + /// internal slots: + /// + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm> + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm> + underlying_source: MutNullableDom<UnderlyingSourceContainer>, + + stream: MutNullableDom<ReadableStream>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm> + strategy_hwm: f64, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm> + #[ignore_malloc_size_of = "mozjs"] + strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested> + close_requested: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started> + started: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling> + pulling: Cell<bool>, + + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain> + pull_again: Cell<bool>, +} + +impl ReadableStreamDefaultController { + #[allow(crown::unrooted_must_root)] + fn new_inherited( + global: &GlobalScope, + underlying_source_type: UnderlyingSourceType, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + can_gc: CanGc, + ) -> ReadableStreamDefaultController { + ReadableStreamDefaultController { + reflector_: Reflector::new(), + queue: RefCell::new(Default::default()), + stream: MutNullableDom::new(None), + underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new( + global, + underlying_source_type, + can_gc, + ))), + strategy_hwm, + strategy_size: RefCell::new(Some(strategy_size)), + close_requested: Default::default(), + started: Default::default(), + pulling: Default::default(), + pull_again: Default::default(), + } + } + + #[allow(crown::unrooted_must_root)] + pub fn new( + global: &GlobalScope, + underlying_source: UnderlyingSourceType, + strategy_hwm: f64, + strategy_size: Rc<QueuingStrategySize>, + can_gc: CanGc, + ) -> DomRoot<ReadableStreamDefaultController> { + reflect_dom_object( + Box::new(ReadableStreamDefaultController::new_inherited( + global, + underlying_source, + strategy_hwm, + strategy_size, + can_gc, + )), + global, + can_gc, + ) + } + + /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + #[allow(unsafe_code)] + pub fn setup(&self, stream: DomRoot<ReadableStream>, can_gc: CanGc) -> Result<(), Error> { + // Assert: stream.[[controller]] is undefined + stream.assert_no_controller(); + + // Set controller.[[stream]] to stream. + self.stream.set(Some(&stream)); + + let global = &*self.global(); + let rooted_default_controller = DomRoot::from_ref(self); + + // Perform ! ResetQueue(controller). + // Set controller.[[started]], controller.[[closeRequested]], + // controller.[[pullAgain]], and controller.[[pulling]] to false. + // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm + // and controller.[[strategyHWM]] to highWaterMark. + // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm + // and controller.[[strategyHWM]] to highWaterMark. + // Set controller.[[cancelAlgorithm]] to cancelAlgorithm. + + // Note: the above steps are done in `new`. + + // Set stream.[[controller]] to controller. + stream.set_default_controller(&rooted_default_controller); + + if let Some(underlying_source) = rooted_default_controller.underlying_source.get() { + // Let startResult be the result of performing startAlgorithm. (This might throw an exception.) + let start_result = underlying_source + .call_start_algorithm( + Controller::ReadableStreamDefaultController(rooted_default_controller.clone()), + can_gc, + ) + .unwrap_or_else(|| { + let promise = Promise::new(global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + + // Let startPromise be a promise resolved with startResult. + let start_promise = start_result?; + + // Upon fulfillment of startPromise, + let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + + // Upon rejection of startPromise with reason r, + let rejection_handler = Box::new(StartAlgorithmRejectionHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let handler = PromiseNativeHandler::new( + global, + Some(fulfillment_handler), + Some(rejection_handler), + ); + let realm = enter_realm(global); + let comp = InRealm::Entered(&realm); + start_promise.append_native_handler(&handler, comp, can_gc); + }; + + Ok(()) + } + + /// Setting the JS object after the heap has settled down. + pub fn set_underlying_source_this_object(&self, this_object: HandleObject) { + if let Some(underlying_source) = self.underlying_source.get() { + underlying_source.set_underlying_source_this_object(this_object); + } + } + + /// <https://streams.spec.whatwg.org/#dequeue-value> + #[allow(crown::unrooted_must_root)] + fn dequeue_value(&self) -> EnqueuedValue { + let mut queue = self.queue.borrow_mut(); + queue.dequeue_value() + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull> + fn should_call_pull(&self) -> bool { + // Let stream be controller.[[stream]]. + // Note: the spec does not assert that stream is not undefined here, + // so we return false if it is. + let Some(stream) = self.stream.get() else { + debug!("`should_call_pull` called on a controller without a stream."); + return false; + }; + + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return false; + } + + // If controller.[[started]] is false, return false. + if !self.started.get() { + return false; + } + + // If ! IsReadableStreamLocked(stream) is true + // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true. + if stream.is_locked() && stream.get_num_read_requests() > 0 { + return true; + } + + // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller). + // Assert: desiredSize is not null. + let desired_size = self.get_desired_size().expect("desiredSize is not null."); + + if desired_size > 0. { + return true; + } + + false + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + #[allow(unsafe_code)] + fn call_pull_if_needed(&self, can_gc: CanGc) { + if !self.should_call_pull() { + return; + } + + // If controller.[[pulling]] is true, + if self.pulling.get() { + // Set controller.[[pullAgain]] to true. + self.pull_again.set(true); + + return; + } + + // Set controller.[[pulling]] to true. + self.pulling.set(true); + + // Let pullPromise be the result of performing controller.[[pullAlgorithm]]. + // Continues into the resolve and reject handling of the native handler. + let global = self.global(); + let rooted_default_controller = DomRoot::from_ref(self); + let controller = + Controller::ReadableStreamDefaultController(rooted_default_controller.clone()); + + let Some(underlying_source) = self.underlying_source.get() else { + return; + }; + + let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let rejection_handler = Box::new(PullAlgorithmRejectionHandler { + controller: Trusted::new(&*rooted_default_controller), + }); + let handler = + PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler)); + + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); + let result = underlying_source + .call_pull_algorithm(controller, can_gc) + .unwrap_or_else(|| { + let promise = Promise::new(&global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + let promise = result.unwrap_or_else(|error| { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + // TODO: check if `self.global()` is the right globalscope. + unsafe { + error + .clone() + .to_jsval(*cx, &self.global(), rval.handle_mut()) + }; + let promise = Promise::new(&global, can_gc); + promise.reject_native(&rval.handle()); + promise + }); + promise.append_native_handler(&handler, comp, can_gc); + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel> + #[allow(unsafe_code)] + pub fn perform_cancel_steps(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + // Perform ! ResetQueue(this). + self.queue.borrow_mut().reset(); + + let underlying_source = self + .underlying_source + .get() + .expect("Controller should have a source when the cancel steps are called into."); + let global = self.global(); + + // Let result be the result of performing this.[[cancelAlgorithm]], passing reason. + let result = underlying_source + .call_cancel_algorithm(reason, can_gc) + .unwrap_or_else(|| { + let promise = Promise::new(&global, can_gc); + promise.resolve_native(&()); + Ok(promise) + }); + let promise = result.unwrap_or_else(|error| { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + // TODO: check if `self.global()` is the right globalscope. + unsafe { + error + .clone() + .to_jsval(*cx, &self.global(), rval.handle_mut()) + }; + let promise = Promise::new(&global, can_gc); + promise.reject_native(&rval.handle()); + promise + }); + + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this). + self.clear_algorithms(); + + // Return result(the promise). + promise + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull> + #[allow(crown::unrooted_must_root)] + pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) { + // Let stream be this.[[stream]]. + // Note: the spec does not assert that there is a stream. + let Some(stream) = self.stream.get() else { + return; + }; + + // if queue contains bytes, perform chunk steps. + if !self.queue.borrow().is_empty() { + // Rooting: chunk will be tied to a rooted value + // before calling into a function that can GC. + let chunk = self.dequeue_value(); + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + let result = RootedTraceableBox::new(Heap::default()); + chunk.to_jsval(cx, rval.handle_mut()); + result.set(*rval); + + // If this.[[closeRequested]] is true and this.[[queue]] is empty + if self.close_requested.get() && self.queue.borrow().is_empty() { + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Perform ! ReadableStreamClose(stream). + stream.close(); + } else { + // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + self.call_pull_if_needed(can_gc); + } + // Perform readRequest’s chunk steps, given chunk. + read_request.chunk_steps(result); + } else { + // Perform ! ReadableStreamAddReadRequest(stream, readRequest). + stream.add_read_request(read_request); + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + self.call_pull_if_needed(can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps> + pub fn perform_release_steps(&self) { + // step 1 - Return. + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> + #[allow(unsafe_code)] + pub fn enqueue( + &self, + cx: SafeJSContext, + chunk: SafeHandleValue, + can_gc: CanGc, + ) -> Result<(), Error> { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return Ok(()); + } + + let stream = self + .stream + .get() + .expect("Controller must have a stream when a chunk is enqueued."); + + // If ! IsReadableStreamLocked(stream) is true + // and ! ReadableStreamGetNumReadRequests(stream) > 0, + // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false). + if stream.is_locked() && stream.get_num_read_requests() > 0 { + stream.fulfill_read_request(chunk, false); + } else { + // Otherwise, + // Let result be the result of performing controller.[[strategySizeAlgorithm]], + // passing in chunk, and interpreting the result as a completion record. + // Note: the clone is necessary to prevent potential re-borrow panics. + let strategy_size = { + let reference = self.strategy_size.borrow(); + reference.clone() + }; + let size = if let Some(strategy_size) = strategy_size { + // Note: the Rethrow exception handling is necessary, + // otherwise returning JSFailed will panic because no exception is pending. + let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow); + match result { + // Let chunkSize be result.[[Value]]. + Ok(size) => size, + Err(error) => { + // If result is an abrupt completion, + rooted!(in(*cx) let mut rval = UndefinedValue()); + unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) }; + + // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]). + self.error(rval.handle()); + + // Return result. + // Note: we need to return a type error, because no exception is pending. + return Err(error); + }, + } + } else { + 0. + }; + + { + // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize). + let res = { + let mut queue = self.queue.borrow_mut(); + queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize { + value: Heap::boxed(chunk.get()), + size, + })) + }; + if let Err(error) = res { + // If enqueueResult is an abrupt completion, + + // First, throw the exception. + // Note: this must be done manually here, + // because `enqueue_value_with_size` does not call into JS. + throw_dom_exception(cx, &self.global(), error); + + // Then, get a handle to the JS val for the exception, + // and use that to error the stream. + rooted!(in(*cx) let mut rval = UndefinedValue()); + unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) }; + + // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]). + self.error(rval.handle()); + + // Return enqueueResult. + // Note: because we threw the exception above, + // there is a pending exception and we can return JSFailed. + return Err(Error::JSFailed); + } + } + } + + // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + self.call_pull_if_needed(can_gc); + + Ok(()) + } + + /// Native call to + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> + #[allow(crown::unrooted_must_root)] + pub fn enqueue_native(&self, chunk: Vec<u8>) { + let stream = self + .stream + .get() + .expect("Controller must have a stream when a chunk is enqueued."); + if stream.is_locked() && stream.get_num_read_requests() > 0 { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + let enqueued_chunk = EnqueuedValue::Native(chunk.into_boxed_slice()); + enqueued_chunk.to_jsval(cx, rval.handle_mut()); + stream.fulfill_read_request(rval.handle(), false); + } else { + let mut queue = self.queue.borrow_mut(); + queue + .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice())) + .expect("Enqueuing a chunk from Rust should not fail."); + } + } + + /// Does the stream have all data in memory? + pub fn in_memory(&self) -> bool { + let Some(underlying_source) = self.underlying_source.get() else { + return false; + }; + underlying_source.in_memory() + } + + /// Return bytes synchronously if the stream has all data in memory. + pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { + let underlying_source = self.underlying_source.get()?; + if underlying_source.in_memory() { + return self.queue.borrow().get_in_memory_bytes(); + } + None + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms> + fn clear_algorithms(&self) { + // Set controller.[[pullAlgorithm]] to undefined. + // Set controller.[[cancelAlgorithm]] to undefined. + self.underlying_source.set(None); + + // Set controller.[[strategySizeAlgorithm]] to undefined. + *self.strategy_size.borrow_mut() = None; + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close> + pub fn close(&self) { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if !self.can_close_or_enqueue() { + return; + } + + let Some(stream) = self.stream.get() else { + return; + }; + + // Set controller.[[closeRequested]] to true. + self.close_requested.set(true); + + if self.queue.borrow().is_empty() { + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + // Perform ! ReadableStreamClose(stream). + stream.close(); + } + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size> + fn get_desired_size(&self) -> Option<f64> { + let stream = self.stream.get()?; + + // If state is "errored", return null. + if stream.is_errored() { + return None; + } + + // If state is "closed", return 0. + if stream.is_closed() { + return Some(0.0); + } + + // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]]. + let queue = self.queue.borrow(); + let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX); + Some(desired_size.clamp(desired_size, self.strategy_hwm)) + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue> + fn can_close_or_enqueue(&self) -> bool { + let Some(stream) = self.stream.get() else { + return false; + }; + + // If controller.[[closeRequested]] is false and state is "readable", return true. + if !self.close_requested.get() && stream.is_readable() { + return true; + } + + // Otherwise, return false. + false + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error> + pub fn error(&self, e: SafeHandleValue) { + let Some(stream) = self.stream.get() else { + return; + }; + + // If stream.[[state]] is not "readable", return. + if !stream.is_readable() { + return; + } + + // Perform ! ResetQueue(controller). + self.queue.borrow_mut().reset(); + + // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + self.clear_algorithms(); + + stream.error(e); + } +} + +impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder> + for ReadableStreamDefaultController +{ + /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size> + fn GetDesiredSize(&self) -> Option<f64> { + self.get_desired_size() + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-close> + fn Close(&self) -> Fallible<()> { + if !self.can_close_or_enqueue() { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, + // throw a TypeError exception. + return Err(Error::Type("Stream cannot be closed.".to_string())); + } + + // Perform ! ReadableStreamDefaultControllerClose(this). + self.close(); + + Ok(()) + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue> + fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> { + // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception. + if !self.can_close_or_enqueue() { + return Err(Error::Type("Stream cannot be enqueued to.".to_string())); + } + + // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk). + self.enqueue(cx, chunk, can_gc) + } + + /// <https://streams.spec.whatwg.org/#rs-default-controller-error> + fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue) -> Fallible<()> { + self.error(e); + Ok(()) + } +} diff --git a/components/script/dom/readablestreamdefaultreader.rs b/components/script/dom/readablestreamdefaultreader.rs new file mode 100644 index 00000000000..0508c363ac9 --- /dev/null +++ b/components/script/dom/readablestreamdefaultreader.rs @@ -0,0 +1,530 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::cell::Cell; +use std::collections::VecDeque; +use std::mem; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::Heap; +use js::jsval::{JSVal, UndefinedValue}; +use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; + +use super::bindings::root::MutNullableDom; +use super::types::ReadableStreamDefaultController; +use crate::dom::bindings::cell::DomRefCell; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{ + ReadableStreamDefaultReaderMethods, ReadableStreamReadResult, +}; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::import::module::Fallible; +use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomObject, Reflector}; +use crate::dom::bindings::root::{Dom, DomRoot}; +use crate::dom::bindings::trace::RootedTraceableBox; +use crate::dom::defaultteereadrequest::DefaultTeeReadRequest; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; +use crate::dom::readablestream::ReadableStream; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; + +/// <https://streams.spec.whatwg.org/#read-request> +#[derive(Clone, JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub enum ReadRequest { + /// <https://streams.spec.whatwg.org/#default-reader-read> + Read(Rc<Promise>), + /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2> + DefaultTee { + tee_read_request: Dom<DefaultTeeReadRequest>, + }, +} + +impl ReadRequest { + /// <https://streams.spec.whatwg.org/#read-request-chunk-steps> + pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) { + match self { + ReadRequest::Read(promise) => { + promise.resolve_native(&ReadableStreamReadResult { + done: Some(false), + value: chunk, + }); + }, + ReadRequest::DefaultTee { tee_read_request } => { + tee_read_request.enqueue_chunk_steps(chunk); + }, + } + } + + /// <https://streams.spec.whatwg.org/#read-request-close-steps> + pub fn close_steps(&self) { + match self { + ReadRequest::Read(promise) => { + let result = RootedTraceableBox::new(Heap::default()); + result.set(UndefinedValue()); + promise.resolve_native(&ReadableStreamReadResult { + done: Some(true), + value: result, + }); + }, + ReadRequest::DefaultTee { tee_read_request } => { + tee_read_request.close_steps(); + }, + } + } + + /// <https://streams.spec.whatwg.org/#read-request-error-steps> + pub fn error_steps(&self, e: SafeHandleValue) { + match self { + ReadRequest::Read(promise) => promise.reject_native(&e), + ReadRequest::DefaultTee { tee_read_request } => { + tee_read_request.error_steps(); + }, + } + } +} + +/// The rejection handler for +/// <https://streams.spec.whatwg.org/#readable-stream-tee> +#[derive(Clone, JSTraceable, MallocSizeOf)] +#[crown::unrooted_must_root_lint::must_root] +struct ClosedPromiseRejectionHandler { + branch_1_controller: Dom<ReadableStreamDefaultController>, + branch_2_controller: Dom<ReadableStreamDefaultController>, + #[ignore_malloc_size_of = "Rc"] + canceled_1: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + canceled_2: Rc<Cell<bool>>, + #[ignore_malloc_size_of = "Rc"] + cancel_promise: Rc<Promise>, +} + +impl Callback for ClosedPromiseRejectionHandler { + /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed> + /// Upon rejection of reader.[[closedPromise]] with reason r, + fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) { + let branch_1_controller = &self.branch_1_controller; + let branch_2_controller = &self.branch_2_controller; + + // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r). + branch_1_controller.error(v); + // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r). + branch_2_controller.error(v); + + // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined. + if !self.canceled_1.get() || !self.canceled_2.get() { + self.cancel_promise.resolve_native(&()); + } + } +} + +/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader> +#[dom_struct] +pub struct ReadableStreamDefaultReader { + reflector_: Reflector, + + /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream> + stream: MutNullableDom<ReadableStream>, + + #[ignore_malloc_size_of = "no VecDeque support"] + read_requests: DomRefCell<VecDeque<ReadRequest>>, + + /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise> + #[ignore_malloc_size_of = "Rc is hard"] + closed_promise: DomRefCell<Rc<Promise>>, +} + +impl ReadableStreamDefaultReader { + /// <https://streams.spec.whatwg.org/#default-reader-constructor> + #[allow(non_snake_case)] + pub fn Constructor( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + stream: &ReadableStream, + ) -> Fallible<DomRoot<Self>> { + let reader = Self::new_with_proto(global, proto, can_gc); + + // Perform ? SetUpReadableStreamDefaultReader(this, stream). + Self::set_up(&reader, stream, global, can_gc)?; + + Ok(reader) + } + + fn new_with_proto( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + ) -> DomRoot<ReadableStreamDefaultReader> { + reflect_dom_object_with_proto( + Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)), + global, + proto, + can_gc, + ) + } + + pub fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader { + ReadableStreamDefaultReader { + reflector_: Reflector::new(), + stream: MutNullableDom::new(None), + read_requests: DomRefCell::new(Default::default()), + closed_promise: DomRefCell::new(Promise::new(global, can_gc)), + } + } + + /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader> + pub fn set_up( + &self, + stream: &ReadableStream, + global: &GlobalScope, + can_gc: CanGc, + ) -> Fallible<()> { + // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception. + if stream.is_locked() { + return Err(Error::Type("stream is locked".to_owned())); + } + // Perform ! ReadableStreamReaderGenericInitialize(reader, stream). + + self.generic_initialize(global, stream, can_gc)?; + + // Set reader.[[readRequests]] to a new empty list. + self.read_requests.borrow_mut().clear(); + + Ok(()) + } + + /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-initialize> + pub fn generic_initialize( + &self, + global: &GlobalScope, + stream: &ReadableStream, + can_gc: CanGc, + ) -> Fallible<()> { + // Set reader.[[stream]] to stream. + self.stream.set(Some(stream)); + + // Set stream.[[reader]] to reader. + stream.set_reader(Some(self)); + + if stream.is_readable() { + // If stream.[[state]] is "readable + // Set reader.[[closedPromise]] to a new promise. + *self.closed_promise.borrow_mut() = Promise::new(global, can_gc); + } else if stream.is_closed() { + // Otherwise, if stream.[[state]] is "closed", + // Set reader.[[closedPromise]] to a promise resolved with undefined. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut rval = UndefinedValue()); + *self.closed_promise.borrow_mut() = Promise::new_resolved(global, cx, rval.handle())?; + } else { + // Assert: stream.[[state]] is "errored" + assert!(stream.is_errored()); + + // Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]]. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + *self.closed_promise.borrow_mut() = Promise::new_rejected(global, cx, error.handle())?; + + // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true + self.closed_promise.borrow().set_promise_is_handled(); + } + + Ok(()) + } + + /// <https://streams.spec.whatwg.org/#readable-stream-close> + #[allow(crown::unrooted_must_root)] + pub fn close(&self) { + // Resolve reader.[[closedPromise]] with undefined. + self.closed_promise.borrow().resolve_native(&()); + // If reader implements ReadableStreamDefaultReader, + // Let readRequests be reader.[[readRequests]]. + let mut read_requests = self.take_read_requests(); + // Set reader.[[readRequests]] to an empty list. + // For each readRequest of readRequests, + for request in read_requests.drain(0..) { + // Perform readRequest’s close steps. + request.close_steps(); + } + } + + /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request> + pub fn add_read_request(&self, read_request: &ReadRequest) { + self.read_requests + .borrow_mut() + .push_back(read_request.clone()); + } + + /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests> + pub fn get_num_read_requests(&self) -> usize { + self.read_requests.borrow().len() + } + + /// <https://streams.spec.whatwg.org/#readable-stream-error> + pub fn error(&self, e: SafeHandleValue) { + // Reject reader.[[closedPromise]] with e. + self.closed_promise.borrow().reject_native(&e); + + // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true. + self.closed_promise.borrow().set_promise_is_handled(); + + // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e). + self.error_read_requests(e); + } + + /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request> + #[allow(crown::unrooted_must_root)] + pub fn remove_read_request(&self) -> ReadRequest { + self.read_requests + .borrow_mut() + .pop_front() + .expect("Reader must have read request when remove is called into.") + } + + /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-release> + #[allow(unsafe_code)] + pub fn generic_release(&self) { + // Let stream be reader.[[stream]]. + + // Assert: stream is not undefined. + assert!(self.stream.get().is_some()); + + if let Some(stream) = self.stream.get() { + // Assert: stream.[[reader]] is reader. + assert!(stream.has_default_reader()); + + if stream.is_readable() { + // If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception. + self.closed_promise + .borrow() + .reject_error(Error::Type("stream state is not readable".to_owned())); + } else { + // Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut error = UndefinedValue()); + unsafe { + Error::Type("Cannot release lock due to stream state.".to_owned()).to_jsval( + *cx, + &self.global(), + error.handle_mut(), + ) + }; + + *self.closed_promise.borrow_mut() = + Promise::new_rejected(&self.global(), cx, error.handle()).unwrap(); + } + // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true. + self.closed_promise.borrow().set_promise_is_handled(); + + // Perform ! stream.[[controller]].[[ReleaseSteps]](). + stream.perform_release_steps(); + + // Set stream.[[reader]] to undefined. + stream.set_reader(None); + // Set reader.[[stream]] to undefined. + self.stream.set(None); + } + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease> + #[allow(unsafe_code)] + pub fn release(&self) { + // Perform ! ReadableStreamReaderGenericRelease(reader). + self.generic_release(); + // Let e be a new TypeError exception. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut error = UndefinedValue()); + unsafe { + Error::Type("Reader is released".to_owned()).to_jsval( + *cx, + &self.global(), + error.handle_mut(), + ) + }; + + // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e). + self.error_read_requests(error.handle()); + } + + #[allow(crown::unrooted_must_root)] + fn take_read_requests(&self) -> VecDeque<ReadRequest> { + mem::take(&mut *self.read_requests.borrow_mut()) + } + + /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel> + fn generic_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + // Let stream be reader.[[stream]]. + let stream = self.stream.get(); + + // Assert: stream is not undefined. + let stream = + stream.expect("Reader should have a stream when generic cancel is called into."); + + // Return ! ReadableStreamCancel(stream, reason). + stream.cancel(reason, can_gc) + } + + /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests> + #[allow(crown::unrooted_must_root)] + fn error_read_requests(&self, rval: SafeHandleValue) { + // step 1 + let mut read_requests = self.take_read_requests(); + + // step 2 & 3 + for request in read_requests.drain(0..) { + request.error_steps(rval); + } + } + + /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read> + pub fn read(&self, read_request: &ReadRequest, can_gc: CanGc) { + // Let stream be reader.[[stream]]. + + // Assert: stream is not undefined. + assert!(self.stream.get().is_some()); + + let stream = self.stream.get().unwrap(); + + // Set stream.[[disturbed]] to true. + stream.set_is_disturbed(true); + // If stream.[[state]] is "closed", perform readRequest’s close steps. + if stream.is_closed() { + read_request.close_steps(); + } else if stream.is_errored() { + // Otherwise, if stream.[[state]] is "errored", + // perform readRequest’s error steps given stream.[[storedError]]. + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut error = UndefinedValue()); + stream.get_stored_error(error.handle_mut()); + read_request.error_steps(error.handle()); + } else { + // Otherwise + // Assert: stream.[[state]] is "readable". + assert!(stream.is_readable()); + // Perform ! stream.[[controller]].[[PullSteps]](readRequest). + stream.perform_pull_steps(read_request, can_gc); + } + } + + /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1> + pub fn append_native_handler_to_closed_promise( + &self, + branch_1: &ReadableStream, + branch_2: &ReadableStream, + canceled_1: Rc<Cell<bool>>, + canceled_2: Rc<Cell<bool>>, + cancel_promise: Rc<Promise>, + can_gc: CanGc, + ) { + let branch_1_controller = branch_1.get_default_controller(); + + let branch_2_controller = branch_2.get_default_controller(); + + let global = self.global(); + let handler = PromiseNativeHandler::new( + &global, + None, + Some(Box::new(ClosedPromiseRejectionHandler { + branch_1_controller: Dom::from_ref(&branch_1_controller), + branch_2_controller: Dom::from_ref(&branch_2_controller), + canceled_1, + canceled_2, + cancel_promise, + })), + ); + + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); + + self.closed_promise + .borrow() + .append_native_handler(&handler, comp, can_gc); + } +} + +impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader { + /// <https://streams.spec.whatwg.org/#default-reader-constructor> + fn Constructor( + global: &GlobalScope, + proto: Option<SafeHandleObject>, + can_gc: CanGc, + stream: &ReadableStream, + ) -> Fallible<DomRoot<Self>> { + ReadableStreamDefaultReader::Constructor(global, proto, can_gc, stream) + } + + /// <https://streams.spec.whatwg.org/#default-reader-read> + #[allow(unsafe_code)] + #[allow(crown::unrooted_must_root)] + fn Read(&self, can_gc: CanGc) -> Rc<Promise> { + // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception. + if self.stream.get().is_none() { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut error = UndefinedValue()); + unsafe { + Error::Type("stream is undefined".to_owned()).to_jsval( + *cx, + &self.global(), + error.handle_mut(), + ) + }; + return Promise::new_rejected(&self.global(), cx, error.handle()).unwrap(); + } + // Let promise be a new promise. + let promise = Promise::new(&self.reflector_.global(), can_gc); + + // Let readRequest be a new read request with the following items: + // chunk steps, given chunk + // Resolve promise with «[ "value" → chunk, "done" → false ]». + // + // close steps + // Resolve promise with «[ "value" → undefined, "done" → true ]». + // + // error steps, given e + // Reject promise with e. + + // Rooting(unrooted_must_root): the read request contains only a promise, + // which does not need to be rooted, + // as it is safely managed natively via an Rc. + let read_request = ReadRequest::Read(promise.clone()); + + // Perform ! ReadableStreamDefaultReaderRead(this, readRequest). + self.read(&read_request, can_gc); + + // Return promise. + promise + } + + /// <https://streams.spec.whatwg.org/#default-reader-release-lock> + fn ReleaseLock(&self) { + if self.stream.get().is_some() { + // step 2 - Perform ! ReadableStreamDefaultReaderRelease(this). + self.release(); + } + // step 1 - If this.[[stream]] is undefined, return. + } + + /// <https://streams.spec.whatwg.org/#generic-reader-closed> + fn Closed(&self) -> Rc<Promise> { + self.closed_promise.borrow().clone() + } + + /// <https://streams.spec.whatwg.org/#generic-reader-cancel> + fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> { + if self.stream.get().is_none() { + // If this.[[stream]] is undefined, + // return a promise rejected with a TypeError exception. + let promise = Promise::new(&self.reflector_.global(), can_gc); + promise.reject_error(Error::Type("stream is undefined".to_owned())); + promise + } else { + // Return ! ReadableStreamReaderGenericCancel(this, reason). + self.generic_cancel(reason, can_gc) + } + } +} diff --git a/components/script/dom/response.rs b/components/script/dom/response.rs index 83c94e91874..49081868777 100644 --- a/components/script/dom/response.rs +++ b/components/script/dom/response.rs @@ -30,7 +30,8 @@ use crate::dom::bindings::str::{ByteString, USVString}; use crate::dom::globalscope::GlobalScope; use crate::dom::headers::{is_obs_text, is_vchar, Guard, Headers}; use crate::dom::promise::Promise; -use crate::dom::readablestream::{ExternalUnderlyingSource, ReadableStream}; +use crate::dom::readablestream::ReadableStream; +use crate::dom::underlyingsourcecontainer::UnderlyingSourceType; use crate::script_runtime::{CanGc, JSContext as SafeJSContext, StreamConsumer}; #[dom_struct] @@ -53,10 +54,11 @@ pub struct Response { #[allow(non_snake_case)] impl Response { - pub fn new_inherited(global: &GlobalScope) -> Response { + pub fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> Response { let stream = ReadableStream::new_with_external_underlying_source( global, - ExternalUnderlyingSource::FetchResponse, + UnderlyingSourceType::FetchResponse, + can_gc, ); Response { reflector_: Reflector::new(), @@ -82,7 +84,7 @@ impl Response { can_gc: CanGc, ) -> DomRoot<Response> { reflect_dom_object_with_proto( - Box::new(Response::new_inherited(global)), + Box::new(Response::new_inherited(global, can_gc)), global, proto, can_gc, @@ -444,19 +446,19 @@ impl Response { *self.stream_consumer.borrow_mut() = sc; } - pub fn stream_chunk(&self, chunk: Vec<u8>, can_gc: CanGc) { + pub fn stream_chunk(&self, chunk: Vec<u8>) { // Note, are these two actually mutually exclusive? if let Some(stream_consumer) = self.stream_consumer.borrow().as_ref() { stream_consumer.consume_chunk(chunk.as_slice()); } else if let Some(body) = self.body_stream.get() { - body.enqueue_native(chunk, can_gc); + body.enqueue_native(chunk); } } #[allow(crown::unrooted_must_root)] pub fn finish(&self) { if let Some(body) = self.body_stream.get() { - body.close_native(); + body.controller_close_native(); } let stream_consumer = self.stream_consumer.borrow_mut().take(); if let Some(stream_consumer) = stream_consumer { diff --git a/components/script/dom/underlyingsourcecontainer.rs b/components/script/dom/underlyingsourcecontainer.rs new file mode 100644 index 00000000000..06db74fb4bd --- /dev/null +++ b/components/script/dom/underlyingsourcecontainer.rs @@ -0,0 +1,225 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::ptr; +use std::rc::Rc; + +use dom_struct::dom_struct; +use js::jsapi::{Heap, IsPromiseObject, JSObject}; +use js::jsval::JSVal; +use js::rust::{Handle as SafeHandle, HandleObject, HandleValue as SafeHandleValue, IntoHandle}; + +use crate::dom::bindings::callback::ExceptionHandling; +use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; +use crate::dom::bindings::import::module::Error; +use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller; +use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomObject, Reflector}; +use crate::dom::bindings::root::{Dom, DomRoot}; +use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::script_runtime::CanGc; + +/// <https://streams.spec.whatwg.org/#underlying-source-api> +/// The `Js` variant corresponds to +/// the JavaScript object representing the underlying source. +/// The other variants are native sources in Rust. +#[derive(JSTraceable)] +#[crown::unrooted_must_root_lint::must_root] +pub enum UnderlyingSourceType { + /// Facilitate partial integration with sources + /// that are currently read into memory. + Memory(usize), + /// A blob as underlying source, with a known total size. + Blob(usize), + /// A fetch response as underlying source. + FetchResponse, + /// A struct representing a JS object as underlying source, + /// and the actual JS object for use as `thisArg` in callbacks. + Js(JsUnderlyingSource, Heap<*mut JSObject>), + /// Tee + Tee(Dom<DefaultTeeUnderlyingSource>), +} + +impl UnderlyingSourceType { + /// Is the source backed by a Rust native source? + pub fn is_native(&self) -> bool { + matches!( + self, + UnderlyingSourceType::Memory(_) | + UnderlyingSourceType::Blob(_) | + UnderlyingSourceType::FetchResponse + ) + } + + /// Does the source have all data in memory? + pub fn in_memory(&self) -> bool { + matches!(self, UnderlyingSourceType::Memory(_)) + } +} + +/// Wrapper around the underlying source. +#[dom_struct] +pub struct UnderlyingSourceContainer { + reflector_: Reflector, + #[ignore_malloc_size_of = "JsUnderlyingSource implemented in SM."] + underlying_source_type: UnderlyingSourceType, +} + +impl UnderlyingSourceContainer { + #[allow(crown::unrooted_must_root)] + fn new_inherited(underlying_source_type: UnderlyingSourceType) -> UnderlyingSourceContainer { + UnderlyingSourceContainer { + reflector_: Reflector::new(), + underlying_source_type, + } + } + + #[allow(crown::unrooted_must_root)] + pub fn new( + global: &GlobalScope, + underlying_source_type: UnderlyingSourceType, + can_gc: CanGc, + ) -> DomRoot<UnderlyingSourceContainer> { + // TODO: setting the underlying source dict as the prototype of the + // `UnderlyingSourceContainer`, as it is later used as the "this" in Call_. + // Is this a good idea? + reflect_dom_object_with_proto( + Box::new(UnderlyingSourceContainer::new_inherited( + underlying_source_type, + )), + global, + None, + can_gc, + ) + } + + /// Setting the JS object after the heap has settled down. + pub fn set_underlying_source_this_object(&self, object: HandleObject) { + if let UnderlyingSourceType::Js(_source, this_obj) = &self.underlying_source_type { + this_obj.set(*object); + } + } + + /// <https://streams.spec.whatwg.org/#dom-underlyingsource-cancel> + #[allow(unsafe_code)] + pub fn call_cancel_algorithm( + &self, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Option<Result<Rc<Promise>, Error>> { + match &self.underlying_source_type { + UnderlyingSourceType::Js(source, this_obj) => { + if let Some(algo) = &source.cancel { + let result = unsafe { + algo.Call_( + &SafeHandle::from_raw(this_obj.handle()), + Some(reason), + ExceptionHandling::Rethrow, + ) + }; + return Some(result); + } + None + }, + UnderlyingSourceType::Tee(tee_underlyin_source) => { + // Call the cancel algorithm for the appropriate branch. + tee_underlyin_source.cancel_algorithm(reason, can_gc) + }, + _ => None, + } + } + + /// <https://streams.spec.whatwg.org/#dom-underlyingsource-pull> + #[allow(unsafe_code)] + pub fn call_pull_algorithm( + &self, + controller: Controller, + can_gc: CanGc, + ) -> Option<Result<Rc<Promise>, Error>> { + match &self.underlying_source_type { + UnderlyingSourceType::Js(source, this_obj) => { + if let Some(algo) = &source.pull { + let result = unsafe { + algo.Call_( + &SafeHandle::from_raw(this_obj.handle()), + controller, + ExceptionHandling::Rethrow, + ) + }; + return Some(result); + } + None + }, + UnderlyingSourceType::Tee(tee_underlyin_source) => { + // Call the pull algorithm for the appropriate branch. + tee_underlyin_source.pull_algorithm(can_gc) + }, + // Note: other source type have no pull steps for now. + _ => None, + } + } + + /// <https://streams.spec.whatwg.org/#dom-underlyingsource-start> + /// + /// Note: The algorithm can return any value, including a promise, + /// we always transform the result into a promise for convenience, + /// and it is also how to spec deals with the situation. + /// see "Let startPromise be a promise resolved with startResult." + /// at <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller> + #[allow(unsafe_code)] + pub fn call_start_algorithm( + &self, + controller: Controller, + can_gc: CanGc, + ) -> Option<Result<Rc<Promise>, Error>> { + match &self.underlying_source_type { + UnderlyingSourceType::Js(source, this_obj) => { + if let Some(start) = &source.start { + let cx = GlobalScope::get_cx(); + rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>()); + rooted!(in(*cx) let mut result: JSVal); + unsafe { + if let Err(error) = start.Call_( + &SafeHandle::from_raw(this_obj.handle()), + controller, + result.handle_mut(), + ExceptionHandling::Rethrow, + ) { + return Some(Err(error)); + } + } + let is_promise = unsafe { + if result.is_object() { + result_object.set(result.to_object()); + IsPromiseObject(result_object.handle().into_handle()) + } else { + false + } + }; + let promise = if is_promise { + let promise = Promise::new_with_js_promise(result_object.handle(), cx); + promise + } else { + let promise = Promise::new(&self.global(), can_gc); + promise.resolve_native(&result.get()); + promise + }; + return Some(Ok(promise)); + } + None + }, + UnderlyingSourceType::Tee(_) => { + // Let startAlgorithm be an algorithm that returns undefined. + None + }, + _ => None, + } + } + + /// Does the source have all data in memory? + pub fn in_memory(&self) -> bool { + self.underlying_source_type.in_memory() + } +} diff --git a/components/script/dom/webidls/QueuingStrategy.webidl b/components/script/dom/webidls/QueuingStrategy.webidl new file mode 100644 index 00000000000..66870289de9 --- /dev/null +++ b/components/script/dom/webidls/QueuingStrategy.webidl @@ -0,0 +1,34 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#qs + +dictionary QueuingStrategy { + unrestricted double highWaterMark; + QueuingStrategySize size; +}; + +callback QueuingStrategySize = unrestricted double (any chunk); + +dictionary QueuingStrategyInit { + required unrestricted double highWaterMark; +}; + +[Exposed=*] +interface ByteLengthQueuingStrategy { + constructor(QueuingStrategyInit init); + + readonly attribute unrestricted double highWaterMark; + [Throws] + readonly attribute Function size; +}; + +[Exposed=*] +interface CountQueuingStrategy { + constructor(QueuingStrategyInit init); + + readonly attribute unrestricted double highWaterMark; + [Throws] + readonly attribute Function size; +}; diff --git a/components/script/dom/webidls/ReadableByteStreamController.webidl b/components/script/dom/webidls/ReadableByteStreamController.webidl new file mode 100644 index 00000000000..2734bd1e4c2 --- /dev/null +++ b/components/script/dom/webidls/ReadableByteStreamController.webidl @@ -0,0 +1,19 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#rbs-controller-class-definition + +[Exposed=*] +interface ReadableByteStreamController { + [Throws] // Throws on OOM + readonly attribute ReadableStreamBYOBRequest? byobRequest; + readonly attribute unrestricted double? desiredSize; + + [Throws] + undefined close(); + [Throws] + undefined enqueue(ArrayBufferView chunk); + [Throws] + undefined error(optional any e); +}; diff --git a/components/script/dom/webidls/ReadableStream.webidl b/components/script/dom/webidls/ReadableStream.webidl index d03212e3e9e..6378dc2ba87 100644 --- a/components/script/dom/webidls/ReadableStream.webidl +++ b/components/script/dom/webidls/ReadableStream.webidl @@ -2,10 +2,59 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -// This interface is entirely internal to Servo, and should not be accessible to -// web pages. +// https://streams.spec.whatwg.org/#readablestream -[LegacyNoInterfaceObject, Exposed=(Window,Worker)] -// Need to escape "ReadableStream" so it's treated as an identifier. +[Exposed=*] // [Transferable] - See Bug 1562065 interface _ReadableStream { + [Throws] + constructor(optional object underlyingSource, optional QueuingStrategy strategy = {}); + + // [Throws] + // static ReadableStream from(any asyncIterable); + + readonly attribute boolean locked; + + [NewObject] + Promise<undefined> cancel(optional any reason); + + [Throws] + ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {}); + + // [Throws] + // ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {}); + + // [NewObject] + // Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {}); + + [Throws] + sequence<ReadableStream> tee(); + + // [GenerateReturnMethod] + // async iterable<any>(optional ReadableStreamIteratorOptions options = {}); +}; + +enum ReadableStreamType { "bytes" }; + +enum ReadableStreamReaderMode { "byob" }; + +dictionary ReadableStreamGetReaderOptions { + ReadableStreamReaderMode mode; +}; + +/* +dictionary ReadableStreamIteratorOptions { + boolean preventCancel = false; +}; + +dictionary ReadableWritablePair { + required ReadableStream readable; + required WritableStream writable; +}; + +dictionary StreamPipeOptions { + boolean preventClose = false; + boolean preventAbort = false; + boolean preventCancel = false; + AbortSignal signal; }; +*/ diff --git a/components/script/dom/webidls/ReadableStreamBYOBReader.webidl b/components/script/dom/webidls/ReadableStreamBYOBReader.webidl new file mode 100644 index 00000000000..cae85b43350 --- /dev/null +++ b/components/script/dom/webidls/ReadableStreamBYOBReader.webidl @@ -0,0 +1,19 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#byob-reader-class-definition + +[Exposed=*] +interface ReadableStreamBYOBReader { + [Throws] + constructor(ReadableStream stream); + + [NewObject] + Promise<ReadableStreamReadResult> read(ArrayBufferView view); + + [Throws] + undefined releaseLock(); +}; +ReadableStreamBYOBReader includes ReadableStreamGenericReader; + diff --git a/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl b/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl new file mode 100644 index 00000000000..dad656c98e1 --- /dev/null +++ b/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl @@ -0,0 +1,15 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#rs-byob-request-class-definition + +[Exposed=*] +interface ReadableStreamBYOBRequest { + readonly attribute ArrayBufferView? view; + + [Throws] + undefined respond([EnforceRange] unsigned long long bytesWritten); + [Throws] + undefined respondWithNewView(ArrayBufferView view); +}; diff --git a/components/script/dom/webidls/ReadableStreamDefaultController.webidl b/components/script/dom/webidls/ReadableStreamDefaultController.webidl new file mode 100644 index 00000000000..6e3e1546dc8 --- /dev/null +++ b/components/script/dom/webidls/ReadableStreamDefaultController.webidl @@ -0,0 +1,17 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#rs-default-controller-class-definition + +[Exposed=*] +interface ReadableStreamDefaultController { + readonly attribute unrestricted double? desiredSize; + + [Throws] + undefined close(); + [Throws] + undefined enqueue(optional any chunk); + [Throws] + undefined error(optional any e); +}; diff --git a/components/script/dom/webidls/ReadableStreamDefaultReader.webidl b/components/script/dom/webidls/ReadableStreamDefaultReader.webidl new file mode 100644 index 00000000000..63593912c67 --- /dev/null +++ b/components/script/dom/webidls/ReadableStreamDefaultReader.webidl @@ -0,0 +1,47 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#generic-reader-mixin-definition +// https://streams.spec.whatwg.org/#default-reader-class-definition + +typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader; + +interface mixin ReadableStreamGenericReader { + readonly attribute Promise<undefined> closed; + + [NewObject] + Promise<undefined> cancel(optional any reason); +}; + +[Exposed=*] +interface ReadableStreamDefaultReader { + [Throws] + constructor(ReadableStream stream); + + [NewObject] + Promise<ReadableStreamReadResult> read(); + + undefined releaseLock(); +}; +ReadableStreamDefaultReader includes ReadableStreamGenericReader; + + +dictionary ReadableStreamReadResult { + any value; + boolean done; +}; + +// The DefaultTeeReadRequest interface is entirely internal to Servo, and should not be accessible to +// web pages. +[LegacyNoInterfaceObject, Exposed=(Window,Worker)] +// Need to escape "DefaultTeeReadRequest" so it's treated as an identifier. +interface _DefaultTeeReadRequest { +}; + +// The DefaultTeeUnderlyingSource interface is entirely internal to Servo, and should not be accessible to +// web pages. +[LegacyNoInterfaceObject, Exposed=(Window,Worker)] +// Need to escape "DefaultTeeUnderlyingSource" so it's treated as an identifier. +interface _DefaultTeeUnderlyingSource { +}; diff --git a/components/script/dom/webidls/UnderlyingSource.webidl b/components/script/dom/webidls/UnderlyingSource.webidl new file mode 100644 index 00000000000..5295129ee38 --- /dev/null +++ b/components/script/dom/webidls/UnderlyingSource.webidl @@ -0,0 +1,21 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// https://streams.spec.whatwg.org/#underlying-source-api + +[GenerateInit] +dictionary UnderlyingSource { + UnderlyingSourceStartCallback start; + UnderlyingSourcePullCallback pull; + UnderlyingSourceCancelCallback cancel; + ReadableStreamType type; + [EnforceRange] unsigned long long autoAllocateChunkSize; +}; + +typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController; + +callback UnderlyingSourceStartCallback = any (ReadableStreamController controller); +callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller); +callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason); + diff --git a/components/script/dom/webidls/UnderlyingSourceContainer.webidl b/components/script/dom/webidls/UnderlyingSourceContainer.webidl new file mode 100644 index 00000000000..b0bcfcaf242 --- /dev/null +++ b/components/script/dom/webidls/UnderlyingSourceContainer.webidl @@ -0,0 +1,11 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// This interface is entirely internal to Servo, and should not be accessible to +// web pages. + +[LegacyNoInterfaceObject, Exposed=(Window,Worker)] +// Need to escape "ReadableStream" so it's treated as an identifier. +interface _UnderlyingSourceContainer { +}; diff --git a/components/script/fetch.rs b/components/script/fetch.rs index b6c2181538d..266f848ec7a 100644 --- a/components/script/fetch.rs +++ b/components/script/fetch.rs @@ -279,7 +279,7 @@ impl FetchResponseListener for FetchContext { fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) { let response = self.response_object.root(); - response.stream_chunk(chunk, CanGc::note()); + response.stream_chunk(chunk); } fn process_response_eof( diff --git a/components/script/microtask.rs b/components/script/microtask.rs index 684a6f9d3b2..87bd4e1da1a 100644 --- a/components/script/microtask.rs +++ b/components/script/microtask.rs @@ -18,6 +18,7 @@ use crate::dom::bindings::cell::DomRefCell; use crate::dom::bindings::codegen::Bindings::PromiseBinding::PromiseJobCallback; use crate::dom::bindings::codegen::Bindings::VoidFunctionBinding::VoidFunction; use crate::dom::bindings::root::DomRoot; +use crate::dom::defaultteereadrequest::DefaultTeeReadRequestMicrotask; use crate::dom::globalscope::GlobalScope; use crate::dom::htmlimageelement::ImageElementMicrotask; use crate::dom::htmlmediaelement::MediaElementMicrotask; @@ -41,6 +42,7 @@ pub enum Microtask { User(UserMicrotask), MediaElement(MediaElementMicrotask), ImageElement(ImageElementMicrotask), + ReadableStreamTeeReadRequest(DefaultTeeReadRequestMicrotask), CustomElementReaction, NotifyMutationObservers, } @@ -140,6 +142,9 @@ impl MicrotaskQueue { Microtask::NotifyMutationObservers => { MutationObserver::notify_mutation_observers(); }, + Microtask::ReadableStreamTeeReadRequest(ref task) => { + task.microtask_chunk_steps(can_gc) + }, } } } |