diff options
author | gterzian <2792687+gterzian@users.noreply.github.com> | 2024-07-15 18:49:02 +0800 |
---|---|---|
committer | gterzian <2792687+gterzian@users.noreply.github.com> | 2024-07-15 18:49:02 +0800 |
commit | 24fcefc67390a371df43bbaf3cef25b5c3feef10 (patch) | |
tree | fe9466fe070248034bbf207e4573910a0ed76cec | |
parent | 2d2513a8626981a1017de832a555e10511cbb5a3 (diff) | |
download | servo-24fcefc67390a371df43bbaf3cef25b5c3feef10.tar.gz servo-24fcefc67390a371df43bbaf3cef25b5c3feef10.zip |
add queue with sizes concept
4 files changed, 82 insertions, 18 deletions
diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index 512e375c61f..ad634e6abf1 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -238,7 +238,7 @@ impl ReadableStream { /// Note: in other use cases this call happens via the controller. pub fn enqueue_native(&self, bytes: Vec<u8>) { match self.controller { - ControllerType::Default(ref controller) => controller.enqueue_chunk(bytes), + ControllerType::Default(ref controller) => controller.enqueue_native(bytes), _ => unreachable!( "Enqueueing chunk to a stream from Rust on other than default controller" ), diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs index dcc98fa46a9..74aff79e066 100644 --- a/components/script/dom/readablestreamdefaultcontroller.rs +++ b/components/script/dom/readablestreamdefaultcontroller.rs @@ -3,11 +3,16 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::cell::RefCell; +use std::collections::VecDeque; +use std::rc::Rc; use dom_struct::dom_struct; +use js::jsapi::Heap; use js::rust::{HandleValue as SafeHandleValue, HandleValue}; -use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods; +use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::{ + ReadableStreamDefaultControllerMethods, ValueWithSize, +}; use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller; use crate::dom::bindings::import::module::{Error, Fallible}; use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; @@ -48,13 +53,64 @@ impl Callback for PullAlgorithmRejectionHandler { } } +/// <https://streams.spec.whatwg.org/#value-with-size> +#[derive(JSTraceable)] +#[allow(crown::unrooted_must_root)] +pub enum EnqueuedValue { + /// A value enqueued from Rust. + Native(Vec<u8>), + /// A Js value. + Js(ValueWithSize), +} + +/// <https://streams.spec.whatwg.org/#queue-with-sizes> +#[derive(Default, JSTraceable, MallocSizeOf)] +pub struct QueueWithSizes { + total_size: usize, + #[ignore_malloc_size_of = "Rc is hard"] + queue: VecDeque<EnqueuedValue>, +} + +impl QueueWithSizes { + /// <https://streams.spec.whatwg.org/#dequeue-value> + fn dequeue_value(&mut self) -> EnqueuedValue { + self.queue + .pop_front() + .expect("Buffer cannot be empty when dequeue value is called into.") + } + + /// <https://streams.spec.whatwg.org/#enqueue-value-with-size> + fn enqueue_value_with_size(&mut self, value: EnqueuedValue) { + self.queue.push_back(value); + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + /// Only used with native sources. + fn get_in_memory_bytes(&self) -> Vec<u8> { + self.queue + .iter() + .flat_map(|value| { + let EnqueuedValue::Native(chunk) = value else { + unreachable!( + "`get_in_memory_bytes` can only be called on a queue with native values." + ) + }; + chunk.clone() + }) + .collect() + } +} + /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller> #[dom_struct] pub struct ReadableStreamDefaultController { reflector_: Reflector, /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue> - buffer: RefCell<Vec<u8>>, + queue: RefCell<QueueWithSizes>, underlying_source: Dom<UnderlyingSourceContainer>, @@ -68,7 +124,7 @@ impl ReadableStreamDefaultController { ) -> ReadableStreamDefaultController { ReadableStreamDefaultController { reflector_: Reflector::new(), - buffer: RefCell::new(vec![]), + queue: RefCell::new(Default::default()), stream: MutNullableDom::new(None), underlying_source: Dom::from_ref(&*UnderlyingSourceContainer::new( global, @@ -93,11 +149,10 @@ impl ReadableStreamDefaultController { self.stream.set(Some(stream)) } - fn get_chunk_with_length(&self, length: usize) -> Vec<u8> { - let mut buffer = self.buffer.borrow_mut(); - let buffer_len = buffer.len(); - assert!(buffer_len >= length); - buffer.split_off(buffer_len - length) + /// <https://streams.spec.whatwg.org/#dequeue-value> + fn dequeue_value(&self) -> EnqueuedValue { + let mut queue = self.queue.borrow_mut(); + queue.dequeue_value() } /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull> @@ -135,16 +190,18 @@ impl ReadableStreamDefaultController { /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-pullsteps> pub fn perform_pull_steps(&self, read_request: ReadRequest) { - // if buffer contains bytes, perform chunk steps. - if !self.buffer.borrow().is_empty() { + // if queue contains bytes, perform chunk steps. + if !self.queue.borrow().is_empty() { // TODO: use <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm> - let chunk = self.get_chunk_with_length(self.buffer.borrow().len()); + let chunk = self.dequeue_value(); // TODO: handle close requested. self.call_pull_if_needed(); - read_request.chunk_steps(chunk); + if let EnqueuedValue::Native(chunk) = chunk { + read_request.chunk_steps(chunk); + } } // else, append read request to reader. @@ -156,8 +213,9 @@ impl ReadableStreamDefaultController { self.call_pull_if_needed(); } + /// Native call to /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> - pub fn enqueue_chunk(&self, mut chunk: Vec<u8>) { + pub fn enqueue_native(&self, chunk: Vec<u8>) { let stream = self .stream .get() @@ -170,9 +228,8 @@ impl ReadableStreamDefaultController { // TODO: strategy size algo. // <https://streams.spec.whatwg.org/#enqueue-value-with-size> - let mut buffer = self.buffer.borrow_mut(); - chunk.append(&mut buffer); - *buffer = chunk; + let mut queue = self.queue.borrow_mut(); + queue.enqueue_value_with_size(EnqueuedValue::Native(chunk)); self.call_pull_if_needed(); } @@ -185,7 +242,7 @@ impl ReadableStreamDefaultController { /// Return bytes synchronously if the stream has all data in memory. pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> { if self.underlying_source.in_memory() { - return Some(self.buffer.borrow().clone()); + return Some(self.queue.borrow().get_in_memory_bytes()); } None } diff --git a/components/script/dom/readablestreamdefaultreader.rs b/components/script/dom/readablestreamdefaultreader.rs index 252aa3d093a..33ec82011d2 100644 --- a/components/script/dom/readablestreamdefaultreader.rs +++ b/components/script/dom/readablestreamdefaultreader.rs @@ -25,6 +25,7 @@ use crate::dom::bindings::utils::set_dictionary_property; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::readablestream::ReadableStream; +use crate::dom::readablestreamdefaultcontroller::EnqueuedValue; use crate::script_runtime::JSContext as SafeJSContext; /// <https://streams.spec.whatwg.org/#read-request> diff --git a/components/script/dom/webidls/ReadableStreamDefaultController.webidl b/components/script/dom/webidls/ReadableStreamDefaultController.webidl index 6e3e1546dc8..55112191859 100644 --- a/components/script/dom/webidls/ReadableStreamDefaultController.webidl +++ b/components/script/dom/webidls/ReadableStreamDefaultController.webidl @@ -15,3 +15,9 @@ interface ReadableStreamDefaultController { [Throws] undefined error(optional any e); }; + +dictionary ValueWithSize { + any value; + long size; +}; + |