aboutsummaryrefslogtreecommitdiffstats
path: root/components
diff options
context:
space:
mode:
authorGregory Terzian <2792687+gterzian@users.noreply.github.com>2024-12-18 05:14:00 +0800
committerGitHub <noreply@github.com>2024-12-17 21:14:00 +0000
commit379bbb41dde5c46ff39cfc9027d7df49fae733b8 (patch)
treeb8224b9e9d088885fcb3dff405118d5ef932080f /components
parent026d3717177def1b77e8790f3f045feea66df872 (diff)
downloadservo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.tar.gz
servo-379bbb41dde5c46ff39cfc9027d7df49fae733b8.zip
Dom: Re-implement `ReadableStream` Part 1 : Default `Reader` and `Controller` (#34064)
* Re-implement readablestream: basics and default reader and controller --------- Co-authored-by: Jason Tsai <jason@pews.dev> Signed-off-by: Wu Wayne <yuweiwu@pm.me> Add remaining WebIDLs of ReadableStream (#32605) * Add Reader's WebIDL files * Add necessary methods in ReadableStream.webidl Signed-off-by: Wu Wayne <yuweiwu@pm.me> Create safe wrapper for JSFunctions (#32620) * Create safe wrapper for JSFunctions Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Add assert to check if the name ends in a null character Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Create macro to wrap unsafe extern "C" function calls Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Remove WRAPPER_FN Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Add macro example documentation Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Use C-string literals Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Ensure name is Cstr type Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> * Scope #[allow(unsafe_code)] Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com> Signed-off-by: Wu Wayne <yuweiwu@pm.me> Start implementation of default controller and reader Start implementation of default controller and reader * implement basic internal slots, with todos Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * enum for controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * re-implement native controller methods Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add calling into pull algo Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * more details on chunk enqueuing Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add fulfill read request, clean-up warnings Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * read request and reader typing Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * allow for more than one non-native underlying source type Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add todo for should pull Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add underlying source dom struct container Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove rc around source type * add default controller init in stream constructor * setup source container with prototype of source dict * clean-up docs, dispatch of controller in pull algo call * turn off SM streams * remove prototype setting on underlying source container * fix read request promise resolving * tidy * clean-up js conversions in read req handlers * add queue with sizes concept * use dom in pull promise handlers * Demonstrate using dictionary as callback this object. * move value with size to a struct * fmt * put readable stream state in a cell * nits in expectations * remove allow unroot by passing read result directly to promise resolving * tidy * root default controller inside call_pull_if_needed --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Josh Matthews <josh@joshmatthews.net> Signed-off-by: Wu Wayne <yuweiwu@pm.me> ReadableStream: implement Cancel and Locked (#33136) * implement Locked * implement Cancel and close Signed-off-by: Wu Wayne <yuweiwu@pm.me> Add GetPromiseIsHandled and SetAnyPromiseIsHandled to Promise Signed-off-by: Taym <haddadi.taym@gmail.com> mach fmt Signed-off-by: Taym <haddadi.taym@gmail.com> Readablestream default controller: get desired size (#33497) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> stream: implement controller close (#33498) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> implement stream default controller error (#33503) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Readablestream default controller: enqueue (#33528) * Implement ReadableStreamDefaultControllerMethods::Enqueue Signed-off-by: Wu Wayne <yuweiwu@pm.me> * Add spec comments Signed-off-by: Wu Wayne <yuweiwu@pm.me> --------- Signed-off-by: Wu Wayne <yuweiwu@pm.me> readablestream default controller: fulfill read requests (#33542) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Fix extract_size_algorithm (#33561) Signed-off-by: Wu Wayne <yuweiwu@pm.me> Readablestream default controller: use strategy size (#33551) * readablestream default controller: use strategy size, fallible enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> docs * readablestream default controller: clear strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * prevent potential re-borrow panics when calling into the strategy size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * document readablestream constructor Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Readablestream: impl default controller should pull, start algo (#33586) * implement should-pull algo for default controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add start algorithm setup for default controller Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> implement promise native handling for start and pull algorithms (#33603) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Implement ReadableStreamDefaultReader (#33160) * Implement ReadableStreamDefaultReader Make the stream mutable readable-stream-reader-generic-release Proper error types when releasing Closed Cancel Signed-off-by: Taym <haddadi.taym@gmail.com> * follow the spec more closely Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Implement ReadableStreamDefaultReader read (#34007) * Implement ReadableStreamDefaultReader read Signed-off-by: Taym <haddadi.taym@gmail.com> * Perform readRequest’s error steps with stream.stored_error Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Improve ReadableStreamDefaultReader close (#34014) * improve ReadableStreamDefaultReader close Signed-off-by: Taym <haddadi.taym@gmail.com> * remove resolve_closed_promise Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Use Rc<Box<[u8]>> for queue to optimize get_in_memory_bytes Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> * Improve read_a_chunk and stop_reading implemntation (#34077) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Implement ReadableStreamDefaultReader::Constructor (#34056) * Implement ReadableStreamDefaultReader::Constructor Signed-off-by: Taym <haddadi.taym@gmail.com> * make start_reading returns ReadableStreamDefaultReader Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix can_gc Signed-off-by: Taym <haddadi.taym@gmail.com> * Add canGc to ReadableStream::GetReader Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Readablestream fix CanGc (#34080) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * correct ReadableStream::error_native implementation and fix clippy warnings (#34088) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * turn assertion of stream present on controller on a early return with false (#34097) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix already mutably borrowed crash (#34105) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Refactor `get_in_memory_bytes` to return `Option<Vec<u8>> and avoid `unreachable!` panic (#34123) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Set ReadableStream ReadableStreamDefaultReader in start_reading (#34125) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix Unhandled rejection with value: object `TypeError: stream is not locked` (#34204) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix assert!(self.is_readable()) crash in ReadableStream::close (#34207) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix call to to_js_object in underlying source algos (#34098) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * do not assume presence of a stream when performing pull steps (#34244) * do not assume presence of a stream when performing pull steps Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add doc comments Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * gracefully handle failure of underlying source algorithms (#34243) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * ensure result of calling start algo is an object (#34245) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * return js failed error if underlying source constructor threw (#34246) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Use JSVal for ValueWithSize::value (#34259) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix release reader lock, (#34255) fix setting stream on controller in new, fix matching fallthrough, reduce visibility of controller error method Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * in stream cancel, reject promist if locked (#34271) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix UnderlyingSourceContainer::call_start_algorithm (#34277) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * implement controller cancel steps, fix stream cancel method (#34301) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix conditional in perform pull steps (#34324) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * set reader closed promise to one resolved with undefined if stream closed on init (#34321) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix init of stream and controller (#34323) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix reborrow in controller enqueue, and fix error and exception handling. (#34338) * fix re-borrow in controller enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * do not call to_jsval on JSFailed error in enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix error and exception handling in controller enqueue Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * remove TODO about correctness of stored error, since this was done as part of the switch to a js val. Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix incorrect "this" object in underlying source callbacks (#34368) * in controller close, throw type error if stream cannot be closed Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * store original js object for underlying source, for use as this object in callbacks Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix conditional logic in enqueue to ensure pull is called into (#34375) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: Fix bytelength queueing strategy (#34376) * fix handling of value that is not an object in bytelength queuing strategy Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * return type error if strategy size call fails, to prevent panic because no exception is pending Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * set correct default count queuing size strategy (#34389) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * use proto in stream constructor (#34441) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix edge cases in get_desired_size (#34440) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Stream: fix algo and strategy calls error handling. (#34424) * fix error handling in cancel steps Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * in pull steps, reject promise if pull algo throws Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * if start algorithm fails, rethrow the error Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * when the strategy size fails, directly get the pending exception and use it to error the stream Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * add error handling to enqueue value with size Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * when enqueueing a value errors, ensure we error and stream with the same error used to throw an exception Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix native use of streams (#34468) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Implement readablestreamdefaulttee (#34405) * Implement readablestreamdefaulttee Signed-off-by: Taym <haddadi.taym@gmail.com> * Create UnderlyingSourceType::Tee each stream Signed-off-by: Taym <haddadi.taym@gmail.com> * Use Dom instead of DomRoot Signed-off-by: Taym <haddadi.taym@gmail.com> * Queue a microtask for readRequest chunk steps Signed-off-by: Taym <haddadi.taym@gmail.com> * fix create_readable_stream Signed-off-by: Taym <haddadi.taym@gmail.com> * Remove unnecessary Rc Signed-off-by: Taym <haddadi.taym@gmail.com> * Use correct doc link Signed-off-by: Taym <haddadi.taym@gmail.com> * Add #[allow(crown::unrooted_must_root)] Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix crash in ClosedPromiseRejectionHandler Signed-off-by: Taym <haddadi.taym@gmail.com> * reflect TeeReadRequest and TeeUnderlyingSource Signed-off-by: Taym <haddadi.taym@gmail.com> * fix can_gc Signed-off-by: Taym <haddadi.taym@gmail.com> * reflect tee source, and fix use of mutable dom for tee source and request Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * Fix typo that resolves multiple test failures in 'Tee' tests Signed-off-by: Taym <haddadi.taym@gmail.com> * Fix readable-streams/tee.any.js test Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js (#34531) And fix crate::DomTypeHolder usage * Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js Signed-off-by: Taym <haddadi.taym@gmail.com> * make reader rooted in Constructor and acquire_default_reader Signed-off-by: Taym <haddadi.taym@gmail.com> * Remove spaces Signed-off-by: Taym <haddadi.taym@gmail.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Streams: fetch stream chunks should be uint8 arrays (#34553) * fetch stream chunks should be uint8 arrays Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update wpt test for ReadableStream reimplementation (#34579) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix ignore_malloc_size_of in readablestream tee (#34578) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove incorrect use of handle array, fail test safely by giving only one reason (#34560) Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update more wpt test for ReadableStream reimplementation (#34598) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix doc and rename Tee to DefaultTee (#34612) Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix: Address review comments Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update response-stream-with-broken-then.any.js.ini test expectation Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix reflect_dom_object can_gc Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix compositeReason for DefaultTeeUnderlyingSource (#34627) * Fix compositeReason for DefaultTeeUnderlyingSource Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update test Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Last fixes stream (#34636) * remove now unsused from_js method of readable stream * fix documenation of error steps * return type error instread of panicking on a todo, when trying to construct a stream of type bytes Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> * fix crown rooting related errors (#34662) Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> --------- Signed-off-by: Taym <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: Wu Wayne <yuweiwu@pm.me> Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Diffstat (limited to 'components')
-rw-r--r--components/script/Cargo.toml2
-rw-r--r--components/script/body.rs11
-rw-r--r--components/script/dom/bindings/callback.rs26
-rw-r--r--components/script/dom/bindings/codegen/Bindings.conf16
-rw-r--r--components/script/dom/bindings/codegen/CodegenRust.py5
-rw-r--r--components/script/dom/bindings/codegen/run.py2
-rw-r--r--components/script/dom/bindings/function.rs50
-rw-r--r--components/script/dom/bindings/import.rs2
-rw-r--r--components/script/dom/bindings/interface.rs1
-rw-r--r--components/script/dom/bindings/mod.rs1
-rw-r--r--components/script/dom/bytelengthqueuingstrategy.rs113
-rw-r--r--components/script/dom/countqueuingstrategy.rs127
-rw-r--r--components/script/dom/defaultteereadrequest.rs238
-rw-r--r--components/script/dom/defaultteeunderlyingsource.rs221
-rw-r--r--components/script/dom/globalscope.rs68
-rw-r--r--components/script/dom/mod.rs10
-rw-r--r--components/script/dom/promise.rs17
-rw-r--r--components/script/dom/readablebytestreamcontroller.rs64
-rw-r--r--components/script/dom/readablestream.rs1150
-rw-r--r--components/script/dom/readablestreambyobreader.rs81
-rw-r--r--components/script/dom/readablestreambyobrequest.rs39
-rw-r--r--components/script/dom/readablestreamdefaultcontroller.rs878
-rw-r--r--components/script/dom/readablestreamdefaultreader.rs530
-rw-r--r--components/script/dom/response.rs16
-rw-r--r--components/script/dom/underlyingsourcecontainer.rs225
-rw-r--r--components/script/dom/webidls/QueuingStrategy.webidl34
-rw-r--r--components/script/dom/webidls/ReadableByteStreamController.webidl19
-rw-r--r--components/script/dom/webidls/ReadableStream.webidl57
-rw-r--r--components/script/dom/webidls/ReadableStreamBYOBReader.webidl19
-rw-r--r--components/script/dom/webidls/ReadableStreamBYOBRequest.webidl15
-rw-r--r--components/script/dom/webidls/ReadableStreamDefaultController.webidl17
-rw-r--r--components/script/dom/webidls/ReadableStreamDefaultReader.webidl47
-rw-r--r--components/script/dom/webidls/UnderlyingSource.webidl21
-rw-r--r--components/script/dom/webidls/UnderlyingSourceContainer.webidl11
-rw-r--r--components/script/fetch.rs2
-rw-r--r--components/script/microtask.rs5
36 files changed, 3728 insertions, 412 deletions
diff --git a/components/script/Cargo.toml b/components/script/Cargo.toml
index 6a9a5bf9a9c..98aec5908ec 100644
--- a/components/script/Cargo.toml
+++ b/components/script/Cargo.toml
@@ -72,7 +72,7 @@ image = { workspace = true }
indexmap = { workspace = true }
ipc-channel = { workspace = true }
itertools = { workspace = true }
-js = { package = "mozjs", git = "https://github.com/servo/mozjs", features = ["streams", "crown"] }
+js = { package = "mozjs", git = "https://github.com/servo/mozjs", features = ["crown"] }
jstraceable_derive = { path = "../jstraceable_derive" }
keyboard-types = { workspace = true }
libc = { workspace = true }
diff --git a/components/script/body.rs b/components/script/body.rs
index 6d7dea4c0ad..17693550554 100644
--- a/components/script/body.rs
+++ b/components/script/body.rs
@@ -187,7 +187,8 @@ impl TransmitBodyConnectHandler {
// TODO: Step 2, If body is null.
// Step 3, get a reader for stream.
- rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream.");
+ rooted_stream.acquire_default_reader(CanGc::note())
+ .expect("Couldn't acquire a reader for the body stream.");
// Note: this algorithm continues when the first chunk is requested by `net`.
}),
@@ -242,7 +243,7 @@ impl TransmitBodyConnectHandler {
let global = rooted_stream.global();
// Step 4, the result of reading a chunk from body’s stream with reader.
- let promise = rooted_stream.read_a_chunk();
+ let promise = rooted_stream.read_a_chunk(CanGc::note());
// Step 5, the parallel steps waiting for and handling the result of the read promise,
// are a combination of the promise native handler here,
@@ -692,7 +693,7 @@ impl Callback for ConsumeBodyPromiseHandler {
let global = stream.global();
// Run the above step again.
- let read_promise = stream.read_a_chunk();
+ let read_promise = stream.read_a_chunk(can_gc);
let promise_handler = Box::new(ConsumeBodyPromiseHandler {
result_promise: self.result_promise.clone(),
@@ -763,7 +764,7 @@ fn consume_body_with_promise<T: BodyMixin + DomObject>(
};
// Step 3.
- if stream.start_reading().is_err() {
+ if stream.acquire_default_reader(can_gc).is_err() {
return promise.reject_error(Error::Type(
"The response's stream is disturbed or locked".to_string(),
));
@@ -774,7 +775,7 @@ fn consume_body_with_promise<T: BodyMixin + DomObject>(
// Step 1 of
// https://fetch.spec.whatwg.org/#concept-read-all-bytes-from-readablestream
- let read_promise = stream.read_a_chunk();
+ let read_promise = stream.read_a_chunk(can_gc);
let promise_handler = Box::new(ConsumeBodyPromiseHandler {
result_promise: promise.clone(),
diff --git a/components/script/dom/bindings/callback.rs b/components/script/dom/bindings/callback.rs
index 374627c16da..bb4e59ba75c 100644
--- a/components/script/dom/bindings/callback.rs
+++ b/components/script/dom/bindings/callback.rs
@@ -15,7 +15,7 @@ use js::jsapi::{
};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::wrappers::{JS_GetProperty, JS_WrapObject};
-use js::rust::{MutableHandleObject, Runtime};
+use js::rust::{HandleObject, MutableHandleObject, Runtime};
use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
use crate::dom::bindings::error::{report_pending_exception, Error, Fallible};
@@ -205,9 +205,29 @@ impl CallbackInterface {
}
}
+pub trait ThisReflector {
+ fn jsobject(&self) -> *mut JSObject;
+}
+
+impl<T: DomObject> ThisReflector for T {
+ fn jsobject(&self) -> *mut JSObject {
+ self.reflector().get_jsobject().get()
+ }
+}
+
+impl<'a> ThisReflector for HandleObject<'a> {
+ fn jsobject(&self) -> *mut JSObject {
+ self.get()
+ }
+}
+
/// Wraps the reflector for `p` into the realm of `cx`.
-pub fn wrap_call_this_object<T: DomObject>(cx: JSContext, p: &T, mut rval: MutableHandleObject) {
- rval.set(p.reflector().get_jsobject().get());
+pub fn wrap_call_this_object<T: ThisReflector>(
+ cx: JSContext,
+ p: &T,
+ mut rval: MutableHandleObject,
+) {
+ rval.set(p.jsobject());
assert!(!rval.get().is_null());
unsafe {
diff --git a/components/script/dom/bindings/codegen/Bindings.conf b/components/script/dom/bindings/codegen/Bindings.conf
index f4e2c9e2dfb..d0635ead024 100644
--- a/components/script/dom/bindings/codegen/Bindings.conf
+++ b/components/script/dom/bindings/codegen/Bindings.conf
@@ -538,6 +538,22 @@ DOMInterfaces = {
'canGc': ['SimulateDeviceConnection', 'DisconnectAllDevices'],
},
+'ReadableStream': {
+ 'canGc': ['GetReader', 'Cancel', 'Tee'],
+},
+
+"ReadableStreamDefaultController": {
+ "canGc": ["Enqueue"]
+},
+
+"ReadableStreamBYOBReader": {
+ "canGc": ["Read", "Closed", "Cancel"]
+},
+
+"ReadableStreamDefaultReader": {
+ "canGc": ["Read", "Cancel"]
+},
+
}
Dictionaries = {
diff --git a/components/script/dom/bindings/codegen/CodegenRust.py b/components/script/dom/bindings/codegen/CodegenRust.py
index e25ed68d3e8..a0f926b4c50 100644
--- a/components/script/dom/bindings/codegen/CodegenRust.py
+++ b/components/script/dom/bindings/codegen/CodegenRust.py
@@ -933,6 +933,7 @@ def getJSToNativeConversionInfo(type, descriptorProvider, failureCode=None,
"""
{
use crate::realms::{AlreadyInRealm, InRealm};
+ use crate::dom::readablestream::ReadableStream;
let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
match ReadableStream::from_js(cx, $${val}.get().to_object(), InRealm::Already(&in_realm_proof)) {
Ok(val) => val,
@@ -949,7 +950,7 @@ def getJSToNativeConversionInfo(type, descriptorProvider, failureCode=None,
templateBody = wrapObjectTemplate(templateBody, "None",
isDefinitelyObject, type, failureCode)
- declType = CGGeneric("DomRoot<ReadableStream>")
+ declType = CGGeneric("DomRoot<dom::readablestream::ReadableStream>")
return handleOptional(templateBody, declType,
handleDefault("None"))
@@ -7718,7 +7719,7 @@ class CGCallback(CGClass):
f"unsafe {{ self.{method.name}({', '.join(argnamesWithoutThis)}) }}")
return [ClassMethod(f'{method.name}_', method.returnType, args,
bodyInHeader=True,
- templateArgs=["T: DomObject"],
+ templateArgs=["T: ThisReflector"],
body=bodyWithThis,
visibility='pub'),
ClassMethod(f'{method.name}__', method.returnType, argsWithoutThis,
diff --git a/components/script/dom/bindings/codegen/run.py b/components/script/dom/bindings/codegen/run.py
index 2dee39814b2..400d022a167 100644
--- a/components/script/dom/bindings/codegen/run.py
+++ b/components/script/dom/bindings/codegen/run.py
@@ -30,7 +30,7 @@ def main():
from Configuration import Configuration
from CodegenRust import CGBindingRoot
- parser = WebIDL.Parser(make_dir(os.path.join(out_dir, "cache")))
+ parser = WebIDL.Parser(make_dir(os.path.join(out_dir, "cache")), use_builtin_readable_stream=False)
webidls = [name for name in os.listdir(webidls_dir) if name.endswith(".webidl")]
for webidl in webidls:
filename = os.path.join(webidls_dir, webidl)
diff --git a/components/script/dom/bindings/function.rs b/components/script/dom/bindings/function.rs
new file mode 100644
index 00000000000..86f4903b07d
--- /dev/null
+++ b/components/script/dom/bindings/function.rs
@@ -0,0 +1,50 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/// Defines a macro `native_fn!` to create a JavaScript function from a Rust function pointer.
+/// # Example
+/// ```
+/// let js_function: Rc<Function> = native_fn!(my_rust_function, c"myFunction", 2, 0);
+/// ```
+#[macro_export]
+macro_rules! native_fn {
+ ($call:expr, $name:expr, $nargs:expr, $flags:expr) => {{
+ let cx = $crate::dom::types::GlobalScope::get_cx();
+ let fun_obj = $crate::native_raw_obj_fn!(cx, $call, $name, $nargs, $flags);
+ #[allow(unsafe_code)]
+ unsafe {
+ Function::new(cx, fun_obj)
+ }
+ }};
+}
+
+/// Defines a macro `native_raw_obj_fn!` to create a raw JavaScript function object.
+/// # Example
+/// ```
+/// let raw_function_obj: *mut JSObject = native_raw_obj_fn!(cx, my_rust_function, c"myFunction", 2, 0);
+/// ```
+#[macro_export]
+macro_rules! native_raw_obj_fn {
+ ($cx:expr, $call:expr, $name:expr, $nargs:expr, $flags:expr) => {{
+ #[allow(unsafe_code)]
+ #[allow(clippy::macro_metavars_in_unsafe)]
+ unsafe extern "C" fn wrapper(cx: *mut JSContext, argc: u32, vp: *mut JSVal) -> bool {
+ $call(cx, argc, vp)
+ }
+ #[allow(unsafe_code)]
+ #[allow(clippy::macro_metavars_in_unsafe)]
+ unsafe {
+ let name: &std::ffi::CStr = $name;
+ let raw_fun = $crate::dom::bindings::import::module::jsapi::JS_NewFunction(
+ *$cx,
+ Some(wrapper),
+ $nargs,
+ $flags,
+ name.as_ptr() as *const std::ffi::c_char,
+ );
+ assert!(!raw_fun.is_null());
+ $crate::dom::bindings::import::module::jsapi::JS_GetFunctionObject(raw_fun)
+ }
+ }};
+}
diff --git a/components/script/dom/bindings/import.rs b/components/script/dom/bindings/import.rs
index 0d9f0d70fdf..c4586b43905 100644
--- a/components/script/dom/bindings/import.rs
+++ b/components/script/dom/bindings/import.rs
@@ -19,7 +19,7 @@ pub mod base {
pub use crate::dom::bindings::callback::{
wrap_call_this_object, CallSetup, CallbackContainer, CallbackFunction, CallbackInterface,
- CallbackObject, ExceptionHandling,
+ CallbackObject, ExceptionHandling, ThisReflector,
};
pub use crate::dom::bindings::codegen::Bindings::AudioNodeBinding::{
ChannelCountMode, ChannelCountModeValues, ChannelInterpretation,
diff --git a/components/script/dom/bindings/interface.rs b/components/script/dom/bindings/interface.rs
index ce2f7e275b6..bf2fc6faa87 100644
--- a/components/script/dom/bindings/interface.rs
+++ b/components/script/dom/bindings/interface.rs
@@ -146,7 +146,6 @@ pub unsafe fn create_global_object(
let mut options = RealmOptions::default();
options.creationOptions_.traceGlobal_ = Some(trace);
options.creationOptions_.sharedMemoryAndAtomics_ = false;
- options.creationOptions_.streams_ = true;
select_compartment(cx, &mut options);
let principal = ServoJSPrincipals::new(origin);
diff --git a/components/script/dom/bindings/mod.rs b/components/script/dom/bindings/mod.rs
index a6666b70d90..633763f1ba6 100644
--- a/components/script/dom/bindings/mod.rs
+++ b/components/script/dom/bindings/mod.rs
@@ -143,6 +143,7 @@ pub mod conversions;
pub mod error;
pub mod finalize;
pub mod frozenarray;
+pub mod function;
pub mod guard;
pub mod import;
pub mod inheritance;
diff --git a/components/script/dom/bytelengthqueuingstrategy.rs b/components/script/dom/bytelengthqueuingstrategy.rs
new file mode 100644
index 00000000000..55b8e4e8815
--- /dev/null
+++ b/components/script/dom/bytelengthqueuingstrategy.rs
@@ -0,0 +1,113 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::gc::{HandleValue, MutableHandleValue};
+use js::jsapi::{CallArgs, JSContext};
+use js::jsval::JSVal;
+use js::rust::HandleObject;
+
+use super::bindings::codegen::Bindings::FunctionBinding::Function;
+use super::bindings::codegen::Bindings::QueuingStrategyBinding::{
+ ByteLengthQueuingStrategyMethods, QueuingStrategyInit,
+};
+use super::bindings::import::module::{DomObject, DomRoot, Fallible, Reflector};
+use super::bindings::reflector::reflect_dom_object_with_proto;
+use super::types::GlobalScope;
+use crate::dom::bindings::import::module::get_dictionary_property;
+use crate::native_fn;
+use crate::script_runtime::CanGc;
+
+#[dom_struct]
+pub struct ByteLengthQueuingStrategy {
+ reflector_: Reflector,
+ high_water_mark: f64,
+}
+
+impl ByteLengthQueuingStrategy {
+ pub fn new_inherited(init: f64) -> Self {
+ Self {
+ reflector_: Reflector::new(),
+ high_water_mark: init,
+ }
+ }
+
+ pub fn new(
+ global: &GlobalScope,
+ proto: Option<HandleObject>,
+ init: f64,
+ can_gc: CanGc,
+ ) -> DomRoot<Self> {
+ reflect_dom_object_with_proto(Box::new(Self::new_inherited(init)), global, proto, can_gc)
+ }
+}
+
+impl ByteLengthQueuingStrategyMethods<crate::DomTypeHolder> for ByteLengthQueuingStrategy {
+ /// <https://streams.spec.whatwg.org/#blqs-constructor>
+ fn Constructor(
+ global: &GlobalScope,
+ proto: Option<HandleObject>,
+ can_gc: CanGc,
+ init: &QueuingStrategyInit,
+ ) -> DomRoot<Self> {
+ Self::new(global, proto, init.highWaterMark, can_gc)
+ }
+ /// <https://streams.spec.whatwg.org/#blqs-high-water-mark>
+ fn HighWaterMark(&self) -> f64 {
+ self.high_water_mark
+ }
+
+ /// <https://streams.spec.whatwg.org/#blqs-size>
+ fn GetSize(&self) -> Fallible<Rc<Function>> {
+ let global = self.reflector_.global();
+ // Return this's relevant global object's byte length queuing strategy
+ // size function.
+ if let Some(fun) = global.get_byte_length_queuing_strategy_size() {
+ return Ok(fun);
+ }
+
+ // Step 1. Let steps be the following steps, given chunk
+ // Note: See ByteLengthQueuingStrategySize instead.
+
+ // Step 2. Let F be !CreateBuiltinFunction(steps, 1, "size", « »,
+ // globalObject’s relevant Realm).
+ let fun = native_fn!(byte_length_queuing_strategy_size, c"size", 1, 0);
+ // Step 3. Set globalObject’s byte length queuing strategy size function to
+ // a Function that represents a reference to F,
+ // with callback context equal to globalObject’s relevant settings object.
+ global.set_byte_length_queuing_strategy_size(fun.clone());
+ Ok(fun)
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#byte-length-queuing-strategy-size-function>
+#[allow(unsafe_code)]
+pub unsafe fn byte_length_queuing_strategy_size(
+ cx: *mut JSContext,
+ argc: u32,
+ vp: *mut JSVal,
+) -> bool {
+ let args = CallArgs::from_vp(vp, argc);
+
+ // Step 1.1: Return ? GetV(chunk, "byteLength").
+ let val = HandleValue::from_raw(args.get(0));
+
+ // https://tc39.es/ecma262/multipage/abstract-operations.html#sec-getv
+ // Let O be ? ToObject(V).
+ if !val.is_object() {
+ return false;
+ }
+ rooted!(in(cx) let object = val.to_object());
+
+ // Return ? O.[[Get]](P, V).
+ get_dictionary_property(
+ cx,
+ object.handle(),
+ "byteLength",
+ MutableHandleValue::from_raw(args.rval()),
+ )
+ .unwrap_or(false)
+}
diff --git a/components/script/dom/countqueuingstrategy.rs b/components/script/dom/countqueuingstrategy.rs
new file mode 100644
index 00000000000..73552c57b5b
--- /dev/null
+++ b/components/script/dom/countqueuingstrategy.rs
@@ -0,0 +1,127 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{CallArgs, JSContext};
+use js::jsval::{Int32Value, JSVal};
+use js::rust::HandleObject;
+
+use super::bindings::codegen::Bindings::FunctionBinding::Function;
+use super::bindings::codegen::Bindings::QueuingStrategyBinding::{
+ CountQueuingStrategyMethods, QueuingStrategy, QueuingStrategyInit, QueuingStrategySize,
+};
+use super::bindings::import::module::{DomObject, DomRoot, Error, Fallible, Reflector};
+use super::bindings::reflector::reflect_dom_object_with_proto;
+use super::types::GlobalScope;
+use crate::script_runtime::CanGc;
+use crate::{native_fn, native_raw_obj_fn};
+
+#[dom_struct]
+pub struct CountQueuingStrategy {
+ reflector_: Reflector,
+ high_water_mark: f64,
+}
+
+impl CountQueuingStrategy {
+ pub fn new_inherited(init: f64) -> Self {
+ Self {
+ reflector_: Reflector::new(),
+ high_water_mark: init,
+ }
+ }
+
+ pub fn new(
+ global: &GlobalScope,
+ proto: Option<HandleObject>,
+ init: f64,
+ can_gc: CanGc,
+ ) -> DomRoot<Self> {
+ reflect_dom_object_with_proto(Box::new(Self::new_inherited(init)), global, proto, can_gc)
+ }
+}
+
+impl CountQueuingStrategyMethods<crate::DomTypeHolder> for CountQueuingStrategy {
+ /// <https://streams.spec.whatwg.org/#cqs-constructor>
+ fn Constructor(
+ global: &GlobalScope,
+ proto: Option<HandleObject>,
+ can_gc: CanGc,
+ init: &QueuingStrategyInit,
+ ) -> DomRoot<Self> {
+ Self::new(global, proto, init.highWaterMark, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#cqs-high-water-mark>
+ fn HighWaterMark(&self) -> f64 {
+ self.high_water_mark
+ }
+
+ /// <https://streams.spec.whatwg.org/#cqs-size>
+ fn GetSize(&self) -> Fallible<Rc<Function>> {
+ let global = self.reflector_.global();
+ // Return this's relevant global object's count queuing strategy
+ // size function.
+ if let Some(fun) = global.get_count_queuing_strategy_size() {
+ return Ok(fun);
+ }
+
+ // Step 1. Let steps be the following steps, given chunk
+ // Note: See ByteLengthQueuingStrategySize instead.
+
+ // Step 2. Let F be !CreateBuiltinFunction(steps, 1, "size", « »,
+ // globalObject’s relevant Realm).
+ let fun = native_fn!(count_queuing_strategy_size, c"size", 0, 0);
+ // Step 3. Set globalObject’s count queuing strategy size function to
+ // a Function that represents a reference to F,
+ // with callback context equal to globalObject’s relevant settings object.
+ global.set_count_queuing_strategy_size(fun.clone());
+ Ok(fun)
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#count-queuing-strategy-size-function>
+#[allow(unsafe_code)]
+pub unsafe fn count_queuing_strategy_size(_cx: *mut JSContext, argc: u32, vp: *mut JSVal) -> bool {
+ let args = CallArgs::from_vp(vp, argc);
+ // Step 1.1. Return 1.
+ args.rval().set(Int32Value(1));
+ true
+}
+
+/// Extract the high water mark from a QueuingStrategy.
+/// If the high water mark is not set, return the default value.
+///
+/// <https://streams.spec.whatwg.org/#validate-and-normalize-high-water-mark>
+pub fn extract_high_water_mark(strategy: &QueuingStrategy, default_hwm: f64) -> Result<f64, Error> {
+ if strategy.highWaterMark.is_none() {
+ return Ok(default_hwm);
+ }
+
+ let high_water_mark = strategy.highWaterMark.unwrap();
+ if high_water_mark.is_nan() || high_water_mark < 0.0 {
+ return Err(Error::Range(
+ "High water mark must be a non-negative number.".to_string(),
+ ));
+ }
+
+ Ok(high_water_mark)
+}
+
+/// Extract the size algorithm from a QueuingStrategy.
+/// If the size algorithm is not set, return a fallback function which always returns 1.
+///
+/// <https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function>
+pub fn extract_size_algorithm(strategy: &QueuingStrategy) -> Rc<QueuingStrategySize> {
+ if strategy.size.is_none() {
+ let cx = GlobalScope::get_cx();
+ let fun_obj = native_raw_obj_fn!(cx, count_queuing_strategy_size, c"size", 0, 0);
+ #[allow(unsafe_code)]
+ unsafe {
+ return QueuingStrategySize::new(cx, fun_obj);
+ };
+ }
+ strategy.size.as_ref().unwrap().clone()
+}
diff --git a/components/script/dom/defaultteereadrequest.rs b/components/script/dom/defaultteereadrequest.rs
new file mode 100644
index 00000000000..dfe5b22d045
--- /dev/null
+++ b/components/script/dom/defaultteereadrequest.rs
@@ -0,0 +1,238 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::cell::Cell;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::Heap;
+use js::jsval::{JSVal, UndefinedValue};
+use js::rust::HandleValue as SafeHandleValue;
+
+use super::bindings::reflector::reflect_dom_object;
+use super::bindings::root::DomRoot;
+use super::bindings::structuredclone;
+use crate::dom::bindings::reflector::{DomObject, Reflector};
+use crate::dom::bindings::root::Dom;
+use crate::dom::bindings::trace::RootedTraceableBox;
+use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::readablestream::ReadableStream;
+use crate::microtask::Microtask;
+use crate::script_runtime::CanGc;
+
+#[derive(JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+pub struct DefaultTeeReadRequestMicrotask {
+ #[ignore_malloc_size_of = "mozjs"]
+ chunk: Box<Heap<JSVal>>,
+ tee_read_request: Dom<DefaultTeeReadRequest>,
+}
+
+impl DefaultTeeReadRequestMicrotask {
+ pub fn microtask_chunk_steps(&self, can_gc: CanGc) {
+ self.tee_read_request.chunk_steps(&self.chunk, can_gc)
+ }
+}
+
+#[dom_struct]
+/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
+pub struct DefaultTeeReadRequest {
+ reflector_: Reflector,
+ stream: Dom<ReadableStream>,
+ branch_1: Dom<ReadableStream>,
+ branch_2: Dom<ReadableStream>,
+ #[ignore_malloc_size_of = "Rc"]
+ reading: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ read_again: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_1: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_2: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ clone_for_branch_2: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ cancel_promise: Rc<Promise>,
+ tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
+}
+impl DefaultTeeReadRequest {
+ #[allow(clippy::too_many_arguments)]
+ #[allow(crown::unrooted_must_root)]
+ pub fn new(
+ stream: &ReadableStream,
+ branch_1: &ReadableStream,
+ branch_2: &ReadableStream,
+ reading: Rc<Cell<bool>>,
+ read_again: Rc<Cell<bool>>,
+ canceled_1: Rc<Cell<bool>>,
+ canceled_2: Rc<Cell<bool>>,
+ clone_for_branch_2: Rc<Cell<bool>>,
+ cancel_promise: Rc<Promise>,
+ tee_underlying_source: &DefaultTeeUnderlyingSource,
+ can_gc: CanGc,
+ ) -> DomRoot<Self> {
+ reflect_dom_object(
+ Box::new(DefaultTeeReadRequest {
+ reflector_: Reflector::new(),
+ stream: Dom::from_ref(stream),
+ branch_1: Dom::from_ref(branch_1),
+ branch_2: Dom::from_ref(branch_2),
+ reading,
+ read_again,
+ canceled_1,
+ canceled_2,
+ clone_for_branch_2,
+ cancel_promise,
+ tee_underlying_source: Dom::from_ref(tee_underlying_source),
+ }),
+ &*stream.global(),
+ can_gc,
+ )
+ }
+ /// Call into cancel of the stream,
+ /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
+ pub fn stream_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) {
+ self.stream.cancel(reason, can_gc);
+ }
+ /// Enqueue a microtask to perform the chunk steps
+ /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
+ pub fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
+ // Queue a microtask to perform the following steps:
+ let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
+ chunk: Heap::boxed(*chunk.handle()),
+ tee_read_request: Dom::from_ref(self),
+ };
+ let global = self.stream.global();
+ let microtask_queue = global.microtask_queue();
+ let cx = GlobalScope::get_cx();
+ microtask_queue.enqueue(
+ Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk),
+ cx,
+ );
+ }
+ /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
+ #[allow(unsafe_code)]
+ #[allow(clippy::borrowed_box)]
+ pub fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) {
+ // Set readAgain to false.
+ self.read_again.set(false);
+ // Let chunk1 and chunk2 be chunk.
+ let chunk1 = chunk;
+ let chunk2 = chunk;
+ // If canceled_2 is false and cloneForBranch2 is true,
+ if !self.canceled_2.get() && self.clone_for_branch_2.get() {
+ let cx = GlobalScope::get_cx();
+ // Let cloneResult be StructuredClone(chunk2).
+ rooted!(in(*cx) let mut clone_result = UndefinedValue());
+ let data = structuredclone::write(
+ cx,
+ unsafe { SafeHandleValue::from_raw(chunk2.handle()) },
+ None,
+ )
+ .unwrap();
+ // If cloneResult is an abrupt completion,
+ if structuredclone::read(&self.stream.global(), data, clone_result.handle_mut())
+ .is_err()
+ {
+ // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]).
+ self.readable_stream_default_controller_error(
+ &self.branch_1,
+ clone_result.handle(),
+ );
+
+ // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]).
+ self.readable_stream_default_controller_error(
+ &self.branch_2,
+ clone_result.handle(),
+ );
+ // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
+ self.stream_cancel(clone_result.handle(), can_gc);
+ // Return.
+ return;
+ } else {
+ // Otherwise, set chunk2 to cloneResult.[[Value]].
+ chunk2.set(*clone_result);
+ }
+ }
+ // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1).
+ if !self.canceled_1.get() {
+ self.readable_stream_default_controller_enqueue(
+ &self.branch_1,
+ unsafe { SafeHandleValue::from_raw(chunk1.handle()) },
+ can_gc,
+ );
+ }
+ // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2).
+ if !self.canceled_2.get() {
+ self.readable_stream_default_controller_enqueue(
+ &self.branch_2,
+ unsafe { SafeHandleValue::from_raw(chunk2.handle()) },
+ can_gc,
+ );
+ }
+ // Set reading to false.
+ self.reading.set(false);
+ // If readAgain is true, perform pullAlgorithm.
+ if self.read_again.get() {
+ self.pull_algorithm(can_gc);
+ }
+ }
+ /// <https://streams.spec.whatwg.org/#read-request-close-steps>
+ pub fn close_steps(&self) {
+ // Set reading to false.
+ self.reading.set(false);
+ // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]).
+ if !self.canceled_1.get() {
+ self.readable_stream_default_controller_close(&self.branch_1);
+ }
+ // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]).
+ if !self.canceled_2.get() {
+ self.readable_stream_default_controller_close(&self.branch_2);
+ }
+ // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
+ if !self.canceled_1.get() || !self.canceled_2.get() {
+ self.cancel_promise.resolve_native(&());
+ }
+ }
+ /// <https://streams.spec.whatwg.org/#read-request-error-steps>
+ pub fn error_steps(&self) {
+ // Set reading to false.
+ self.reading.set(false);
+ }
+ /// Call into enqueue of the default controller of a stream,
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
+ fn readable_stream_default_controller_enqueue(
+ &self,
+ stream: &ReadableStream,
+ chunk: SafeHandleValue,
+ can_gc: CanGc,
+ ) {
+ stream
+ .get_default_controller()
+ .enqueue(GlobalScope::get_cx(), chunk, can_gc)
+ .expect("enqueue failed for stream controller in DefaultTeeReadRequest");
+ }
+
+ /// Call into close of the default controller of a stream,
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
+ fn readable_stream_default_controller_close(&self, stream: &ReadableStream) {
+ stream.get_default_controller().close();
+ }
+
+ /// Call into error of the default controller of stream,
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
+ fn readable_stream_default_controller_error(
+ &self,
+ stream: &ReadableStream,
+ error: SafeHandleValue,
+ ) {
+ stream.get_default_controller().error(error);
+ }
+
+ pub fn pull_algorithm(&self, can_gc: CanGc) {
+ self.tee_underlying_source.pull_algorithm(can_gc);
+ }
+}
diff --git a/components/script/dom/defaultteeunderlyingsource.rs b/components/script/dom/defaultteeunderlyingsource.rs
new file mode 100644
index 00000000000..902f0986eb8
--- /dev/null
+++ b/components/script/dom/defaultteeunderlyingsource.rs
@@ -0,0 +1,221 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::cell::Cell;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
+use js::jsval::{ObjectValue, UndefinedValue};
+use js::rust::HandleValue as SafeHandleValue;
+
+use super::bindings::root::{DomRoot, MutNullableDom};
+use super::types::{ReadableStream, ReadableStreamDefaultReader};
+use crate::dom::bindings::import::module::Error;
+use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
+use crate::dom::bindings::root::Dom;
+use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::readablestreamdefaultreader::ReadRequest;
+use crate::script_runtime::CanGc;
+
+#[derive(JSTraceable, MallocSizeOf)]
+pub enum TeeCancelAlgorithm {
+ Cancel1Algorithm,
+ Cancel2Algorithm,
+}
+
+#[dom_struct]
+/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
+pub struct DefaultTeeUnderlyingSource {
+ reflector_: Reflector,
+ reader: Dom<ReadableStreamDefaultReader>,
+ stream: Dom<ReadableStream>,
+ branch_1: MutNullableDom<ReadableStream>,
+ branch_2: MutNullableDom<ReadableStream>,
+ #[ignore_malloc_size_of = "Rc"]
+ reading: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ read_again: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_1: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_2: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ clone_for_branch_2: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ #[allow(clippy::redundant_allocation)]
+ reason_1: Rc<Box<Heap<Value>>>,
+ #[ignore_malloc_size_of = "Rc"]
+ #[allow(clippy::redundant_allocation)]
+ reason_2: Rc<Box<Heap<Value>>>,
+ #[ignore_malloc_size_of = "Rc"]
+ cancel_promise: Rc<Promise>,
+ tee_cancel_algorithm: TeeCancelAlgorithm,
+}
+
+impl DefaultTeeUnderlyingSource {
+ #[allow(clippy::too_many_arguments)]
+ #[allow(clippy::redundant_allocation)]
+ #[allow(crown::unrooted_must_root)]
+ pub fn new(
+ reader: &ReadableStreamDefaultReader,
+ stream: &ReadableStream,
+ reading: Rc<Cell<bool>>,
+ read_again: Rc<Cell<bool>>,
+ canceled_1: Rc<Cell<bool>>,
+ canceled_2: Rc<Cell<bool>>,
+ clone_for_branch_2: Rc<Cell<bool>>,
+ reason_1: Rc<Box<Heap<Value>>>,
+ reason_2: Rc<Box<Heap<Value>>>,
+ cancel_promise: Rc<Promise>,
+ tee_cancel_algorithm: TeeCancelAlgorithm,
+ can_gc: CanGc,
+ ) -> DomRoot<DefaultTeeUnderlyingSource> {
+ reflect_dom_object(
+ Box::new(DefaultTeeUnderlyingSource {
+ reflector_: Reflector::new(),
+ reader: Dom::from_ref(reader),
+ stream: Dom::from_ref(stream),
+ branch_1: MutNullableDom::new(None),
+ branch_2: MutNullableDom::new(None),
+ reading,
+ read_again,
+ canceled_1,
+ canceled_2,
+ clone_for_branch_2,
+ reason_1,
+ reason_2,
+ cancel_promise,
+ tee_cancel_algorithm,
+ }),
+ &*stream.global(),
+ can_gc,
+ )
+ }
+
+ pub fn set_branch_1(&self, stream: &ReadableStream) {
+ self.branch_1.set(Some(stream));
+ }
+
+ pub fn set_branch_2(&self, stream: &ReadableStream) {
+ self.branch_2.set(Some(stream));
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
+ /// Let pullAlgorithm be the following steps:
+ #[allow(crown::unrooted_must_root)]
+ pub fn pull_algorithm(&self, can_gc: CanGc) -> Option<Result<Rc<Promise>, Error>> {
+ // If reading is true,
+ if self.reading.get() {
+ // Set readAgain to true.
+ self.read_again.set(true);
+ // Return a promise resolved with undefined.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ return Some(Promise::new_resolved(
+ &self.stream.global(),
+ cx,
+ rval.handle(),
+ ));
+ }
+
+ // Set reading to true.
+ self.reading.set(true);
+
+ // Let readRequest be a read request with the following items:
+ let tee_read_request = DefaultTeeReadRequest::new(
+ &self.stream,
+ &self.branch_1.get().expect("Branch 1 should be set."),
+ &self.branch_2.get().expect("Branch 2 should be set."),
+ self.reading.clone(),
+ self.read_again.clone(),
+ self.canceled_1.clone(),
+ self.canceled_2.clone(),
+ self.clone_for_branch_2.clone(),
+ self.cancel_promise.clone(),
+ self,
+ can_gc,
+ );
+
+ // Rooting: the tee read request is rooted above.
+ let read_request = ReadRequest::DefaultTee {
+ tee_read_request: Dom::from_ref(&tee_read_request),
+ };
+
+ // Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
+ self.reader.read(&read_request, can_gc);
+
+ // Return a promise resolved with undefined.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ Some(Promise::new_resolved(
+ &self.stream.global(),
+ GlobalScope::get_cx(),
+ rval.handle(),
+ ))
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
+ /// Let cancel1Algorithm be the following steps, taking a reason argument
+ /// and
+ /// Let cancel2Algorithm be the following steps, taking a reason argument
+ #[allow(unsafe_code)]
+ pub fn cancel_algorithm(
+ &self,
+ reason: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Option<Result<Rc<Promise>, Error>> {
+ match self.tee_cancel_algorithm {
+ TeeCancelAlgorithm::Cancel1Algorithm => {
+ // Set canceled_1 to true.
+ self.canceled_1.set(true);
+
+ // Set reason_1 to reason.
+ self.reason_1.set(reason.get());
+
+ // If canceled_2 is true,
+ if self.canceled_2.get() {
+ self.resolve_cancel_promise(can_gc);
+ }
+ // Return cancelPromise.
+ Some(Ok(self.cancel_promise.clone()))
+ },
+ TeeCancelAlgorithm::Cancel2Algorithm => {
+ // Set canceled_2 to true.
+ self.canceled_2.set(true);
+
+ // Set reason_2 to reason.
+ self.reason_2.set(reason.get());
+
+ // If canceled_1 is true,
+ if self.canceled_1.get() {
+ self.resolve_cancel_promise(can_gc);
+ }
+ // Return cancelPromise.
+ Some(Ok(self.cancel_promise.clone()))
+ },
+ }
+ }
+
+ #[allow(unsafe_code)]
+ fn resolve_cancel_promise(&self, can_gc: CanGc) {
+ // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »).
+ let cx = GlobalScope::get_cx();
+ rooted_vec!(let mut reasons_values);
+ reasons_values.push(self.reason_1.get());
+ reasons_values.push(self.reason_2.get());
+
+ let reasons_values_array = HandleValueArray::from(&reasons_values);
+ rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) });
+ rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get()));
+
+ // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
+ let cancel_result = self.stream.cancel(reasons_value.handle(), can_gc);
+
+ // Resolve cancelPromise with cancelResult.
+ self.cancel_promise.resolve_native(&cancel_result);
+ }
+}
diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs
index 0b4c390cf1d..722bda2d453 100644
--- a/components/script/dom/globalscope.rs
+++ b/components/script/dom/globalscope.rs
@@ -3,7 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::borrow::Cow;
-use std::cell::Cell;
+use std::cell::{Cell, OnceCell};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::ops::Index;
@@ -73,6 +73,7 @@ use super::bindings::trace::{HashMapTracedValues, RootedTraceableBox};
use crate::dom::bindings::cell::{DomRefCell, RefMut};
use crate::dom::bindings::codegen::Bindings::BroadcastChannelBinding::BroadcastChannelMethods;
use crate::dom::bindings::codegen::Bindings::EventSourceBinding::EventSource_Binding::EventSourceMethods;
+use crate::dom::bindings::codegen::Bindings::FunctionBinding::Function;
use crate::dom::bindings::codegen::Bindings::ImageBitmapBinding::{
ImageBitmapOptions, ImageBitmapSource,
};
@@ -92,6 +93,7 @@ use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::bindings::settings_stack::{entry_global, incumbent_global, AutoEntryScript};
use crate::dom::bindings::str::DOMString;
use crate::dom::bindings::structuredclone;
+use crate::dom::bindings::trace::CustomTraceable;
use crate::dom::bindings::weakref::{DOMTracker, WeakRef};
use crate::dom::blob::Blob;
use crate::dom::broadcastchannel::BroadcastChannel;
@@ -114,9 +116,10 @@ use crate::dom::paintworkletglobalscope::PaintWorkletGlobalScope;
use crate::dom::performance::Performance;
use crate::dom::performanceobserver::VALID_ENTRY_TYPES;
use crate::dom::promise::Promise;
-use crate::dom::readablestream::{ExternalUnderlyingSource, ReadableStream};
+use crate::dom::readablestream::ReadableStream;
use crate::dom::serviceworker::ServiceWorker;
use crate::dom::serviceworkerregistration::ServiceWorkerRegistration;
+use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
#[cfg(feature = "webgpu")]
use crate::dom::webgpu::gpudevice::GPUDevice;
#[cfg(feature = "webgpu")]
@@ -367,6 +370,20 @@ pub struct GlobalScope {
/// Directory to store unminified scripts for this window if unminify-js
/// opt is enabled.
unminified_js_dir: Option<String>,
+
+ /// The byte length queuing strategy size function that will be initialized once
+ /// `size` getter of `ByteLengthQueuingStrategy` is called.
+ ///
+ /// <https://streams.spec.whatwg.org/#byte-length-queuing-strategy-size-function>
+ #[ignore_malloc_size_of = "Rc<T> is hard"]
+ byte_length_queuing_strategy_size_function: OnceCell<Rc<Function>>,
+
+ /// The count queuing strategy size function that will be initialized once
+ /// `size` getter of `CountQueuingStrategy` is called.
+ ///
+ /// <https://streams.spec.whatwg.org/#count-queuing-strategy-size-function>
+ #[ignore_malloc_size_of = "Rc<T> is hard"]
+ count_queuing_strategy_size_function: OnceCell<Rc<Function>>,
}
/// A wrapper for glue-code between the ipc router and the event-loop.
@@ -629,10 +646,10 @@ impl MessageListener {
}
/// Callback used to enqueue file chunks to streams as part of FileListener.
-fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>, can_gc: CanGc) {
+fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>) {
match bytes {
Ok(b) => {
- stream.enqueue_native(b, can_gc);
+ stream.enqueue_native(b);
},
Err(e) => {
stream.error_native(e);
@@ -642,7 +659,7 @@ fn stream_handle_incoming(stream: &ReadableStream, bytes: Fallible<Vec<u8>>, can
/// Callback used to close streams as part of FileListener.
fn stream_handle_eof(stream: &ReadableStream) {
- stream.close_native();
+ stream.controller_close_native();
}
impl FileListener {
@@ -655,7 +672,7 @@ impl FileListener {
let task = task!(enqueue_stream_chunk: move || {
let stream = trusted.root();
- stream_handle_incoming(&stream, Ok(blob_buf.bytes), CanGc::note());
+ stream_handle_incoming(&stream, Ok(blob_buf.bytes));
});
let _ = self
@@ -679,7 +696,7 @@ impl FileListener {
let task = task!(enqueue_stream_chunk: move || {
let stream = trusted.root();
- stream_handle_incoming(&stream, Ok(bytes_in), CanGc::note());
+ stream_handle_incoming(&stream, Ok(bytes_in));
});
let _ = self
@@ -745,7 +762,7 @@ impl FileListener {
let _ = self.task_source.queue_with_canceller(
task!(error_stream: move || {
let stream = trusted_stream.root();
- stream_handle_incoming(&stream, error, CanGc::note());
+ stream_handle_incoming(&stream, error);
}),
&self.task_canceller,
);
@@ -820,6 +837,8 @@ impl GlobalScope {
dynamic_modules: DomRefCell::new(DynamicModuleList::new()),
inherited_secure_context,
unminified_js_dir: unminify_js.then(|| unminified_path("unminified-js")),
+ byte_length_queuing_strategy_size_function: OnceCell::new(),
+ count_queuing_strategy_size_function: OnceCell::new(),
}
}
@@ -2016,7 +2035,8 @@ impl GlobalScope {
let stream = ReadableStream::new_with_external_underlying_source(
self,
- ExternalUnderlyingSource::Blob(size),
+ UnderlyingSourceType::Blob(size),
+ can_gc,
);
let recv = self.send_msg(file_id);
@@ -3447,6 +3467,36 @@ impl GlobalScope {
pub fn unminified_js_dir(&self) -> Option<String> {
self.unminified_js_dir.clone()
}
+
+ pub(crate) fn set_byte_length_queuing_strategy_size(&self, function: Rc<Function>) {
+ if self
+ .byte_length_queuing_strategy_size_function
+ .set(function)
+ .is_err()
+ {
+ warn!("byte length queuing strategy size function is set twice.");
+ };
+ }
+
+ pub(crate) fn get_byte_length_queuing_strategy_size(&self) -> Option<Rc<Function>> {
+ self.byte_length_queuing_strategy_size_function
+ .get()
+ .cloned()
+ }
+
+ pub(crate) fn set_count_queuing_strategy_size(&self, function: Rc<Function>) {
+ if self
+ .count_queuing_strategy_size_function
+ .set(function)
+ .is_err()
+ {
+ warn!("count queuing strategy size function is set twice.");
+ };
+ }
+
+ pub(crate) fn get_count_queuing_strategy_size(&self) -> Option<Rc<Function>> {
+ self.count_queuing_strategy_size_function.get().cloned()
+ }
}
/// Returns the Rust global scope from a JS global object.
diff --git a/components/script/dom/mod.rs b/components/script/dom/mod.rs
index 8157728af84..8fe9647d3e9 100644
--- a/components/script/dom/mod.rs
+++ b/components/script/dom/mod.rs
@@ -243,6 +243,7 @@ pub mod bluetoothremotegattserver;
pub mod bluetoothremotegattservice;
pub mod bluetoothuuid;
pub mod broadcastchannel;
+pub mod bytelengthqueuingstrategy;
pub mod canvasgradient;
pub mod canvaspattern;
pub mod canvasrenderingcontext2d;
@@ -256,6 +257,7 @@ pub mod comment;
pub mod compositionevent;
pub mod console;
pub mod constantsourcenode;
+pub mod countqueuingstrategy;
mod create;
pub mod crypto;
pub mod cryptokey;
@@ -283,6 +285,8 @@ pub mod datatransfer;
pub mod datatransferitem;
pub mod datatransferitemlist;
pub mod dedicatedworkerglobalscope;
+pub mod defaultteereadrequest;
+pub mod defaultteeunderlyingsource;
pub mod dissimilaroriginlocation;
pub mod dissimilaroriginwindow;
pub mod document;
@@ -475,7 +479,12 @@ pub mod promiserejectionevent;
pub mod radionodelist;
pub mod range;
pub mod raredata;
+pub mod readablebytestreamcontroller;
pub mod readablestream;
+pub mod readablestreambyobreader;
+pub mod readablestreambyobrequest;
+pub mod readablestreamdefaultcontroller;
+pub mod readablestreamdefaultreader;
pub mod request;
pub mod resizeobserver;
pub mod resizeobserverentry;
@@ -540,6 +549,7 @@ pub mod trackevent;
pub mod transitionevent;
pub mod treewalker;
pub mod uievent;
+pub mod underlyingsourcecontainer;
pub mod url;
pub mod urlhelper;
pub mod urlsearchparams;
diff --git a/components/script/dom/promise.rs b/components/script/dom/promise.rs
index 6e1a8fdfb63..b58caf9e825 100644
--- a/components/script/dom/promise.rs
+++ b/components/script/dom/promise.rs
@@ -24,9 +24,9 @@ use js::jsapi::{
};
use js::jsval::{Int32Value, JSVal, ObjectValue, UndefinedValue};
use js::rust::wrappers::{
- AddPromiseReactions, CallOriginalPromiseReject, CallOriginalPromiseResolve, GetPromiseState,
- IsPromiseObject, NewPromiseObject, RejectPromise, ResolvePromise,
- SetPromiseUserInputEventHandlingState,
+ AddPromiseReactions, CallOriginalPromiseReject, CallOriginalPromiseResolve,
+ GetPromiseIsHandled, GetPromiseState, IsPromiseObject, NewPromiseObject, RejectPromise,
+ ResolvePromise, SetAnyPromiseIsHandled, SetPromiseUserInputEventHandlingState,
};
use js::rust::{HandleObject, HandleValue, MutableHandleObject, Runtime};
@@ -285,6 +285,17 @@ impl Promise {
assert!(ok);
}
}
+
+ #[allow(unsafe_code)]
+ pub fn get_promise_is_handled(&self) -> bool {
+ unsafe { GetPromiseIsHandled(self.reflector().get_jsobject()) }
+ }
+
+ #[allow(unsafe_code)]
+ pub fn set_promise_is_handled(&self) -> bool {
+ let cx = GlobalScope::get_cx();
+ unsafe { SetAnyPromiseIsHandled(*cx, self.reflector().get_jsobject()) }
+ }
}
#[allow(unsafe_code)]
diff --git a/components/script/dom/readablebytestreamcontroller.rs b/components/script/dom/readablebytestreamcontroller.rs
new file mode 100644
index 00000000000..058fce5ad3c
--- /dev/null
+++ b/components/script/dom/readablebytestreamcontroller.rs
@@ -0,0 +1,64 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use dom_struct::dom_struct;
+use js::rust::HandleValue as SafeHandleValue;
+
+use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
+use crate::dom::bindings::import::module::{Error, Fallible};
+use crate::dom::bindings::reflector::Reflector;
+use crate::dom::bindings::root::{DomRoot, MutNullableDom};
+use crate::dom::readablestream::ReadableStream;
+use crate::script_runtime::JSContext as SafeJSContext;
+
+/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
+#[dom_struct]
+pub struct ReadableByteStreamController {
+ reflector_: Reflector,
+ stream: MutNullableDom<ReadableStream>,
+}
+
+impl ReadableByteStreamController {
+ pub fn set_stream(&self, stream: &ReadableStream) {
+ self.stream.set(Some(stream))
+ }
+}
+
+impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
+ /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
+ fn GetByobRequest(
+ &self,
+ ) -> Fallible<Option<DomRoot<super::readablestreambyobrequest::ReadableStreamBYOBRequest>>>
+ {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
+ fn GetDesiredSize(&self) -> Option<f64> {
+ // TODO
+ None
+ }
+
+ /// <https://streams.spec.whatwg.org/#rbs-controller-close>
+ fn Close(&self) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
+ fn Enqueue(
+ &self,
+ _chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
+ ) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rbs-controller-error>
+ fn Error(&self, _cx: SafeJSContext, _e: SafeHandleValue) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+}
diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs
index 2c748391d5b..413b3f09743 100644
--- a/components/script/dom/readablestream.rs
+++ b/components/script/dom/readablestream.rs
@@ -2,103 +2,236 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-use std::cell::{Cell, RefCell};
-use std::os::raw::c_void;
+use std::cell::Cell;
use std::ptr::{self, NonNull};
use std::rc::Rc;
-use std::slice;
use dom_struct::dom_struct;
-use js::glue::{
- CreateReadableStreamUnderlyingSource, DeleteReadableStreamUnderlyingSource,
- ReadableStreamUnderlyingSourceTraps,
+use js::conversions::ToJSValConvertible;
+use js::jsapi::{Heap, JSObject};
+use js::jsval::{JSVal, ObjectValue, UndefinedValue};
+use js::rust::{
+ HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
+ MutableHandleValue as SafeMutableHandleValue,
};
-use js::jsapi::{
- AutoRequireNoGC, HandleObject, HandleValue, Heap, IsReadableStream, JSContext, JSObject,
- JS_GetArrayBufferViewData, NewReadableExternalSourceStreamObject, ReadableStreamClose,
- ReadableStreamDefaultReaderRead, ReadableStreamError, ReadableStreamGetReader,
- ReadableStreamIsDisturbed, ReadableStreamIsLocked, ReadableStreamIsReadable,
- ReadableStreamReaderMode, ReadableStreamReaderReleaseLock, ReadableStreamUnderlyingSource,
- ReadableStreamUpdateDataAvailableFromSource, UnwrapReadableStream,
-};
-use js::jsval::{JSVal, UndefinedValue};
-use js::rust::{HandleValue as SafeHandleValue, IntoHandle};
+use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
+use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
+ ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
+};
+use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
+use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
+use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult};
use crate::dom::bindings::error::Error;
-use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
-use crate::dom::bindings::root::DomRoot;
-use crate::dom::bindings::settings_stack::{AutoEntryScript, AutoIncumbentScript};
+use crate::dom::bindings::import::module::Fallible;
+use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
+use crate::dom::bindings::reflector::{DomObject, Reflector, reflect_dom_object, reflect_dom_object_with_proto};
+use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
+use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::bindings::utils::get_dictionary_property;
+use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
+use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
+use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
+use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
+use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
+use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
+use crate::dom::types::DefaultTeeUnderlyingSource;
+use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::js::conversions::FromJSValConvertible;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+
+/// The fulfillment handler for the reacting to sourceCancelPromise part of
+/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+struct SourceCancelPromiseFulfillmentHandler {
+ #[ignore_malloc_size_of = "Rc are hard"]
+ result: Rc<Promise>,
+}
+
+impl Callback for SourceCancelPromiseFulfillmentHandler {
+ /// The fulfillment handler for the reacting to sourceCancelPromise part of
+ /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
+ /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
+ fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
+ self.result.resolve_native(&());
+ }
+}
+
+/// The rejection handler for the reacting to sourceCancelPromise part of
+/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+struct SourceCancelPromiseRejectionHandler {
+ #[ignore_malloc_size_of = "Rc are hard"]
+ result: Rc<Promise>,
+}
+
+impl Callback for SourceCancelPromiseRejectionHandler {
+ /// The rejection handler for the reacting to sourceCancelPromise part of
+ /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
+ /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
+ fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
+ self.result.reject_native(&v);
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#readablestream-state>
+#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
+pub enum ReadableStreamState {
+ #[default]
+ Readable,
+ Closed,
+ Errored,
+}
-static UNDERLYING_SOURCE_TRAPS: ReadableStreamUnderlyingSourceTraps =
- ReadableStreamUnderlyingSourceTraps {
- requestData: Some(request_data),
- writeIntoReadRequestBuffer: Some(write_into_read_request_buffer),
- cancel: Some(cancel),
- onClosed: Some(close),
- onErrored: Some(error),
- finalize: Some(finalize),
- };
+/// <https://streams.spec.whatwg.org/#readablestream-controller>
+#[derive(JSTraceable, MallocSizeOf)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum ControllerType {
+ /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
+ Byte(MutNullableDom<ReadableByteStreamController>),
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
+ Default(MutNullableDom<ReadableStreamDefaultController>),
+}
+
+/// <https://streams.spec.whatwg.org/#readablestream-readerr>
+#[derive(JSTraceable, MallocSizeOf)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum ReaderType {
+ /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
+ #[allow(clippy::upper_case_acronyms)]
+ BYOB(MutNullableDom<ReadableStreamBYOBReader>),
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
+ Default(MutNullableDom<ReadableStreamDefaultReader>),
+}
+
+/// <https://streams.spec.whatwg.org/#create-readable-stream>
+#[allow(crown::unrooted_must_root)]
+fn create_readable_stream(
+ global: &GlobalScope,
+ underlying_source_type: UnderlyingSourceType,
+ queuing_strategy: QueuingStrategy,
+ can_gc: CanGc,
+) -> DomRoot<ReadableStream> {
+ // If highWaterMark was not passed, set it to 1.
+ let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0);
+
+ // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
+ let size_algorithm = queuing_strategy
+ .size
+ .unwrap_or(extract_size_algorithm(&QueuingStrategy::empty()));
+
+ // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
+ assert!(high_water_mark >= 0.0);
+
+ // Let stream be a new ReadableStream.
+ // Perform ! InitializeReadableStream(stream).
+ let stream = ReadableStream::new_with_proto(
+ global,
+ None,
+ ControllerType::Default(MutNullableDom::new(None)),
+ can_gc,
+ );
+ // Let controller be a new ReadableStreamDefaultController.
+ let controller = ReadableStreamDefaultController::new(
+ global,
+ underlying_source_type,
+ high_water_mark,
+ size_algorithm,
+ can_gc,
+ );
+
+ // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
+ // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
+ controller
+ .setup(stream.clone(), can_gc)
+ .expect("Setup of default controller cannot fail");
+
+ // Return stream.
+ stream
+}
+
+/// <https://streams.spec.whatwg.org/#rs-class>
#[dom_struct]
pub struct ReadableStream {
reflector_: Reflector,
- #[ignore_malloc_size_of = "SM handles JS values"]
- js_stream: Heap<*mut JSObject>,
- #[ignore_malloc_size_of = "SM handles JS values"]
- js_reader: Heap<*mut JSObject>,
- has_reader: Cell<bool>,
- #[ignore_malloc_size_of = "Rc is hard"]
- external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>,
+
+ /// <https://streams.spec.whatwg.org/#readablestream-controller>
+ /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
+ /// because it is never unset once set.
+ controller: ControllerType,
+
+ /// <https://streams.spec.whatwg.org/#readablestream-storederror>
+ #[ignore_malloc_size_of = "mozjs"]
+ stored_error: Heap<JSVal>,
+
+ /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
+ disturbed: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestream-reader>
+ reader: ReaderType,
+
+ /// <https://streams.spec.whatwg.org/#readablestream-state>
+ state: Cell<ReadableStreamState>,
}
impl ReadableStream {
- fn new_inherited(
- external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>,
- ) -> ReadableStream {
+ #[allow(crown::unrooted_must_root)]
+ /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
+ fn new_inherited(controller: ControllerType) -> ReadableStream {
+ let reader = match &controller {
+ ControllerType::Default(_) => ReaderType::Default(MutNullableDom::new(None)),
+ ControllerType::Byte(_) => ReaderType::BYOB(MutNullableDom::new(None)),
+ };
ReadableStream {
reflector_: Reflector::new(),
- js_stream: Heap::default(),
- js_reader: Heap::default(),
- has_reader: Default::default(),
- external_underlying_source,
+ controller,
+ stored_error: Heap::default(),
+ disturbed: Default::default(),
+ reader,
+ state: Cell::new(Default::default()),
}
}
- fn new(
+ #[allow(crown::unrooted_must_root)]
+ fn new_with_proto(
global: &GlobalScope,
- external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>,
+ proto: Option<SafeHandleObject>,
+ controller: ControllerType,
+ can_gc: CanGc,
) -> DomRoot<ReadableStream> {
- reflect_dom_object(
- Box::new(ReadableStream::new_inherited(external_underlying_source)),
+ reflect_dom_object_with_proto(
+ Box::new(ReadableStream::new_inherited(controller)),
global,
- CanGc::note(),
+ proto,
+ can_gc,
)
}
- /// Used from RustCodegen.py
- #[allow(unsafe_code)]
- pub unsafe fn from_js(
- cx: SafeJSContext,
- obj: *mut JSObject,
- realm: InRealm,
- ) -> Result<DomRoot<ReadableStream>, ()> {
- if !IsReadableStream(obj) {
- return Err(());
+ /// Used as part of
+ /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ pub fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
+ match self.controller {
+ ControllerType::Default(ref ctrl) => ctrl.set(Some(controller)),
+ ControllerType::Byte(_) => {
+ unreachable!("set_default_controller called in setup of default controller.")
+ },
}
+ }
- let global = GlobalScope::from_safe_context(cx, realm);
-
- let stream = ReadableStream::new(&global, None);
- stream.js_stream.set(UnwrapReadableStream(obj));
-
- Ok(stream)
+ /// Used as part of
+ /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ pub fn assert_no_controller(&self) {
+ let has_no_controller = match self.controller {
+ ControllerType::Default(ref ctrl) => ctrl.get().is_none(),
+ ControllerType::Byte(ref ctrl) => ctrl.get().is_none(),
+ };
+ assert!(has_no_controller);
}
/// Build a stream backed by a Rust source that has already been read into memory.
@@ -109,426 +242,690 @@ impl ReadableStream {
) -> DomRoot<ReadableStream> {
let stream = ReadableStream::new_with_external_underlying_source(
global,
- ExternalUnderlyingSource::Memory(bytes.len()),
+ UnderlyingSourceType::Memory(bytes.len()),
+ can_gc,
);
- stream.enqueue_native(bytes, can_gc);
- stream.close_native();
+ stream.enqueue_native(bytes);
+ stream.controller_close_native();
stream
}
/// Build a stream backed by a Rust underlying source.
- #[allow(unsafe_code)]
+ /// Note: external sources are always paired with a default controller.
+ #[allow(crown::unrooted_must_root)]
pub fn new_with_external_underlying_source(
global: &GlobalScope,
- source: ExternalUnderlyingSource,
+ source: UnderlyingSourceType,
+ can_gc: CanGc,
) -> DomRoot<ReadableStream> {
- let _ar = enter_realm(global);
- let _ais = AutoIncumbentScript::new(global);
- let cx = GlobalScope::get_cx();
+ assert!(source.is_native());
+ let stream = ReadableStream::new_with_proto(
+ global,
+ None,
+ ControllerType::Default(MutNullableDom::new(None)),
+ can_gc,
+ );
+ let controller = ReadableStreamDefaultController::new(
+ global,
+ source,
+ 1.0,
+ extract_size_algorithm(&QueuingStrategy::empty()),
+ can_gc,
+ );
+ controller
+ .setup(stream.clone(), can_gc)
+ .expect("Setup of controller with external underlying source cannot fail");
+ stream
+ }
- let source = Rc::new(ExternalUnderlyingSourceController::new(source));
+ /// Call into the release steps of the controller,
+ pub fn perform_release_steps(&self) {
+ match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .perform_release_steps(),
+ ControllerType::Byte(_) => todo!(),
+ }
+ }
- let stream = ReadableStream::new(global, Some(source.clone()));
+ /// Call into the pull steps of the controller,
+ /// as part of
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
+ pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
+ match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .perform_pull_steps(read_request, can_gc),
+ ControllerType::Byte(_) => todo!(),
+ }
+ }
- unsafe {
- let js_wrapper = CreateReadableStreamUnderlyingSource(
- &UNDERLYING_SOURCE_TRAPS,
- &*source as *const _ as *const c_void,
- );
+ /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
+ pub fn add_read_request(&self, read_request: &ReadRequest) {
+ match self.reader {
+ // Assert: stream.[[reader]] implements ReadableStreamDefaultReader.
+ ReaderType::Default(ref reader) => {
+ let Some(reader) = reader.get() else {
+ panic!("Attempt to add a read request without having first acquired a reader.");
+ };
- rooted!(in(*cx)
- let js_stream = NewReadableExternalSourceStreamObject(
- *cx,
- js_wrapper,
- ptr::null_mut(),
- HandleObject::null(),
- )
- );
+ // Assert: stream.[[state]] is "readable".
+ assert!(self.is_readable());
- stream.js_stream.set(UnwrapReadableStream(js_stream.get()));
+ // Append readRequest to stream.[[reader]].[[readRequests]].
+ reader.add_read_request(read_request);
+ },
+ ReaderType::BYOB(_) => {
+ unreachable!("Adding a read request can only be done on a default reader.")
+ },
}
-
- stream
}
/// Get a pointer to the underlying JS object.
+ /// TODO: remove,
+ /// by using at call point the `ReadableStream` directly instead of a JSObject.
pub fn get_js_stream(&self) -> NonNull<JSObject> {
- NonNull::new(self.js_stream.get())
+ NonNull::new(*self.reflector().get_jsobject())
.expect("Couldn't get a non-null pointer to JS stream object.")
}
+ /// Endpoint to enqueue chunks directly from Rust.
+ /// Note: in other use cases this call happens via the controller.
+ pub fn enqueue_native(&self, bytes: Vec<u8>) {
+ match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .enqueue_native(bytes),
+ _ => unreachable!(
+ "Enqueueing chunk to a stream from Rust on other than default controller"
+ ),
+ }
+ }
- #[allow(unsafe_code)]
- pub fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
- let global = self.global();
- let _ar = enter_realm(&*global);
- let cx = GlobalScope::get_cx();
-
- let handle = unsafe { self.js_stream.handle() };
+ /// <https://streams.spec.whatwg.org/#readable-stream-error>
+ pub fn error(&self, e: SafeHandleValue) {
+ // Assert: stream.[[state]] is "readable".
+ assert!(self.is_readable());
+ // Set stream.[[state]] to "errored".
+ self.state.set(ReadableStreamState::Errored);
+ // Set stream.[[storedError]] to e.
+ self.stored_error.set(e.get());
+
+ // Let reader be stream.[[reader]].
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ let Some(reader) = reader.get() else {
+ // If reader is undefined, return.
+ return;
+ };
+ reader.error(e);
+ },
+ // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
+ _ => todo!(),
+ }
+ }
- self.external_underlying_source
- .as_ref()
- .expect("No external source to enqueue bytes.")
- .enqueue_chunk(cx, handle, bytes, can_gc);
+ /// <https://streams.spec.whatwg.org/#readablestream-storederror>
+ pub fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
+ handle_mut.set(self.stored_error.get());
}
+ /// <https://streams.spec.whatwg.org/#readable-stream-error>
+ /// Note: in other use cases this call happens via the controller.
#[allow(unsafe_code)]
pub fn error_native(&self, error: Error) {
- let global = self.global();
- let _ar = enter_realm(&*global);
let cx = GlobalScope::get_cx();
-
- unsafe {
- rooted!(in(*cx) let mut js_error = UndefinedValue());
- error.to_jsval(*cx, &global, js_error.handle_mut());
- ReadableStreamError(
- *cx,
- self.js_stream.handle(),
- js_error.handle().into_handle(),
- );
- }
+ rooted!(in(*cx) let mut error_val = UndefinedValue());
+ unsafe { error.to_jsval(*cx, &self.global(), error_val.handle_mut()) };
+ self.error(error_val.handle());
}
- #[allow(unsafe_code)]
- pub fn close_native(&self) {
- let global = self.global();
- let _ar = enter_realm(&*global);
- let cx = GlobalScope::get_cx();
-
- let handle = unsafe { self.js_stream.handle() };
-
- self.external_underlying_source
- .as_ref()
- .expect("No external source to close.")
- .close(cx, handle);
+ /// Call into the controller's `Close` method.
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
+ pub fn controller_close_native(&self) {
+ match self.controller {
+ ControllerType::Default(ref controller) => {
+ let _ = controller
+ .get()
+ .expect("Stream should have controller.")
+ .Close();
+ },
+ ControllerType::Byte(_) => {
+ unreachable!("Native closing is only done on default controllers.")
+ },
+ }
}
- /// Does the stream have all data in memory?
+ /// Returns a boolean reflecting whether the stream has all data in memory.
+ /// Useful for native source integration only.
pub fn in_memory(&self) -> bool {
- self.external_underlying_source
- .as_ref()
- .map(|source| source.in_memory())
- .unwrap_or(false)
+ match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .in_memory(),
+ ControllerType::Byte(_) => unreachable!(
+ "Checking if source is in memory for a stream with a non-default controller"
+ ),
+ }
}
/// Return bytes for synchronous use, if the stream has all data in memory.
+ /// Useful for native source integration only.
pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
- self.external_underlying_source
- .as_ref()
- .and_then(|source| source.get_in_memory_bytes())
+ match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .get_in_memory_bytes(),
+ ControllerType::Byte(_) => {
+ unreachable!("Getting in-memory bytes for a stream with a non-default controller")
+ },
+ }
}
/// Acquires a reader and locks the stream,
/// must be done before `read_a_chunk`.
- #[allow(unsafe_code)]
- pub fn start_reading(&self) -> Result<(), ()> {
- if self.is_locked() || self.is_disturbed() {
- return Err(());
- }
+ /// Native call to
+ /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
+ pub fn acquire_default_reader(
+ &self,
+ can_gc: CanGc,
+ ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
+ // Let reader be a new ReadableStreamDefaultReader.
+ let reader = reflect_dom_object(
+ Box::new(ReadableStreamDefaultReader::new_inherited(
+ &self.global(),
+ can_gc,
+ )),
+ &*self.global(),
+ can_gc,
+ );
- let global = self.global();
- let _ar = enter_realm(&*global);
- let cx = GlobalScope::get_cx();
+ // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
+ reader.set_up(self, &self.global(), can_gc)?;
- unsafe {
- rooted!(in(*cx) let reader = ReadableStreamGetReader(
- *cx,
- self.js_stream.handle(),
- ReadableStreamReaderMode::Default,
- ));
+ // Return reader.
+ Ok(reader)
+ }
- // Note: the stream is locked to the reader.
- self.js_reader.set(reader.get());
+ pub fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
+ match self.controller {
+ ControllerType::Default(ref controller) => {
+ controller.get().expect("Stream should have controller.")
+ },
+ ControllerType::Byte(_) => unreachable!(
+ "Getting default controller for a stream with a non-default controller"
+ ),
}
-
- self.has_reader.set(true);
- Ok(())
}
/// Read a chunk from the stream,
/// must be called after `start_reading`,
/// and before `stop_reading`.
- #[allow(unsafe_code)]
- pub fn read_a_chunk(&self) -> Rc<Promise> {
- if !self.has_reader.get() {
- panic!("Attempt to read stream chunk without having acquired a reader.");
- }
-
- let global = self.global();
- let _ar = enter_realm(&*global);
- let _aes = AutoEntryScript::new(&global);
-
- let cx = GlobalScope::get_cx();
-
- unsafe {
- rooted!(in(*cx) let promise_obj = ReadableStreamDefaultReaderRead(
- *cx,
- self.js_reader.handle(),
- ));
- Promise::new_with_js_promise(promise_obj.handle(), cx)
+ /// Native call to
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
+ pub fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ let Some(reader) = reader.get() else {
+ unreachable!(
+ "Attempt to read stream chunk without having first acquired a reader."
+ );
+ };
+ reader.Read(can_gc)
+ },
+ ReaderType::BYOB(_) => {
+ unreachable!("Native reading of a chunk can only be done with a default reader.")
+ },
}
}
/// Releases the lock on the reader,
/// must be done after `start_reading`.
- #[allow(unsafe_code)]
+ /// Native call to
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
pub fn stop_reading(&self) {
- if !self.has_reader.get() {
- panic!("ReadableStream::stop_reading called on a readerless stream.");
- }
-
- self.has_reader.set(false);
-
- let global = self.global();
- let _ar = enter_realm(&*global);
- let cx = GlobalScope::get_cx();
-
- unsafe {
- ReadableStreamReaderReleaseLock(*cx, self.js_reader.handle());
- // Note: is this the way to nullify the Heap?
- self.js_reader.set(ptr::null_mut());
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ let Some(reader) = reader.get() else {
+ unreachable!("Attempt to stop reading without having first acquired a reader.");
+ };
+ reader.release();
+ },
+ ReaderType::BYOB(_) => {
+ unreachable!("Native stop reading can only be done with a default reader.")
+ },
}
}
- #[allow(unsafe_code)]
+ /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
pub fn is_locked(&self) -> bool {
- // If we natively took a reader, we're locked.
- if self.has_reader.get() {
- return true;
+ match self.reader {
+ ReaderType::Default(ref reader) => reader.get().is_some(),
+ ReaderType::BYOB(ref reader) => reader.get().is_some(),
}
-
- // Otherwise, still double-check that script didn't lock the stream.
- let cx = GlobalScope::get_cx();
- let mut locked_or_disturbed = false;
-
- unsafe {
- ReadableStreamIsLocked(*cx, self.js_stream.handle(), &mut locked_or_disturbed);
- }
-
- locked_or_disturbed
}
- #[allow(unsafe_code)]
pub fn is_disturbed(&self) -> bool {
- // Check that script didn't disturb the stream.
- let cx = GlobalScope::get_cx();
- let mut locked_or_disturbed = false;
-
- unsafe {
- ReadableStreamIsDisturbed(*cx, self.js_stream.handle(), &mut locked_or_disturbed);
- }
-
- locked_or_disturbed
+ self.disturbed.get()
}
-}
-#[allow(unsafe_code)]
-unsafe extern "C" fn request_data(
- source: *const c_void,
- cx: *mut JSContext,
- stream: HandleObject,
- desired_size: usize,
-) {
- let source = &*(source as *const ExternalUnderlyingSourceController);
- source.pull(
- SafeJSContext::from_ptr(cx),
- stream,
- desired_size,
- CanGc::note(),
- );
-}
-
-#[allow(unsafe_code)]
-unsafe extern "C" fn write_into_read_request_buffer(
- source: *const c_void,
- _cx: *mut JSContext,
- _stream: HandleObject,
- chunk: HandleObject,
- length: usize,
- bytes_written: *mut usize,
-) {
- let source = &*(source as *const ExternalUnderlyingSourceController);
- let mut is_shared_memory = false;
- let buffer = JS_GetArrayBufferViewData(
- *chunk,
- &mut is_shared_memory,
- &AutoRequireNoGC { _address: 0 },
- );
- assert!(!is_shared_memory);
- let slice = slice::from_raw_parts_mut(buffer as *mut u8, length);
- source.write_into_buffer(slice);
-
- // Currently we're always able to completely fulfill the write request.
- *bytes_written = length;
-}
-
-#[allow(unsafe_code)]
-unsafe extern "C" fn cancel(
- _source: *const c_void,
- _cx: *mut JSContext,
- _stream: HandleObject,
- _reason: HandleValue,
- _resolve_to: *mut JSVal,
-) {
-}
-
-#[allow(unsafe_code)]
-unsafe extern "C" fn close(_source: *const c_void, _cx: *mut JSContext, _stream: HandleObject) {}
+ pub fn set_is_disturbed(&self, disturbed: bool) {
+ self.disturbed.set(disturbed);
+ }
-#[allow(unsafe_code)]
-unsafe extern "C" fn error(
- _source: *const c_void,
- _cx: *mut JSContext,
- _stream: HandleObject,
- _reason: HandleValue,
-) {
-}
+ pub fn is_closed(&self) -> bool {
+ self.state.get() == ReadableStreamState::Closed
+ }
-#[allow(unsafe_code)]
-unsafe extern "C" fn finalize(source: *mut ReadableStreamUnderlyingSource) {
- DeleteReadableStreamUnderlyingSource(source);
-}
+ pub fn is_errored(&self) -> bool {
+ self.state.get() == ReadableStreamState::Errored
+ }
-pub enum ExternalUnderlyingSource {
- /// Facilitate partial integration with sources
- /// that are currently read into memory.
- Memory(usize),
- /// A blob as underlying source, with a known total size.
- Blob(usize),
- /// A fetch response as underlying source.
- FetchResponse,
-}
+ pub fn is_readable(&self) -> bool {
+ self.state.get() == ReadableStreamState::Readable
+ }
-#[derive(JSTraceable, MallocSizeOf)]
-struct ExternalUnderlyingSourceController {
- /// Loosely matches the underlying queue,
- /// <https://streams.spec.whatwg.org/#internal-queues>
- buffer: RefCell<Vec<u8>>,
- /// Has the stream been closed by native code?
- closed: Cell<bool>,
- /// Does this stream contains all it's data in memory?
- in_memory: Cell<bool>,
-}
+ pub fn has_default_reader(&self) -> bool {
+ match self.reader {
+ ReaderType::Default(ref reader) => reader.get().is_some(),
+ ReaderType::BYOB(_) => false,
+ }
+ }
-impl ExternalUnderlyingSourceController {
- fn new(source: ExternalUnderlyingSource) -> ExternalUnderlyingSourceController {
- let (buffer, in_mem) = match source {
- ExternalUnderlyingSource::Blob(size) => (Vec::with_capacity(size), false),
- ExternalUnderlyingSource::Memory(size) => (Vec::with_capacity(size), true),
- ExternalUnderlyingSource::FetchResponse => (vec![], false),
- };
- ExternalUnderlyingSourceController {
- buffer: RefCell::new(buffer),
- closed: Cell::new(false),
- in_memory: Cell::new(in_mem),
+ /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
+ pub fn get_num_read_requests(&self) -> usize {
+ assert!(self.has_default_reader());
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ let reader = reader
+ .get()
+ .expect("Stream must have a reader when get num read requests is called into.");
+ reader.get_num_read_requests()
+ },
+ ReaderType::BYOB(_) => unreachable!(
+ "Stream must have a default reader when get num read requests is called into."
+ ),
}
}
- /// Does the stream have all data in memory?
- pub fn in_memory(&self) -> bool {
- self.in_memory.get()
+ /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
+ #[allow(crown::unrooted_must_root)]
+ pub fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool) {
+ // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
+ assert!(self.has_default_reader());
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ // step 2 - Let reader be stream.[[reader]].
+ let reader = reader
+ .get()
+ .expect("Stream must have a reader when a read request is fulfilled.");
+ // step 3 - Assert: reader.[[readRequests]] is not empty.
+ assert_ne!(reader.get_num_read_requests(), 0);
+ // step 4 & 5
+ // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
+ let request = reader.remove_read_request();
+
+ if done {
+ // step 6 - If done is true, perform readRequest’s close steps.
+ request.close_steps();
+ } else {
+ // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
+ let result = RootedTraceableBox::new(Heap::default());
+ result.set(*chunk);
+ request.chunk_steps(result);
+ }
+ },
+ ReaderType::BYOB(_) => unreachable!(
+ "Stream must have a default reader when fulfill read requests is called into."
+ ),
+ }
}
- /// Return bytes synchronously if the stream has all data in memory.
- pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
- if self.in_memory.get() {
- return Some(self.buffer.borrow().clone());
+ /// <https://streams.spec.whatwg.org/#readable-stream-close>
+ pub fn close(&self) {
+ // Assert: stream.[[state]] is "readable".
+ assert!(self.is_readable());
+ // Set stream.[[state]] to "closed".
+ self.state.set(ReadableStreamState::Closed);
+ // Let reader be stream.[[reader]].
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ let Some(reader) = reader.get() else {
+ // If reader is undefined, return.
+ return;
+ };
+ // step 5 & 6
+ reader.close();
+ },
+ ReaderType::BYOB(ref _reader) => todo!(),
}
- None
}
- /// Signal available bytes if the stream is currently readable.
- /// The apparently unused CanGc argument represents that the JS API calls like
- /// `ReadableStreamUpdateDataAvailableFromSource` can trigger a GC.
+ /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
#[allow(unsafe_code)]
- fn maybe_signal_available_bytes(
- &self,
- cx: SafeJSContext,
- stream: HandleObject,
- available: usize,
- _can_gc: CanGc,
- ) {
- if available == 0 {
- return;
+ pub fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ // Set stream.[[disturbed]] to true.
+ self.disturbed.set(true);
+
+ // If stream.[[state]] is "closed", return a promise resolved with undefined.
+ if self.is_closed() {
+ let promise = Promise::new(&self.reflector_.global(), can_gc);
+ promise.resolve_native(&());
+ return promise;
}
- unsafe {
- let mut readable = false;
- if !ReadableStreamIsReadable(*cx, stream, &mut readable) {
- return;
- }
- if readable {
- ReadableStreamUpdateDataAvailableFromSource(*cx, stream, available as u32);
+ // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
+ if self.is_errored() {
+ let promise = Promise::new(&self.reflector_.global(), can_gc);
+ unsafe {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ self.stored_error.to_jsval(*cx, rval.handle_mut());
+ promise.reject_native(&rval.handle());
+ return promise;
}
}
+ // Perform ! ReadableStreamClose(stream).
+ self.close();
+ // step 5, 6, 7, 8
+ // TODO: run the bytes reader steps.
+
+ // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
+ let source_cancel_promise = match self.controller {
+ ControllerType::Default(ref controller) => controller
+ .get()
+ .expect("Stream should have controller.")
+ .perform_cancel_steps(reason, can_gc),
+ ControllerType::Byte(_) => {
+ todo!()
+ },
+ };
+
+ // Create a new promise,
+ // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
+ let global = self.reflector_.global();
+ let result_promise = Promise::new(&global, can_gc);
+ let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
+ result: result_promise.clone(),
+ });
+ let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
+ result: result_promise.clone(),
+ });
+ let handler =
+ PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));
+ let realm = enter_realm(&*global);
+ let comp = InRealm::Entered(&realm);
+ source_cancel_promise.append_native_handler(&handler, comp, can_gc);
+
+ // Return the result of reacting to sourceCancelPromise
+ // with a fulfillment step that returns undefined.
+ result_promise
}
- /// Close a currently readable js stream.
- #[allow(unsafe_code)]
- fn maybe_close_js_stream(&self, cx: SafeJSContext, stream: HandleObject) {
- unsafe {
- let mut readable = false;
- if !ReadableStreamIsReadable(*cx, stream, &mut readable) {
- return;
- }
- if readable {
- ReadableStreamClose(*cx, stream);
- }
+ pub fn set_reader(&self, new_reader: Option<&ReadableStreamDefaultReader>) {
+ match self.reader {
+ ReaderType::Default(ref reader) => {
+ reader.set(new_reader);
+ },
+ ReaderType::BYOB(_) => {
+ unreachable!("Setting a reader can only be done on a default reader.")
+ },
}
}
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
+ #[allow(crown::unrooted_must_root)]
+ fn default_tee(
+ &self,
+ clone_for_branch_2: bool,
+ can_gc: CanGc,
+ ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
+ // Assert: stream implements ReadableStream.
+
+ // Assert: cloneForBranch2 is a boolean.
+ let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
+
+ // Let reader be ? AcquireReadableStreamDefaultReader(stream).
+ let reader = self.acquire_default_reader(can_gc)?;
+ self.set_reader(Some(&reader));
+
+ // Let reading be false.
+ let reading = Rc::new(Cell::new(false));
+ // Let readAgain be false.
+ let read_again = Rc::new(Cell::new(false));
+ // Let canceled1 be false.
+ let canceled_1 = Rc::new(Cell::new(false));
+ // Let canceled2 be false.
+ let canceled_2 = Rc::new(Cell::new(false));
+
+ // Let reason1 be undefined.
+ let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
+ // Let reason2 be undefined.
+ let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
+ // Let cancelPromise be a new promise.
+ let cancel_promise = Promise::new(&self.reflector_.global(), can_gc);
+
+ let tee_source_1 = DefaultTeeUnderlyingSource::new(
+ &reader,
+ self,
+ reading.clone(),
+ read_again.clone(),
+ canceled_1.clone(),
+ canceled_2.clone(),
+ clone_for_branch_2.clone(),
+ reason_1.clone(),
+ reason_2.clone(),
+ cancel_promise.clone(),
+ TeeCancelAlgorithm::Cancel1Algorithm,
+ can_gc,
+ );
+
+ let underlying_source_type_branch_1 =
+ UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
+
+ let tee_source_2 = DefaultTeeUnderlyingSource::new(
+ &reader,
+ self,
+ reading,
+ read_again,
+ canceled_1.clone(),
+ canceled_2.clone(),
+ clone_for_branch_2,
+ reason_1,
+ reason_2,
+ cancel_promise.clone(),
+ TeeCancelAlgorithm::Cancel2Algorithm,
+ can_gc,
+ );
+
+ let underlying_source_type_branch_2 =
+ UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
+
+ // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
+ let branch_1 = create_readable_stream(
+ &self.reflector_.global(),
+ underlying_source_type_branch_1,
+ QueuingStrategy::empty(),
+ can_gc,
+ );
+ tee_source_1.set_branch_1(&branch_1);
+ tee_source_2.set_branch_1(&branch_1);
+
+ // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
+ let branch_2 = create_readable_stream(
+ &self.reflector_.global(),
+ underlying_source_type_branch_2,
+ QueuingStrategy::empty(),
+ can_gc,
+ );
+ tee_source_1.set_branch_2(&branch_2);
+ tee_source_2.set_branch_2(&branch_2);
+
+ // Upon rejection of reader.[[closedPromise]] with reason r,
+ reader.append_native_handler_to_closed_promise(
+ &branch_1,
+ &branch_2,
+ canceled_1,
+ canceled_2,
+ cancel_promise,
+ can_gc,
+ );
- fn close(&self, cx: SafeJSContext, stream: HandleObject) {
- self.closed.set(true);
- self.maybe_close_js_stream(cx, stream);
+ // Return « branch_1, branch_2 ».
+ Ok(vec![branch_1, branch_2])
}
- fn enqueue_chunk(
+ /// <https://streams.spec.whatwg.org/#readable-stream-tee>
+ fn tee(
&self,
+ clone_for_branch_2: bool,
+ can_gc: CanGc,
+ ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
+ // Assert: stream implements ReadableStream.
+ // Assert: cloneForBranch2 is a boolean.
+
+ match self.controller {
+ ControllerType::Default(ref _controller) => {
+ // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
+ self.default_tee(clone_for_branch_2, can_gc)
+ },
+ ControllerType::Byte(ref _controller) => {
+ // If stream.[[controller]] implements ReadableByteStreamController,
+ // return ? ReadableByteStreamTee(stream).
+ todo!()
+ },
+ }
+ }
+}
+
+impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
+ /// <https://streams.spec.whatwg.org/#rs-constructor>
+ fn Constructor(
cx: SafeJSContext,
- stream: HandleObject,
- mut chunk: Vec<u8>,
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
can_gc: CanGc,
- ) {
- let available = {
- let mut buffer = self.buffer.borrow_mut();
- buffer.append(&mut chunk);
- buffer.len()
+ underlying_source: Option<*mut JSObject>,
+ strategy: &QueuingStrategy,
+ ) -> Fallible<DomRoot<Self>> {
+ // If underlyingSource is missing, set it to null.
+ rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
+ // Let underlyingSourceDict be underlyingSource,
+ // converted to an IDL value of type UnderlyingSource.
+ let underlying_source_dict = if !underlying_source_obj.is_null() {
+ rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
+ match JsUnderlyingSource::new(cx, obj_val.handle()) {
+ Ok(ConversionResult::Success(val)) => val,
+ Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
+ _ => {
+ return Err(Error::JSFailed);
+ },
+ }
+ } else {
+ JsUnderlyingSource::empty()
};
- self.maybe_signal_available_bytes(cx, stream, available, can_gc);
- }
- #[allow(unsafe_code)]
- fn pull(&self, cx: SafeJSContext, stream: HandleObject, _desired_size: usize, can_gc: CanGc) {
- // Note: for pull sources,
- // this would be the time to ask for a chunk.
+ // Perform ! InitializeReadableStream(this).
+ let stream = if underlying_source_dict.type_.is_some() {
+ ReadableStream::new_with_proto(
+ global,
+ proto,
+ ControllerType::Byte(MutNullableDom::new(None)),
+ can_gc,
+ )
+ } else {
+ ReadableStream::new_with_proto(
+ global,
+ proto,
+ ControllerType::Default(MutNullableDom::new(None)),
+ can_gc,
+ )
+ };
- if self.closed.get() {
- return self.maybe_close_js_stream(cx, stream);
- }
+ if underlying_source_dict.type_.is_some() {
+ // TODO: If underlyingSourceDict["type"] is "bytes"
+ return Err(Error::Type("Bytes streams not implemented".to_string()));
+ } else {
+ // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
+ let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
+
+ // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
+ let size_algorithm = extract_size_algorithm(strategy);
+
+ let controller = ReadableStreamDefaultController::new(
+ global,
+ UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
+ high_water_mark,
+ size_algorithm,
+ can_gc,
+ );
+
+ // Note: this must be done before `setup`,
+ // otherwise `thisOb` is null in the start callback.
+ controller.set_underlying_source_this_object(underlying_source_obj.handle());
- let available = {
- let buffer = self.buffer.borrow();
- buffer.len()
+ // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
+ controller.setup(stream.clone(), can_gc)?;
};
- self.maybe_signal_available_bytes(cx, stream, available, can_gc);
+ Ok(stream)
}
- fn get_chunk_with_length(&self, length: usize) -> Vec<u8> {
- let mut buffer = self.buffer.borrow_mut();
- let buffer_len = buffer.len();
- assert!(buffer_len >= length);
- buffer.split_off(buffer_len - length)
+ /// <https://streams.spec.whatwg.org/#rs-locked>
+ fn Locked(&self) -> bool {
+ self.is_locked()
}
- fn write_into_buffer(&self, dest: &mut [u8]) {
- let length = dest.len();
- let chunk = self.get_chunk_with_length(length);
- dest.copy_from_slice(chunk.as_slice());
+ /// <https://streams.spec.whatwg.org/#rs-cancel>
+ fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ if self.is_locked() {
+ // If ! IsReadableStreamLocked(this) is true,
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&self.reflector_.global(), can_gc);
+ promise.reject_error(Error::Type("stream is not locked".to_owned()));
+ promise
+ } else {
+ // Return ! ReadableStreamCancel(this, reason).
+ self.cancel(reason, can_gc)
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-get-reader>
+ fn GetReader(
+ &self,
+ options: &ReadableStreamGetReaderOptions,
+ can_gc: CanGc,
+ ) -> Fallible<ReadableStreamReader> {
+ // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
+ if options.mode.is_none() {
+ return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
+ self.acquire_default_reader(can_gc)?,
+ ));
+ }
+ // 2. Assert: options["mode"] is "byob".
+ assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
+
+ // 3. Return ? AcquireReadableStreamBYOBReader(this).
+ Err(Error::Type(
+ "AcquireReadableStreamBYOBReader is not implemented".to_owned(),
+ ))
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-tee>
+ fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
+ // Return ? ReadableStreamTee(this, false).
+ self.tee(false, can_gc)
}
}
#[allow(unsafe_code)]
/// Get the `done` property of an object that a read promise resolved to.
pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<bool, Error> {
+ if !v.is_object() {
+ return Err(Error::Type("Unknown format for done property.".to_string()));
+ }
unsafe {
rooted!(in(*cx) let object = v.to_object());
rooted!(in(*cx) let mut done = UndefinedValue());
@@ -547,6 +944,11 @@ pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<b
#[allow(unsafe_code)]
/// Get the `value` property of an object that a read promise resolved to.
pub fn get_read_promise_bytes(cx: SafeJSContext, v: &SafeHandleValue) -> Result<Vec<u8>, Error> {
+ if !v.is_object() {
+ return Err(Error::Type(
+ "Unknown format for for bytes read.".to_string(),
+ ));
+ }
unsafe {
rooted!(in(*cx) let object = v.to_object());
rooted!(in(*cx) let mut bytes = UndefinedValue());
diff --git a/components/script/dom/readablestreambyobreader.rs b/components/script/dom/readablestreambyobreader.rs
new file mode 100644
index 00000000000..556fe15ea54
--- /dev/null
+++ b/components/script/dom/readablestreambyobreader.rs
@@ -0,0 +1,81 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#![allow(dead_code)]
+
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::gc::CustomAutoRooterGuard;
+use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
+use js::typedarray::ArrayBufferView;
+
+use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
+use crate::dom::bindings::error::Error;
+use crate::dom::bindings::import::module::Fallible;
+use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
+use crate::dom::bindings::root::DomRoot;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::readablestream::ReadableStream;
+use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+
+/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
+#[dom_struct]
+pub struct ReadableStreamBYOBReader {
+ reflector_: Reflector,
+}
+
+impl ReadableStreamBYOBReader {
+ fn new_inherited() -> ReadableStreamBYOBReader {
+ ReadableStreamBYOBReader {
+ reflector_: Reflector::new(),
+ }
+ }
+
+ fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
+ reflect_dom_object(
+ Box::new(ReadableStreamBYOBReader::new_inherited()),
+ global,
+ can_gc,
+ )
+ }
+}
+
+impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
+ /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
+ fn Constructor(
+ _global: &GlobalScope,
+ _proto: Option<SafeHandleObject>,
+ _can_gc: CanGc,
+ _stream: &ReadableStream,
+ ) -> Fallible<DomRoot<Self>> {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#byob-reader-read>
+ fn Read(&self, _view: CustomAutoRooterGuard<ArrayBufferView>, can_gc: CanGc) -> Rc<Promise> {
+ // TODO
+ Promise::new(&self.reflector_.global(), can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
+ fn ReleaseLock(&self) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#generic-reader-closed>
+ fn Closed(&self, can_gc: CanGc) -> Rc<Promise> {
+ // TODO
+ Promise::new(&self.reflector_.global(), can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
+ fn Cancel(&self, _cx: SafeJSContext, _reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ // TODO
+ Promise::new(&self.reflector_.global(), can_gc)
+ }
+}
diff --git a/components/script/dom/readablestreambyobrequest.rs b/components/script/dom/readablestreambyobrequest.rs
new file mode 100644
index 00000000000..e2f7199d170
--- /dev/null
+++ b/components/script/dom/readablestreambyobrequest.rs
@@ -0,0 +1,39 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use dom_struct::dom_struct;
+
+use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBRequestBinding::ReadableStreamBYOBRequestMethods;
+use crate::dom::bindings::import::module::{Error, Fallible};
+use crate::dom::bindings::reflector::Reflector;
+use crate::script_runtime::JSContext as SafeJSContext;
+
+/// <https://streams.spec.whatwg.org/#readablestreambyobrequest>
+#[dom_struct]
+pub struct ReadableStreamBYOBRequest {
+ reflector_: Reflector,
+}
+
+impl ReadableStreamBYOBRequestMethods<crate::DomTypeHolder> for ReadableStreamBYOBRequest {
+ /// <https://streams.spec.whatwg.org/#rs-byob-request-view>
+ fn GetView(&self, _cx: SafeJSContext) -> Option<js::typedarray::ArrayBufferView> {
+ // TODO
+ None
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-byob-request-respond>
+ fn Respond(&self, _bytes_written: u64) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-byob-request-respond-with-new-view>
+ fn RespondWithNewView(
+ &self,
+ _view: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
+ ) -> Fallible<()> {
+ // TODO
+ Err(Error::NotFound)
+ }
+}
diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs
new file mode 100644
index 00000000000..1496031d998
--- /dev/null
+++ b/components/script/dom/readablestreamdefaultcontroller.rs
@@ -0,0 +1,878 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::ptr;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{Heap, JSObject};
+use js::jsval::{JSVal, UndefinedValue};
+use js::rust::wrappers::JS_GetPendingException;
+use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
+use js::typedarray::Uint8;
+
+use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
+use crate::dom::bindings::buffer_source::create_buffer_source;
+use crate::dom::bindings::callback::ExceptionHandling;
+use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
+use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
+use crate::dom::bindings::import::module::{throw_dom_exception, Error, Fallible};
+use crate::dom::bindings::refcounted::Trusted;
+use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
+use crate::dom::bindings::root::{DomRoot, MutNullableDom};
+use crate::dom::bindings::trace::RootedTraceableBox;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::readablestream::ReadableStream;
+use crate::dom::readablestreamdefaultreader::ReadRequest;
+use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
+use crate::js::conversions::ToJSValConvertible;
+use crate::realms::{enter_realm, InRealm};
+use crate::script_runtime::{CanGc, JSContext, JSContext as SafeJSContext};
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct PullAlgorithmFulfillmentHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for PullAlgorithmFulfillmentHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ /// Upon fulfillment of pullPromise
+ fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Set controller.[[pulling]] to false.
+ controller.pulling.set(false);
+
+ // If controller.[[pullAgain]] is true,
+ if controller.pull_again.get() {
+ // Set controller.[[pullAgain]] to false.
+ controller.pull_again.set(false);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ controller.call_pull_if_needed(can_gc);
+ }
+ }
+}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct PullAlgorithmRejectionHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for PullAlgorithmRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ /// Upon rejection of pullPromise with reason e.
+ fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, e).
+ controller.error(v);
+ }
+}
+
+/// The fulfillment handler for
+/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct StartAlgorithmFulfillmentHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmFulfillmentHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ /// Upon fulfillment of startPromise,
+ fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Set controller.[[started]] to true.
+ controller.started.set(true);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ controller.call_pull_if_needed(can_gc);
+ }
+}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[allow(crown::unrooted_must_root)]
+struct StartAlgorithmRejectionHandler {
+ #[ignore_malloc_size_of = "Trusted are hard"]
+ controller: Trusted<ReadableStreamDefaultController>,
+}
+
+impl Callback for StartAlgorithmRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ /// Upon rejection of startPromise with reason r,
+ fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let controller = self.controller.root();
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, r).
+ controller.error(v);
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#value-with-size>
+#[derive(JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub struct ValueWithSize {
+ value: Box<Heap<JSVal>>,
+ size: f64,
+}
+
+/// <https://streams.spec.whatwg.org/#value-with-size>
+#[derive(JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum EnqueuedValue {
+ /// A value enqueued from Rust.
+ Native(Box<[u8]>),
+ /// A Js value.
+ Js(ValueWithSize),
+}
+
+impl EnqueuedValue {
+ fn size(&self) -> f64 {
+ match self {
+ EnqueuedValue::Native(v) => v.len() as f64,
+ EnqueuedValue::Js(v) => v.size,
+ }
+ }
+
+ #[allow(unsafe_code)]
+ fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue) {
+ match self {
+ EnqueuedValue::Native(chunk) => {
+ rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
+ create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut())
+ .expect("failed to create buffer source for native chunk.");
+ unsafe { array_buffer_ptr.to_jsval(*cx, rval) };
+ },
+ EnqueuedValue::Js(value_with_size) => unsafe {
+ value_with_size.value.to_jsval(*cx, rval);
+ },
+ }
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#is-non-negative-number>
+fn is_non_negative_number(value: &EnqueuedValue) -> bool {
+ let value_with_size = match value {
+ EnqueuedValue::Native(_) => return true,
+ EnqueuedValue::Js(value_with_size) => value_with_size,
+ };
+
+ // If v is not a Number, return false.
+ // Checked as part of the WebIDL.
+
+ // If v is NaN, return false.
+ if value_with_size.size.is_nan() {
+ return false;
+ }
+
+ // If v < 0, return false.
+ if value_with_size.size.is_sign_negative() {
+ return false;
+ }
+
+ true
+}
+
+/// <https://streams.spec.whatwg.org/#queue-with-sizes>
+#[derive(Default, JSTraceable, MallocSizeOf)]
+#[crown::unrooted_must_root_lint::must_root]
+pub struct QueueWithSizes {
+ #[ignore_malloc_size_of = "EnqueuedValue::Js"]
+ queue: VecDeque<EnqueuedValue>,
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
+ total_size: f64,
+}
+
+impl QueueWithSizes {
+ /// <https://streams.spec.whatwg.org/#dequeue-value>
+ #[allow(crown::unrooted_must_root)]
+ fn dequeue_value(&mut self) -> EnqueuedValue {
+ let value = self
+ .queue
+ .pop_front()
+ .expect("Buffer cannot be empty when dequeue value is called into.");
+ self.total_size -= value.size();
+ value
+ }
+
+ /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
+ #[allow(crown::unrooted_must_root)]
+ fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
+ // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
+ if !is_non_negative_number(&value) {
+ return Err(Error::Range(
+ "The size of the enqueued chunk is not a non-negative number.".to_string(),
+ ));
+ }
+
+ // If size is +∞, throw a RangeError exception.
+ if value.size().is_infinite() {
+ return Err(Error::Range(
+ "The size of the enqueued chunk is infinite.".to_string(),
+ ));
+ }
+
+ self.total_size += value.size();
+ self.queue.push_back(value);
+
+ Ok(())
+ }
+
+ fn is_empty(&self) -> bool {
+ self.queue.is_empty()
+ }
+
+ /// Only used with native sources.
+ fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
+ self.queue
+ .iter()
+ .try_fold(Vec::new(), |mut acc, value| match value {
+ EnqueuedValue::Native(chunk) => {
+ acc.extend(chunk.iter().copied());
+ Some(acc)
+ },
+ _ => {
+ warn!("get_in_memory_bytes called on a controller with non-native source.");
+ None
+ },
+ })
+ }
+
+ /// <https://streams.spec.whatwg.org/#reset-queue>
+ fn reset(&mut self) {
+ self.queue.clear();
+ self.total_size = Default::default();
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
+#[dom_struct]
+pub struct ReadableStreamDefaultController {
+ reflector_: Reflector,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
+ queue: RefCell<QueueWithSizes>,
+
+ /// A mutable reference to the underlying source is used to implement these two
+ /// internal slots:
+ ///
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
+ underlying_source: MutNullableDom<UnderlyingSourceContainer>,
+
+ stream: MutNullableDom<ReadableStream>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
+ strategy_hwm: f64,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
+ #[ignore_malloc_size_of = "mozjs"]
+ strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
+ close_requested: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
+ started: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
+ pulling: Cell<bool>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
+ pull_again: Cell<bool>,
+}
+
+impl ReadableStreamDefaultController {
+ #[allow(crown::unrooted_must_root)]
+ fn new_inherited(
+ global: &GlobalScope,
+ underlying_source_type: UnderlyingSourceType,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ can_gc: CanGc,
+ ) -> ReadableStreamDefaultController {
+ ReadableStreamDefaultController {
+ reflector_: Reflector::new(),
+ queue: RefCell::new(Default::default()),
+ stream: MutNullableDom::new(None),
+ underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
+ global,
+ underlying_source_type,
+ can_gc,
+ ))),
+ strategy_hwm,
+ strategy_size: RefCell::new(Some(strategy_size)),
+ close_requested: Default::default(),
+ started: Default::default(),
+ pulling: Default::default(),
+ pull_again: Default::default(),
+ }
+ }
+
+ #[allow(crown::unrooted_must_root)]
+ pub fn new(
+ global: &GlobalScope,
+ underlying_source: UnderlyingSourceType,
+ strategy_hwm: f64,
+ strategy_size: Rc<QueuingStrategySize>,
+ can_gc: CanGc,
+ ) -> DomRoot<ReadableStreamDefaultController> {
+ reflect_dom_object(
+ Box::new(ReadableStreamDefaultController::new_inherited(
+ global,
+ underlying_source,
+ strategy_hwm,
+ strategy_size,
+ can_gc,
+ )),
+ global,
+ can_gc,
+ )
+ }
+
+ /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ #[allow(unsafe_code)]
+ pub fn setup(&self, stream: DomRoot<ReadableStream>, can_gc: CanGc) -> Result<(), Error> {
+ // Assert: stream.[[controller]] is undefined
+ stream.assert_no_controller();
+
+ // Set controller.[[stream]] to stream.
+ self.stream.set(Some(&stream));
+
+ let global = &*self.global();
+ let rooted_default_controller = DomRoot::from_ref(self);
+
+ // Perform ! ResetQueue(controller).
+ // Set controller.[[started]], controller.[[closeRequested]],
+ // controller.[[pullAgain]], and controller.[[pulling]] to false.
+ // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
+ // and controller.[[strategyHWM]] to highWaterMark.
+ // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
+ // and controller.[[strategyHWM]] to highWaterMark.
+ // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
+
+ // Note: the above steps are done in `new`.
+
+ // Set stream.[[controller]] to controller.
+ stream.set_default_controller(&rooted_default_controller);
+
+ if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
+ // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
+ let start_result = underlying_source
+ .call_start_algorithm(
+ Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
+ can_gc,
+ )
+ .unwrap_or_else(|| {
+ let promise = Promise::new(global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+
+ // Let startPromise be a promise resolved with startResult.
+ let start_promise = start_result?;
+
+ // Upon fulfillment of startPromise,
+ let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+
+ // Upon rejection of startPromise with reason r,
+ let rejection_handler = Box::new(StartAlgorithmRejectionHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let handler = PromiseNativeHandler::new(
+ global,
+ Some(fulfillment_handler),
+ Some(rejection_handler),
+ );
+ let realm = enter_realm(global);
+ let comp = InRealm::Entered(&realm);
+ start_promise.append_native_handler(&handler, comp, can_gc);
+ };
+
+ Ok(())
+ }
+
+ /// Setting the JS object after the heap has settled down.
+ pub fn set_underlying_source_this_object(&self, this_object: HandleObject) {
+ if let Some(underlying_source) = self.underlying_source.get() {
+ underlying_source.set_underlying_source_this_object(this_object);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#dequeue-value>
+ #[allow(crown::unrooted_must_root)]
+ fn dequeue_value(&self) -> EnqueuedValue {
+ let mut queue = self.queue.borrow_mut();
+ queue.dequeue_value()
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
+ fn should_call_pull(&self) -> bool {
+ // Let stream be controller.[[stream]].
+ // Note: the spec does not assert that stream is not undefined here,
+ // so we return false if it is.
+ let Some(stream) = self.stream.get() else {
+ debug!("`should_call_pull` called on a controller without a stream.");
+ return false;
+ };
+
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return false;
+ }
+
+ // If controller.[[started]] is false, return false.
+ if !self.started.get() {
+ return false;
+ }
+
+ // If ! IsReadableStreamLocked(stream) is true
+ // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ return true;
+ }
+
+ // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
+ // Assert: desiredSize is not null.
+ let desired_size = self.get_desired_size().expect("desiredSize is not null.");
+
+ if desired_size > 0. {
+ return true;
+ }
+
+ false
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ #[allow(unsafe_code)]
+ fn call_pull_if_needed(&self, can_gc: CanGc) {
+ if !self.should_call_pull() {
+ return;
+ }
+
+ // If controller.[[pulling]] is true,
+ if self.pulling.get() {
+ // Set controller.[[pullAgain]] to true.
+ self.pull_again.set(true);
+
+ return;
+ }
+
+ // Set controller.[[pulling]] to true.
+ self.pulling.set(true);
+
+ // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
+ // Continues into the resolve and reject handling of the native handler.
+ let global = self.global();
+ let rooted_default_controller = DomRoot::from_ref(self);
+ let controller =
+ Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
+
+ let Some(underlying_source) = self.underlying_source.get() else {
+ return;
+ };
+
+ let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let rejection_handler = Box::new(PullAlgorithmRejectionHandler {
+ controller: Trusted::new(&*rooted_default_controller),
+ });
+ let handler =
+ PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));
+
+ let realm = enter_realm(&*global);
+ let comp = InRealm::Entered(&realm);
+ let result = underlying_source
+ .call_pull_algorithm(controller, can_gc)
+ .unwrap_or_else(|| {
+ let promise = Promise::new(&global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+ let promise = result.unwrap_or_else(|error| {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ // TODO: check if `self.global()` is the right globalscope.
+ unsafe {
+ error
+ .clone()
+ .to_jsval(*cx, &self.global(), rval.handle_mut())
+ };
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_native(&rval.handle());
+ promise
+ });
+ promise.append_native_handler(&handler, comp, can_gc);
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
+ #[allow(unsafe_code)]
+ pub fn perform_cancel_steps(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ // Perform ! ResetQueue(this).
+ self.queue.borrow_mut().reset();
+
+ let underlying_source = self
+ .underlying_source
+ .get()
+ .expect("Controller should have a source when the cancel steps are called into.");
+ let global = self.global();
+
+ // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
+ let result = underlying_source
+ .call_cancel_algorithm(reason, can_gc)
+ .unwrap_or_else(|| {
+ let promise = Promise::new(&global, can_gc);
+ promise.resolve_native(&());
+ Ok(promise)
+ });
+ let promise = result.unwrap_or_else(|error| {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ // TODO: check if `self.global()` is the right globalscope.
+ unsafe {
+ error
+ .clone()
+ .to_jsval(*cx, &self.global(), rval.handle_mut())
+ };
+ let promise = Promise::new(&global, can_gc);
+ promise.reject_native(&rval.handle());
+ promise
+ });
+
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
+ self.clear_algorithms();
+
+ // Return result(the promise).
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
+ #[allow(crown::unrooted_must_root)]
+ pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
+ // Let stream be this.[[stream]].
+ // Note: the spec does not assert that there is a stream.
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // if queue contains bytes, perform chunk steps.
+ if !self.queue.borrow().is_empty() {
+ // Rooting: chunk will be tied to a rooted value
+ // before calling into a function that can GC.
+ let chunk = self.dequeue_value();
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ let result = RootedTraceableBox::new(Heap::default());
+ chunk.to_jsval(cx, rval.handle_mut());
+ result.set(*rval);
+
+ // If this.[[closeRequested]] is true and this.[[queue]] is empty
+ if self.close_requested.get() && self.queue.borrow().is_empty() {
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Perform ! ReadableStreamClose(stream).
+ stream.close();
+ } else {
+ // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
+ self.call_pull_if_needed(can_gc);
+ }
+ // Perform readRequest’s chunk steps, given chunk.
+ read_request.chunk_steps(result);
+ } else {
+ // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
+ stream.add_read_request(read_request);
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
+ self.call_pull_if_needed(can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
+ pub fn perform_release_steps(&self) {
+ // step 1 - Return.
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
+ #[allow(unsafe_code)]
+ pub fn enqueue(
+ &self,
+ cx: SafeJSContext,
+ chunk: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Result<(), Error> {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return Ok(());
+ }
+
+ let stream = self
+ .stream
+ .get()
+ .expect("Controller must have a stream when a chunk is enqueued.");
+
+ // If ! IsReadableStreamLocked(stream) is true
+ // and ! ReadableStreamGetNumReadRequests(stream) > 0,
+ // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ stream.fulfill_read_request(chunk, false);
+ } else {
+ // Otherwise,
+ // Let result be the result of performing controller.[[strategySizeAlgorithm]],
+ // passing in chunk, and interpreting the result as a completion record.
+ // Note: the clone is necessary to prevent potential re-borrow panics.
+ let strategy_size = {
+ let reference = self.strategy_size.borrow();
+ reference.clone()
+ };
+ let size = if let Some(strategy_size) = strategy_size {
+ // Note: the Rethrow exception handling is necessary,
+ // otherwise returning JSFailed will panic because no exception is pending.
+ let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow);
+ match result {
+ // Let chunkSize be result.[[Value]].
+ Ok(size) => size,
+ Err(error) => {
+ // If result is an abrupt completion,
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
+ self.error(rval.handle());
+
+ // Return result.
+ // Note: we need to return a type error, because no exception is pending.
+ return Err(error);
+ },
+ }
+ } else {
+ 0.
+ };
+
+ {
+ // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
+ let res = {
+ let mut queue = self.queue.borrow_mut();
+ queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
+ value: Heap::boxed(chunk.get()),
+ size,
+ }))
+ };
+ if let Err(error) = res {
+ // If enqueueResult is an abrupt completion,
+
+ // First, throw the exception.
+ // Note: this must be done manually here,
+ // because `enqueue_value_with_size` does not call into JS.
+ throw_dom_exception(cx, &self.global(), error);
+
+ // Then, get a handle to the JS val for the exception,
+ // and use that to error the stream.
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
+
+ // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
+ self.error(rval.handle());
+
+ // Return enqueueResult.
+ // Note: because we threw the exception above,
+ // there is a pending exception and we can return JSFailed.
+ return Err(Error::JSFailed);
+ }
+ }
+ }
+
+ // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
+ self.call_pull_if_needed(can_gc);
+
+ Ok(())
+ }
+
+ /// Native call to
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
+ #[allow(crown::unrooted_must_root)]
+ pub fn enqueue_native(&self, chunk: Vec<u8>) {
+ let stream = self
+ .stream
+ .get()
+ .expect("Controller must have a stream when a chunk is enqueued.");
+ if stream.is_locked() && stream.get_num_read_requests() > 0 {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ let enqueued_chunk = EnqueuedValue::Native(chunk.into_boxed_slice());
+ enqueued_chunk.to_jsval(cx, rval.handle_mut());
+ stream.fulfill_read_request(rval.handle(), false);
+ } else {
+ let mut queue = self.queue.borrow_mut();
+ queue
+ .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
+ .expect("Enqueuing a chunk from Rust should not fail.");
+ }
+ }
+
+ /// Does the stream have all data in memory?
+ pub fn in_memory(&self) -> bool {
+ let Some(underlying_source) = self.underlying_source.get() else {
+ return false;
+ };
+ underlying_source.in_memory()
+ }
+
+ /// Return bytes synchronously if the stream has all data in memory.
+ pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
+ let underlying_source = self.underlying_source.get()?;
+ if underlying_source.in_memory() {
+ return self.queue.borrow().get_in_memory_bytes();
+ }
+ None
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
+ fn clear_algorithms(&self) {
+ // Set controller.[[pullAlgorithm]] to undefined.
+ // Set controller.[[cancelAlgorithm]] to undefined.
+ self.underlying_source.set(None);
+
+ // Set controller.[[strategySizeAlgorithm]] to undefined.
+ *self.strategy_size.borrow_mut() = None;
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
+ pub fn close(&self) {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
+ if !self.can_close_or_enqueue() {
+ return;
+ }
+
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // Set controller.[[closeRequested]] to true.
+ self.close_requested.set(true);
+
+ if self.queue.borrow().is_empty() {
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ // Perform ! ReadableStreamClose(stream).
+ stream.close();
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
+ fn get_desired_size(&self) -> Option<f64> {
+ let stream = self.stream.get()?;
+
+ // If state is "errored", return null.
+ if stream.is_errored() {
+ return None;
+ }
+
+ // If state is "closed", return 0.
+ if stream.is_closed() {
+ return Some(0.0);
+ }
+
+ // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
+ let queue = self.queue.borrow();
+ let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
+ Some(desired_size.clamp(desired_size, self.strategy_hwm))
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
+ fn can_close_or_enqueue(&self) -> bool {
+ let Some(stream) = self.stream.get() else {
+ return false;
+ };
+
+ // If controller.[[closeRequested]] is false and state is "readable", return true.
+ if !self.close_requested.get() && stream.is_readable() {
+ return true;
+ }
+
+ // Otherwise, return false.
+ false
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
+ pub fn error(&self, e: SafeHandleValue) {
+ let Some(stream) = self.stream.get() else {
+ return;
+ };
+
+ // If stream.[[state]] is not "readable", return.
+ if !stream.is_readable() {
+ return;
+ }
+
+ // Perform ! ResetQueue(controller).
+ self.queue.borrow_mut().reset();
+
+ // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
+ self.clear_algorithms();
+
+ stream.error(e);
+ }
+}
+
+impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
+ for ReadableStreamDefaultController
+{
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
+ fn GetDesiredSize(&self) -> Option<f64> {
+ self.get_desired_size()
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
+ fn Close(&self) -> Fallible<()> {
+ if !self.can_close_or_enqueue() {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
+ // throw a TypeError exception.
+ return Err(Error::Type("Stream cannot be closed.".to_string()));
+ }
+
+ // Perform ! ReadableStreamDefaultControllerClose(this).
+ self.close();
+
+ Ok(())
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
+ fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
+ // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
+ if !self.can_close_or_enqueue() {
+ return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
+ }
+
+ // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
+ self.enqueue(cx, chunk, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
+ fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue) -> Fallible<()> {
+ self.error(e);
+ Ok(())
+ }
+}
diff --git a/components/script/dom/readablestreamdefaultreader.rs b/components/script/dom/readablestreamdefaultreader.rs
new file mode 100644
index 00000000000..0508c363ac9
--- /dev/null
+++ b/components/script/dom/readablestreamdefaultreader.rs
@@ -0,0 +1,530 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::mem;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::Heap;
+use js::jsval::{JSVal, UndefinedValue};
+use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
+
+use super::bindings::root::MutNullableDom;
+use super::types::ReadableStreamDefaultController;
+use crate::dom::bindings::cell::DomRefCell;
+use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
+ ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
+};
+use crate::dom::bindings::error::Error;
+use crate::dom::bindings::import::module::Fallible;
+use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomObject, Reflector};
+use crate::dom::bindings::root::{Dom, DomRoot};
+use crate::dom::bindings::trace::RootedTraceableBox;
+use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
+use crate::dom::readablestream::ReadableStream;
+use crate::realms::{enter_realm, InRealm};
+use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
+
+/// <https://streams.spec.whatwg.org/#read-request>
+#[derive(Clone, JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum ReadRequest {
+ /// <https://streams.spec.whatwg.org/#default-reader-read>
+ Read(Rc<Promise>),
+ /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
+ DefaultTee {
+ tee_read_request: Dom<DefaultTeeReadRequest>,
+ },
+}
+
+impl ReadRequest {
+ /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
+ pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
+ match self {
+ ReadRequest::Read(promise) => {
+ promise.resolve_native(&ReadableStreamReadResult {
+ done: Some(false),
+ value: chunk,
+ });
+ },
+ ReadRequest::DefaultTee { tee_read_request } => {
+ tee_read_request.enqueue_chunk_steps(chunk);
+ },
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#read-request-close-steps>
+ pub fn close_steps(&self) {
+ match self {
+ ReadRequest::Read(promise) => {
+ let result = RootedTraceableBox::new(Heap::default());
+ result.set(UndefinedValue());
+ promise.resolve_native(&ReadableStreamReadResult {
+ done: Some(true),
+ value: result,
+ });
+ },
+ ReadRequest::DefaultTee { tee_read_request } => {
+ tee_read_request.close_steps();
+ },
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#read-request-error-steps>
+ pub fn error_steps(&self, e: SafeHandleValue) {
+ match self {
+ ReadRequest::Read(promise) => promise.reject_native(&e),
+ ReadRequest::DefaultTee { tee_read_request } => {
+ tee_read_request.error_steps();
+ },
+ }
+ }
+}
+
+/// The rejection handler for
+/// <https://streams.spec.whatwg.org/#readable-stream-tee>
+#[derive(Clone, JSTraceable, MallocSizeOf)]
+#[crown::unrooted_must_root_lint::must_root]
+struct ClosedPromiseRejectionHandler {
+ branch_1_controller: Dom<ReadableStreamDefaultController>,
+ branch_2_controller: Dom<ReadableStreamDefaultController>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_1: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ canceled_2: Rc<Cell<bool>>,
+ #[ignore_malloc_size_of = "Rc"]
+ cancel_promise: Rc<Promise>,
+}
+
+impl Callback for ClosedPromiseRejectionHandler {
+ /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
+ /// Upon rejection of reader.[[closedPromise]] with reason r,
+ fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
+ let branch_1_controller = &self.branch_1_controller;
+ let branch_2_controller = &self.branch_2_controller;
+
+ // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
+ branch_1_controller.error(v);
+ // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
+ branch_2_controller.error(v);
+
+ // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
+ if !self.canceled_1.get() || !self.canceled_2.get() {
+ self.cancel_promise.resolve_native(&());
+ }
+ }
+}
+
+/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
+#[dom_struct]
+pub struct ReadableStreamDefaultReader {
+ reflector_: Reflector,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
+ stream: MutNullableDom<ReadableStream>,
+
+ #[ignore_malloc_size_of = "no VecDeque support"]
+ read_requests: DomRefCell<VecDeque<ReadRequest>>,
+
+ /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
+ #[ignore_malloc_size_of = "Rc is hard"]
+ closed_promise: DomRefCell<Rc<Promise>>,
+}
+
+impl ReadableStreamDefaultReader {
+ /// <https://streams.spec.whatwg.org/#default-reader-constructor>
+ #[allow(non_snake_case)]
+ pub fn Constructor(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ stream: &ReadableStream,
+ ) -> Fallible<DomRoot<Self>> {
+ let reader = Self::new_with_proto(global, proto, can_gc);
+
+ // Perform ? SetUpReadableStreamDefaultReader(this, stream).
+ Self::set_up(&reader, stream, global, can_gc)?;
+
+ Ok(reader)
+ }
+
+ fn new_with_proto(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ ) -> DomRoot<ReadableStreamDefaultReader> {
+ reflect_dom_object_with_proto(
+ Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
+ global,
+ proto,
+ can_gc,
+ )
+ }
+
+ pub fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
+ ReadableStreamDefaultReader {
+ reflector_: Reflector::new(),
+ stream: MutNullableDom::new(None),
+ read_requests: DomRefCell::new(Default::default()),
+ closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
+ pub fn set_up(
+ &self,
+ stream: &ReadableStream,
+ global: &GlobalScope,
+ can_gc: CanGc,
+ ) -> Fallible<()> {
+ // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
+ if stream.is_locked() {
+ return Err(Error::Type("stream is locked".to_owned()));
+ }
+ // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
+
+ self.generic_initialize(global, stream, can_gc)?;
+
+ // Set reader.[[readRequests]] to a new empty list.
+ self.read_requests.borrow_mut().clear();
+
+ Ok(())
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-initialize>
+ pub fn generic_initialize(
+ &self,
+ global: &GlobalScope,
+ stream: &ReadableStream,
+ can_gc: CanGc,
+ ) -> Fallible<()> {
+ // Set reader.[[stream]] to stream.
+ self.stream.set(Some(stream));
+
+ // Set stream.[[reader]] to reader.
+ stream.set_reader(Some(self));
+
+ if stream.is_readable() {
+ // If stream.[[state]] is "readable
+ // Set reader.[[closedPromise]] to a new promise.
+ *self.closed_promise.borrow_mut() = Promise::new(global, can_gc);
+ } else if stream.is_closed() {
+ // Otherwise, if stream.[[state]] is "closed",
+ // Set reader.[[closedPromise]] to a promise resolved with undefined.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut rval = UndefinedValue());
+ *self.closed_promise.borrow_mut() = Promise::new_resolved(global, cx, rval.handle())?;
+ } else {
+ // Assert: stream.[[state]] is "errored"
+ assert!(stream.is_errored());
+
+ // Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+ *self.closed_promise.borrow_mut() = Promise::new_rejected(global, cx, error.handle())?;
+
+ // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true
+ self.closed_promise.borrow().set_promise_is_handled();
+ }
+
+ Ok(())
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-close>
+ #[allow(crown::unrooted_must_root)]
+ pub fn close(&self) {
+ // Resolve reader.[[closedPromise]] with undefined.
+ self.closed_promise.borrow().resolve_native(&());
+ // If reader implements ReadableStreamDefaultReader,
+ // Let readRequests be reader.[[readRequests]].
+ let mut read_requests = self.take_read_requests();
+ // Set reader.[[readRequests]] to an empty list.
+ // For each readRequest of readRequests,
+ for request in read_requests.drain(0..) {
+ // Perform readRequest’s close steps.
+ request.close_steps();
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
+ pub fn add_read_request(&self, read_request: &ReadRequest) {
+ self.read_requests
+ .borrow_mut()
+ .push_back(read_request.clone());
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
+ pub fn get_num_read_requests(&self) -> usize {
+ self.read_requests.borrow().len()
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-error>
+ pub fn error(&self, e: SafeHandleValue) {
+ // Reject reader.[[closedPromise]] with e.
+ self.closed_promise.borrow().reject_native(&e);
+
+ // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
+ self.closed_promise.borrow().set_promise_is_handled();
+
+ // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
+ self.error_read_requests(e);
+ }
+
+ /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
+ #[allow(crown::unrooted_must_root)]
+ pub fn remove_read_request(&self) -> ReadRequest {
+ self.read_requests
+ .borrow_mut()
+ .pop_front()
+ .expect("Reader must have read request when remove is called into.")
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-release>
+ #[allow(unsafe_code)]
+ pub fn generic_release(&self) {
+ // Let stream be reader.[[stream]].
+
+ // Assert: stream is not undefined.
+ assert!(self.stream.get().is_some());
+
+ if let Some(stream) = self.stream.get() {
+ // Assert: stream.[[reader]] is reader.
+ assert!(stream.has_default_reader());
+
+ if stream.is_readable() {
+ // If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
+ self.closed_promise
+ .borrow()
+ .reject_error(Error::Type("stream state is not readable".to_owned()));
+ } else {
+ // Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ unsafe {
+ Error::Type("Cannot release lock due to stream state.".to_owned()).to_jsval(
+ *cx,
+ &self.global(),
+ error.handle_mut(),
+ )
+ };
+
+ *self.closed_promise.borrow_mut() =
+ Promise::new_rejected(&self.global(), cx, error.handle()).unwrap();
+ }
+ // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
+ self.closed_promise.borrow().set_promise_is_handled();
+
+ // Perform ! stream.[[controller]].[[ReleaseSteps]]().
+ stream.perform_release_steps();
+
+ // Set stream.[[reader]] to undefined.
+ stream.set_reader(None);
+ // Set reader.[[stream]] to undefined.
+ self.stream.set(None);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
+ #[allow(unsafe_code)]
+ pub fn release(&self) {
+ // Perform ! ReadableStreamReaderGenericRelease(reader).
+ self.generic_release();
+ // Let e be a new TypeError exception.
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ unsafe {
+ Error::Type("Reader is released".to_owned()).to_jsval(
+ *cx,
+ &self.global(),
+ error.handle_mut(),
+ )
+ };
+
+ // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
+ self.error_read_requests(error.handle());
+ }
+
+ #[allow(crown::unrooted_must_root)]
+ fn take_read_requests(&self) -> VecDeque<ReadRequest> {
+ mem::take(&mut *self.read_requests.borrow_mut())
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel>
+ fn generic_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ // Let stream be reader.[[stream]].
+ let stream = self.stream.get();
+
+ // Assert: stream is not undefined.
+ let stream =
+ stream.expect("Reader should have a stream when generic cancel is called into.");
+
+ // Return ! ReadableStreamCancel(stream, reason).
+ stream.cancel(reason, can_gc)
+ }
+
+ /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
+ #[allow(crown::unrooted_must_root)]
+ fn error_read_requests(&self, rval: SafeHandleValue) {
+ // step 1
+ let mut read_requests = self.take_read_requests();
+
+ // step 2 & 3
+ for request in read_requests.drain(0..) {
+ request.error_steps(rval);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
+ pub fn read(&self, read_request: &ReadRequest, can_gc: CanGc) {
+ // Let stream be reader.[[stream]].
+
+ // Assert: stream is not undefined.
+ assert!(self.stream.get().is_some());
+
+ let stream = self.stream.get().unwrap();
+
+ // Set stream.[[disturbed]] to true.
+ stream.set_is_disturbed(true);
+ // If stream.[[state]] is "closed", perform readRequest’s close steps.
+ if stream.is_closed() {
+ read_request.close_steps();
+ } else if stream.is_errored() {
+ // Otherwise, if stream.[[state]] is "errored",
+ // perform readRequest’s error steps given stream.[[storedError]].
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ stream.get_stored_error(error.handle_mut());
+ read_request.error_steps(error.handle());
+ } else {
+ // Otherwise
+ // Assert: stream.[[state]] is "readable".
+ assert!(stream.is_readable());
+ // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
+ stream.perform_pull_steps(read_request, can_gc);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
+ pub fn append_native_handler_to_closed_promise(
+ &self,
+ branch_1: &ReadableStream,
+ branch_2: &ReadableStream,
+ canceled_1: Rc<Cell<bool>>,
+ canceled_2: Rc<Cell<bool>>,
+ cancel_promise: Rc<Promise>,
+ can_gc: CanGc,
+ ) {
+ let branch_1_controller = branch_1.get_default_controller();
+
+ let branch_2_controller = branch_2.get_default_controller();
+
+ let global = self.global();
+ let handler = PromiseNativeHandler::new(
+ &global,
+ None,
+ Some(Box::new(ClosedPromiseRejectionHandler {
+ branch_1_controller: Dom::from_ref(&branch_1_controller),
+ branch_2_controller: Dom::from_ref(&branch_2_controller),
+ canceled_1,
+ canceled_2,
+ cancel_promise,
+ })),
+ );
+
+ let realm = enter_realm(&*global);
+ let comp = InRealm::Entered(&realm);
+
+ self.closed_promise
+ .borrow()
+ .append_native_handler(&handler, comp, can_gc);
+ }
+}
+
+impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
+ /// <https://streams.spec.whatwg.org/#default-reader-constructor>
+ fn Constructor(
+ global: &GlobalScope,
+ proto: Option<SafeHandleObject>,
+ can_gc: CanGc,
+ stream: &ReadableStream,
+ ) -> Fallible<DomRoot<Self>> {
+ ReadableStreamDefaultReader::Constructor(global, proto, can_gc, stream)
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-reader-read>
+ #[allow(unsafe_code)]
+ #[allow(crown::unrooted_must_root)]
+ fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
+ // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
+ if self.stream.get().is_none() {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut error = UndefinedValue());
+ unsafe {
+ Error::Type("stream is undefined".to_owned()).to_jsval(
+ *cx,
+ &self.global(),
+ error.handle_mut(),
+ )
+ };
+ return Promise::new_rejected(&self.global(), cx, error.handle()).unwrap();
+ }
+ // Let promise be a new promise.
+ let promise = Promise::new(&self.reflector_.global(), can_gc);
+
+ // Let readRequest be a new read request with the following items:
+ // chunk steps, given chunk
+ // Resolve promise with «[ "value" → chunk, "done" → false ]».
+ //
+ // close steps
+ // Resolve promise with «[ "value" → undefined, "done" → true ]».
+ //
+ // error steps, given e
+ // Reject promise with e.
+
+ // Rooting(unrooted_must_root): the read request contains only a promise,
+ // which does not need to be rooted,
+ // as it is safely managed natively via an Rc.
+ let read_request = ReadRequest::Read(promise.clone());
+
+ // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
+ self.read(&read_request, can_gc);
+
+ // Return promise.
+ promise
+ }
+
+ /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
+ fn ReleaseLock(&self) {
+ if self.stream.get().is_some() {
+ // step 2 - Perform ! ReadableStreamDefaultReaderRelease(this).
+ self.release();
+ }
+ // step 1 - If this.[[stream]] is undefined, return.
+ }
+
+ /// <https://streams.spec.whatwg.org/#generic-reader-closed>
+ fn Closed(&self) -> Rc<Promise> {
+ self.closed_promise.borrow().clone()
+ }
+
+ /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
+ fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
+ if self.stream.get().is_none() {
+ // If this.[[stream]] is undefined,
+ // return a promise rejected with a TypeError exception.
+ let promise = Promise::new(&self.reflector_.global(), can_gc);
+ promise.reject_error(Error::Type("stream is undefined".to_owned()));
+ promise
+ } else {
+ // Return ! ReadableStreamReaderGenericCancel(this, reason).
+ self.generic_cancel(reason, can_gc)
+ }
+ }
+}
diff --git a/components/script/dom/response.rs b/components/script/dom/response.rs
index 83c94e91874..49081868777 100644
--- a/components/script/dom/response.rs
+++ b/components/script/dom/response.rs
@@ -30,7 +30,8 @@ use crate::dom::bindings::str::{ByteString, USVString};
use crate::dom::globalscope::GlobalScope;
use crate::dom::headers::{is_obs_text, is_vchar, Guard, Headers};
use crate::dom::promise::Promise;
-use crate::dom::readablestream::{ExternalUnderlyingSource, ReadableStream};
+use crate::dom::readablestream::ReadableStream;
+use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext, StreamConsumer};
#[dom_struct]
@@ -53,10 +54,11 @@ pub struct Response {
#[allow(non_snake_case)]
impl Response {
- pub fn new_inherited(global: &GlobalScope) -> Response {
+ pub fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> Response {
let stream = ReadableStream::new_with_external_underlying_source(
global,
- ExternalUnderlyingSource::FetchResponse,
+ UnderlyingSourceType::FetchResponse,
+ can_gc,
);
Response {
reflector_: Reflector::new(),
@@ -82,7 +84,7 @@ impl Response {
can_gc: CanGc,
) -> DomRoot<Response> {
reflect_dom_object_with_proto(
- Box::new(Response::new_inherited(global)),
+ Box::new(Response::new_inherited(global, can_gc)),
global,
proto,
can_gc,
@@ -444,19 +446,19 @@ impl Response {
*self.stream_consumer.borrow_mut() = sc;
}
- pub fn stream_chunk(&self, chunk: Vec<u8>, can_gc: CanGc) {
+ pub fn stream_chunk(&self, chunk: Vec<u8>) {
// Note, are these two actually mutually exclusive?
if let Some(stream_consumer) = self.stream_consumer.borrow().as_ref() {
stream_consumer.consume_chunk(chunk.as_slice());
} else if let Some(body) = self.body_stream.get() {
- body.enqueue_native(chunk, can_gc);
+ body.enqueue_native(chunk);
}
}
#[allow(crown::unrooted_must_root)]
pub fn finish(&self) {
if let Some(body) = self.body_stream.get() {
- body.close_native();
+ body.controller_close_native();
}
let stream_consumer = self.stream_consumer.borrow_mut().take();
if let Some(stream_consumer) = stream_consumer {
diff --git a/components/script/dom/underlyingsourcecontainer.rs b/components/script/dom/underlyingsourcecontainer.rs
new file mode 100644
index 00000000000..06db74fb4bd
--- /dev/null
+++ b/components/script/dom/underlyingsourcecontainer.rs
@@ -0,0 +1,225 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::ptr;
+use std::rc::Rc;
+
+use dom_struct::dom_struct;
+use js::jsapi::{Heap, IsPromiseObject, JSObject};
+use js::jsval::JSVal;
+use js::rust::{Handle as SafeHandle, HandleObject, HandleValue as SafeHandleValue, IntoHandle};
+
+use crate::dom::bindings::callback::ExceptionHandling;
+use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
+use crate::dom::bindings::import::module::Error;
+use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
+use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomObject, Reflector};
+use crate::dom::bindings::root::{Dom, DomRoot};
+use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
+use crate::dom::globalscope::GlobalScope;
+use crate::dom::promise::Promise;
+use crate::script_runtime::CanGc;
+
+/// <https://streams.spec.whatwg.org/#underlying-source-api>
+/// The `Js` variant corresponds to
+/// the JavaScript object representing the underlying source.
+/// The other variants are native sources in Rust.
+#[derive(JSTraceable)]
+#[crown::unrooted_must_root_lint::must_root]
+pub enum UnderlyingSourceType {
+ /// Facilitate partial integration with sources
+ /// that are currently read into memory.
+ Memory(usize),
+ /// A blob as underlying source, with a known total size.
+ Blob(usize),
+ /// A fetch response as underlying source.
+ FetchResponse,
+ /// A struct representing a JS object as underlying source,
+ /// and the actual JS object for use as `thisArg` in callbacks.
+ Js(JsUnderlyingSource, Heap<*mut JSObject>),
+ /// Tee
+ Tee(Dom<DefaultTeeUnderlyingSource>),
+}
+
+impl UnderlyingSourceType {
+ /// Is the source backed by a Rust native source?
+ pub fn is_native(&self) -> bool {
+ matches!(
+ self,
+ UnderlyingSourceType::Memory(_) |
+ UnderlyingSourceType::Blob(_) |
+ UnderlyingSourceType::FetchResponse
+ )
+ }
+
+ /// Does the source have all data in memory?
+ pub fn in_memory(&self) -> bool {
+ matches!(self, UnderlyingSourceType::Memory(_))
+ }
+}
+
+/// Wrapper around the underlying source.
+#[dom_struct]
+pub struct UnderlyingSourceContainer {
+ reflector_: Reflector,
+ #[ignore_malloc_size_of = "JsUnderlyingSource implemented in SM."]
+ underlying_source_type: UnderlyingSourceType,
+}
+
+impl UnderlyingSourceContainer {
+ #[allow(crown::unrooted_must_root)]
+ fn new_inherited(underlying_source_type: UnderlyingSourceType) -> UnderlyingSourceContainer {
+ UnderlyingSourceContainer {
+ reflector_: Reflector::new(),
+ underlying_source_type,
+ }
+ }
+
+ #[allow(crown::unrooted_must_root)]
+ pub fn new(
+ global: &GlobalScope,
+ underlying_source_type: UnderlyingSourceType,
+ can_gc: CanGc,
+ ) -> DomRoot<UnderlyingSourceContainer> {
+ // TODO: setting the underlying source dict as the prototype of the
+ // `UnderlyingSourceContainer`, as it is later used as the "this" in Call_.
+ // Is this a good idea?
+ reflect_dom_object_with_proto(
+ Box::new(UnderlyingSourceContainer::new_inherited(
+ underlying_source_type,
+ )),
+ global,
+ None,
+ can_gc,
+ )
+ }
+
+ /// Setting the JS object after the heap has settled down.
+ pub fn set_underlying_source_this_object(&self, object: HandleObject) {
+ if let UnderlyingSourceType::Js(_source, this_obj) = &self.underlying_source_type {
+ this_obj.set(*object);
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#dom-underlyingsource-cancel>
+ #[allow(unsafe_code)]
+ pub fn call_cancel_algorithm(
+ &self,
+ reason: SafeHandleValue,
+ can_gc: CanGc,
+ ) -> Option<Result<Rc<Promise>, Error>> {
+ match &self.underlying_source_type {
+ UnderlyingSourceType::Js(source, this_obj) => {
+ if let Some(algo) = &source.cancel {
+ let result = unsafe {
+ algo.Call_(
+ &SafeHandle::from_raw(this_obj.handle()),
+ Some(reason),
+ ExceptionHandling::Rethrow,
+ )
+ };
+ return Some(result);
+ }
+ None
+ },
+ UnderlyingSourceType::Tee(tee_underlyin_source) => {
+ // Call the cancel algorithm for the appropriate branch.
+ tee_underlyin_source.cancel_algorithm(reason, can_gc)
+ },
+ _ => None,
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#dom-underlyingsource-pull>
+ #[allow(unsafe_code)]
+ pub fn call_pull_algorithm(
+ &self,
+ controller: Controller,
+ can_gc: CanGc,
+ ) -> Option<Result<Rc<Promise>, Error>> {
+ match &self.underlying_source_type {
+ UnderlyingSourceType::Js(source, this_obj) => {
+ if let Some(algo) = &source.pull {
+ let result = unsafe {
+ algo.Call_(
+ &SafeHandle::from_raw(this_obj.handle()),
+ controller,
+ ExceptionHandling::Rethrow,
+ )
+ };
+ return Some(result);
+ }
+ None
+ },
+ UnderlyingSourceType::Tee(tee_underlyin_source) => {
+ // Call the pull algorithm for the appropriate branch.
+ tee_underlyin_source.pull_algorithm(can_gc)
+ },
+ // Note: other source type have no pull steps for now.
+ _ => None,
+ }
+ }
+
+ /// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
+ ///
+ /// Note: The algorithm can return any value, including a promise,
+ /// we always transform the result into a promise for convenience,
+ /// and it is also how to spec deals with the situation.
+ /// see "Let startPromise be a promise resolved with startResult."
+ /// at <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
+ #[allow(unsafe_code)]
+ pub fn call_start_algorithm(
+ &self,
+ controller: Controller,
+ can_gc: CanGc,
+ ) -> Option<Result<Rc<Promise>, Error>> {
+ match &self.underlying_source_type {
+ UnderlyingSourceType::Js(source, this_obj) => {
+ if let Some(start) = &source.start {
+ let cx = GlobalScope::get_cx();
+ rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
+ rooted!(in(*cx) let mut result: JSVal);
+ unsafe {
+ if let Err(error) = start.Call_(
+ &SafeHandle::from_raw(this_obj.handle()),
+ controller,
+ result.handle_mut(),
+ ExceptionHandling::Rethrow,
+ ) {
+ return Some(Err(error));
+ }
+ }
+ let is_promise = unsafe {
+ if result.is_object() {
+ result_object.set(result.to_object());
+ IsPromiseObject(result_object.handle().into_handle())
+ } else {
+ false
+ }
+ };
+ let promise = if is_promise {
+ let promise = Promise::new_with_js_promise(result_object.handle(), cx);
+ promise
+ } else {
+ let promise = Promise::new(&self.global(), can_gc);
+ promise.resolve_native(&result.get());
+ promise
+ };
+ return Some(Ok(promise));
+ }
+ None
+ },
+ UnderlyingSourceType::Tee(_) => {
+ // Let startAlgorithm be an algorithm that returns undefined.
+ None
+ },
+ _ => None,
+ }
+ }
+
+ /// Does the source have all data in memory?
+ pub fn in_memory(&self) -> bool {
+ self.underlying_source_type.in_memory()
+ }
+}
diff --git a/components/script/dom/webidls/QueuingStrategy.webidl b/components/script/dom/webidls/QueuingStrategy.webidl
new file mode 100644
index 00000000000..66870289de9
--- /dev/null
+++ b/components/script/dom/webidls/QueuingStrategy.webidl
@@ -0,0 +1,34 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#qs
+
+dictionary QueuingStrategy {
+ unrestricted double highWaterMark;
+ QueuingStrategySize size;
+};
+
+callback QueuingStrategySize = unrestricted double (any chunk);
+
+dictionary QueuingStrategyInit {
+ required unrestricted double highWaterMark;
+};
+
+[Exposed=*]
+interface ByteLengthQueuingStrategy {
+ constructor(QueuingStrategyInit init);
+
+ readonly attribute unrestricted double highWaterMark;
+ [Throws]
+ readonly attribute Function size;
+};
+
+[Exposed=*]
+interface CountQueuingStrategy {
+ constructor(QueuingStrategyInit init);
+
+ readonly attribute unrestricted double highWaterMark;
+ [Throws]
+ readonly attribute Function size;
+};
diff --git a/components/script/dom/webidls/ReadableByteStreamController.webidl b/components/script/dom/webidls/ReadableByteStreamController.webidl
new file mode 100644
index 00000000000..2734bd1e4c2
--- /dev/null
+++ b/components/script/dom/webidls/ReadableByteStreamController.webidl
@@ -0,0 +1,19 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#rbs-controller-class-definition
+
+[Exposed=*]
+interface ReadableByteStreamController {
+ [Throws] // Throws on OOM
+ readonly attribute ReadableStreamBYOBRequest? byobRequest;
+ readonly attribute unrestricted double? desiredSize;
+
+ [Throws]
+ undefined close();
+ [Throws]
+ undefined enqueue(ArrayBufferView chunk);
+ [Throws]
+ undefined error(optional any e);
+};
diff --git a/components/script/dom/webidls/ReadableStream.webidl b/components/script/dom/webidls/ReadableStream.webidl
index d03212e3e9e..6378dc2ba87 100644
--- a/components/script/dom/webidls/ReadableStream.webidl
+++ b/components/script/dom/webidls/ReadableStream.webidl
@@ -2,10 +2,59 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
-// This interface is entirely internal to Servo, and should not be accessible to
-// web pages.
+// https://streams.spec.whatwg.org/#readablestream
-[LegacyNoInterfaceObject, Exposed=(Window,Worker)]
-// Need to escape "ReadableStream" so it's treated as an identifier.
+[Exposed=*] // [Transferable] - See Bug 1562065
interface _ReadableStream {
+ [Throws]
+ constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});
+
+ // [Throws]
+ // static ReadableStream from(any asyncIterable);
+
+ readonly attribute boolean locked;
+
+ [NewObject]
+ Promise<undefined> cancel(optional any reason);
+
+ [Throws]
+ ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
+
+ // [Throws]
+ // ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
+
+ // [NewObject]
+ // Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
+
+ [Throws]
+ sequence<ReadableStream> tee();
+
+ // [GenerateReturnMethod]
+ // async iterable<any>(optional ReadableStreamIteratorOptions options = {});
+};
+
+enum ReadableStreamType { "bytes" };
+
+enum ReadableStreamReaderMode { "byob" };
+
+dictionary ReadableStreamGetReaderOptions {
+ ReadableStreamReaderMode mode;
+};
+
+/*
+dictionary ReadableStreamIteratorOptions {
+ boolean preventCancel = false;
+};
+
+dictionary ReadableWritablePair {
+ required ReadableStream readable;
+ required WritableStream writable;
+};
+
+dictionary StreamPipeOptions {
+ boolean preventClose = false;
+ boolean preventAbort = false;
+ boolean preventCancel = false;
+ AbortSignal signal;
};
+*/
diff --git a/components/script/dom/webidls/ReadableStreamBYOBReader.webidl b/components/script/dom/webidls/ReadableStreamBYOBReader.webidl
new file mode 100644
index 00000000000..cae85b43350
--- /dev/null
+++ b/components/script/dom/webidls/ReadableStreamBYOBReader.webidl
@@ -0,0 +1,19 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#byob-reader-class-definition
+
+[Exposed=*]
+interface ReadableStreamBYOBReader {
+ [Throws]
+ constructor(ReadableStream stream);
+
+ [NewObject]
+ Promise<ReadableStreamReadResult> read(ArrayBufferView view);
+
+ [Throws]
+ undefined releaseLock();
+};
+ReadableStreamBYOBReader includes ReadableStreamGenericReader;
+
diff --git a/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl b/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl
new file mode 100644
index 00000000000..dad656c98e1
--- /dev/null
+++ b/components/script/dom/webidls/ReadableStreamBYOBRequest.webidl
@@ -0,0 +1,15 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#rs-byob-request-class-definition
+
+[Exposed=*]
+interface ReadableStreamBYOBRequest {
+ readonly attribute ArrayBufferView? view;
+
+ [Throws]
+ undefined respond([EnforceRange] unsigned long long bytesWritten);
+ [Throws]
+ undefined respondWithNewView(ArrayBufferView view);
+};
diff --git a/components/script/dom/webidls/ReadableStreamDefaultController.webidl b/components/script/dom/webidls/ReadableStreamDefaultController.webidl
new file mode 100644
index 00000000000..6e3e1546dc8
--- /dev/null
+++ b/components/script/dom/webidls/ReadableStreamDefaultController.webidl
@@ -0,0 +1,17 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#rs-default-controller-class-definition
+
+[Exposed=*]
+interface ReadableStreamDefaultController {
+ readonly attribute unrestricted double? desiredSize;
+
+ [Throws]
+ undefined close();
+ [Throws]
+ undefined enqueue(optional any chunk);
+ [Throws]
+ undefined error(optional any e);
+};
diff --git a/components/script/dom/webidls/ReadableStreamDefaultReader.webidl b/components/script/dom/webidls/ReadableStreamDefaultReader.webidl
new file mode 100644
index 00000000000..63593912c67
--- /dev/null
+++ b/components/script/dom/webidls/ReadableStreamDefaultReader.webidl
@@ -0,0 +1,47 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#generic-reader-mixin-definition
+// https://streams.spec.whatwg.org/#default-reader-class-definition
+
+typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;
+
+interface mixin ReadableStreamGenericReader {
+ readonly attribute Promise<undefined> closed;
+
+ [NewObject]
+ Promise<undefined> cancel(optional any reason);
+};
+
+[Exposed=*]
+interface ReadableStreamDefaultReader {
+ [Throws]
+ constructor(ReadableStream stream);
+
+ [NewObject]
+ Promise<ReadableStreamReadResult> read();
+
+ undefined releaseLock();
+};
+ReadableStreamDefaultReader includes ReadableStreamGenericReader;
+
+
+dictionary ReadableStreamReadResult {
+ any value;
+ boolean done;
+};
+
+// The DefaultTeeReadRequest interface is entirely internal to Servo, and should not be accessible to
+// web pages.
+[LegacyNoInterfaceObject, Exposed=(Window,Worker)]
+// Need to escape "DefaultTeeReadRequest" so it's treated as an identifier.
+interface _DefaultTeeReadRequest {
+};
+
+// The DefaultTeeUnderlyingSource interface is entirely internal to Servo, and should not be accessible to
+// web pages.
+[LegacyNoInterfaceObject, Exposed=(Window,Worker)]
+// Need to escape "DefaultTeeUnderlyingSource" so it's treated as an identifier.
+interface _DefaultTeeUnderlyingSource {
+};
diff --git a/components/script/dom/webidls/UnderlyingSource.webidl b/components/script/dom/webidls/UnderlyingSource.webidl
new file mode 100644
index 00000000000..5295129ee38
--- /dev/null
+++ b/components/script/dom/webidls/UnderlyingSource.webidl
@@ -0,0 +1,21 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// https://streams.spec.whatwg.org/#underlying-source-api
+
+[GenerateInit]
+dictionary UnderlyingSource {
+ UnderlyingSourceStartCallback start;
+ UnderlyingSourcePullCallback pull;
+ UnderlyingSourceCancelCallback cancel;
+ ReadableStreamType type;
+ [EnforceRange] unsigned long long autoAllocateChunkSize;
+};
+
+typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;
+
+callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
+callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
+callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);
+
diff --git a/components/script/dom/webidls/UnderlyingSourceContainer.webidl b/components/script/dom/webidls/UnderlyingSourceContainer.webidl
new file mode 100644
index 00000000000..b0bcfcaf242
--- /dev/null
+++ b/components/script/dom/webidls/UnderlyingSourceContainer.webidl
@@ -0,0 +1,11 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// This interface is entirely internal to Servo, and should not be accessible to
+// web pages.
+
+[LegacyNoInterfaceObject, Exposed=(Window,Worker)]
+// Need to escape "ReadableStream" so it's treated as an identifier.
+interface _UnderlyingSourceContainer {
+};
diff --git a/components/script/fetch.rs b/components/script/fetch.rs
index b6c2181538d..266f848ec7a 100644
--- a/components/script/fetch.rs
+++ b/components/script/fetch.rs
@@ -279,7 +279,7 @@ impl FetchResponseListener for FetchContext {
fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
let response = self.response_object.root();
- response.stream_chunk(chunk, CanGc::note());
+ response.stream_chunk(chunk);
}
fn process_response_eof(
diff --git a/components/script/microtask.rs b/components/script/microtask.rs
index 684a6f9d3b2..87bd4e1da1a 100644
--- a/components/script/microtask.rs
+++ b/components/script/microtask.rs
@@ -18,6 +18,7 @@ use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::PromiseBinding::PromiseJobCallback;
use crate::dom::bindings::codegen::Bindings::VoidFunctionBinding::VoidFunction;
use crate::dom::bindings::root::DomRoot;
+use crate::dom::defaultteereadrequest::DefaultTeeReadRequestMicrotask;
use crate::dom::globalscope::GlobalScope;
use crate::dom::htmlimageelement::ImageElementMicrotask;
use crate::dom::htmlmediaelement::MediaElementMicrotask;
@@ -41,6 +42,7 @@ pub enum Microtask {
User(UserMicrotask),
MediaElement(MediaElementMicrotask),
ImageElement(ImageElementMicrotask),
+ ReadableStreamTeeReadRequest(DefaultTeeReadRequestMicrotask),
CustomElementReaction,
NotifyMutationObservers,
}
@@ -140,6 +142,9 @@ impl MicrotaskQueue {
Microtask::NotifyMutationObservers => {
MutationObserver::notify_mutation_observers();
},
+ Microtask::ReadableStreamTeeReadRequest(ref task) => {
+ task.microtask_chunk_steps(can_gc)
+ },
}
}
}