diff options
author | Gregory Terzian <gterzian@users.noreply.github.com> | 2020-02-29 11:59:10 +0800 |
---|---|---|
committer | Gregory Terzian <gterzian@users.noreply.github.com> | 2020-06-04 11:38:35 +0800 |
commit | bd5796c90b8e8e066a32e7da9cfa5251d1559046 (patch) | |
tree | aa5ed3de6608b8046924ef6e8e8ad43b91cf81a7 /components/script/dom/readablestream.rs | |
parent | 0281acea955085aec01a4fc6da5f89f326a14842 (diff) | |
download | servo-bd5796c90b8e8e066a32e7da9cfa5251d1559046.tar.gz servo-bd5796c90b8e8e066a32e7da9cfa5251d1559046.zip |
integrate readablestream with fetch and blob
Diffstat (limited to 'components/script/dom/readablestream.rs')
-rw-r--r-- | components/script/dom/readablestream.rs | 509 |
1 files changed, 509 insertions, 0 deletions
diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs new file mode 100644 index 00000000000..d2677e80547 --- /dev/null +++ b/components/script/dom/readablestream.rs @@ -0,0 +1,509 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult}; +use crate::dom::bindings::error::Error; +use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector}; +use crate::dom::bindings::root::DomRoot; +use crate::dom::bindings::settings_stack::AutoEntryScript; +use crate::dom::bindings::utils::get_dictionary_property; +use crate::dom::globalscope::GlobalScope; +use crate::dom::promise::Promise; +use crate::js::conversions::FromJSValConvertible; +use crate::realms::{enter_realm, InRealm}; +use crate::script_runtime::JSContext as SafeJSContext; +use dom_struct::dom_struct; +use js::glue::{ + CreateReadableStreamUnderlyingSource, DeleteReadableStreamUnderlyingSource, + ReadableStreamUnderlyingSourceTraps, +}; +use js::jsapi::{HandleObject, HandleValue, Heap, JSContext, JSObject}; +use js::jsapi::{ + IsReadableStream, NewReadableExternalSourceStreamObject, ReadableStreamClose, + ReadableStreamDefaultReaderRead, ReadableStreamError, ReadableStreamGetReader, + ReadableStreamIsDisturbed, ReadableStreamIsLocked, ReadableStreamIsReadable, + ReadableStreamReaderMode, ReadableStreamReaderReleaseLock, ReadableStreamUnderlyingSource, + ReadableStreamUpdateDataAvailableFromSource, UnwrapReadableStream, +}; +use js::jsval::JSVal; +use js::jsval::UndefinedValue; +use js::rust::HandleValue as SafeHandleValue; +use js::rust::IntoHandle; +use std::cell::{Cell, RefCell}; +use std::os::raw::c_void; +use std::ptr::{self, NonNull}; +use std::rc::Rc; +use std::slice; + +static UNDERLYING_SOURCE_TRAPS: ReadableStreamUnderlyingSourceTraps = + ReadableStreamUnderlyingSourceTraps { + requestData: Some(request_data), + writeIntoReadRequestBuffer: Some(write_into_read_request_buffer), + cancel: Some(cancel), + onClosed: Some(close), + onErrored: Some(error), + finalize: Some(finalize), + }; + +#[dom_struct] +pub struct ReadableStream { + reflector_: Reflector, + #[ignore_malloc_size_of = "SM handles JS values"] + js_stream: Heap<*mut JSObject>, + #[ignore_malloc_size_of = "SM handles JS values"] + js_reader: Heap<*mut JSObject>, + has_reader: Cell<bool>, + #[ignore_malloc_size_of = "Rc is hard"] + external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, +} + +impl ReadableStream { + fn new_inherited( + external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, + ) -> ReadableStream { + ReadableStream { + reflector_: Reflector::new(), + js_stream: Heap::default(), + js_reader: Heap::default(), + has_reader: Default::default(), + external_underlying_source: external_underlying_source, + } + } + + fn new( + global: &GlobalScope, + external_underlying_source: Option<Rc<ExternalUnderlyingSourceController>>, + ) -> DomRoot<ReadableStream> { + reflect_dom_object( + Box::new(ReadableStream::new_inherited(external_underlying_source)), + global, + ) + } + + /// Used from RustCodegen.py + #[allow(unsafe_code)] + pub unsafe fn from_js( + cx: SafeJSContext, + obj: *mut JSObject, + realm: InRealm, + ) -> Result<DomRoot<ReadableStream>, ()> { + if !IsReadableStream(obj) { + return Err(()); + } + + let global = GlobalScope::from_safe_context(cx, realm); + + let stream = ReadableStream::new(&global, None); + stream.js_stream.set(UnwrapReadableStream(obj)); + + Ok(stream) + } + + /// Build a stream backed by a Rust source that has already been read into memory. + pub fn new_from_bytes(global: &GlobalScope, bytes: Vec<u8>) -> DomRoot<ReadableStream> { + let stream = ReadableStream::new_with_external_underlying_source( + &global, + ExternalUnderlyingSource::Memory(bytes.len()), + ); + stream.enqueue_native(bytes); + stream.close_native(); + stream + } + + /// Build a stream backed by a Rust underlying source. + #[allow(unsafe_code)] + pub fn new_with_external_underlying_source( + global: &GlobalScope, + source: ExternalUnderlyingSource, + ) -> DomRoot<ReadableStream> { + let _ar = enter_realm(global); + let cx = global.get_cx(); + + let source = Rc::new(ExternalUnderlyingSourceController::new(source)); + + let stream = ReadableStream::new(&global, Some(source.clone())); + + unsafe { + let js_wrapper = CreateReadableStreamUnderlyingSource( + &UNDERLYING_SOURCE_TRAPS, + &*source as *const _ as *const c_void, + ); + + rooted!(in(*cx) + let js_stream = NewReadableExternalSourceStreamObject( + *cx, + js_wrapper, + ptr::null_mut(), + HandleObject::null(), + ) + ); + + stream.js_stream.set(UnwrapReadableStream(js_stream.get())); + } + + stream + } + + /// Get a pointer to the underlying JS object. + pub fn get_js_stream(&self) -> NonNull<JSObject> { + NonNull::new(self.js_stream.get()) + .expect("Couldn't get a non-null pointer to JS stream object.") + } + + #[allow(unsafe_code)] + pub fn enqueue_native(&self, bytes: Vec<u8>) { + let global = self.global(); + let _ar = enter_realm(&*global); + let cx = global.get_cx(); + + let handle = unsafe { self.js_stream.handle() }; + + self.external_underlying_source + .as_ref() + .expect("No external source to enqueue bytes.") + .enqueue_chunk(cx, handle, bytes); + } + + #[allow(unsafe_code)] + pub fn error_native(&self, error: Error) { + let global = self.global(); + let _ar = enter_realm(&*global); + let cx = global.get_cx(); + + unsafe { + rooted!(in(*cx) let mut js_error = UndefinedValue()); + error.to_jsval(*cx, &global, js_error.handle_mut()); + ReadableStreamError( + *cx, + self.js_stream.handle(), + js_error.handle().into_handle(), + ); + } + } + + #[allow(unsafe_code)] + pub fn close_native(&self) { + let global = self.global(); + let _ar = enter_realm(&*global); + let cx = global.get_cx(); + + let handle = unsafe { self.js_stream.handle() }; + + self.external_underlying_source + .as_ref() + .expect("No external source to close.") + .close(cx, handle); + } + + /// Acquires a reader and locks the stream, + /// must be done before `read_a_chunk`. + #[allow(unsafe_code)] + pub fn start_reading(&self) -> Result<(), ()> { + if self.is_locked() || self.is_disturbed() { + return Err(()); + } + + let global = self.global(); + let _ar = enter_realm(&*global); + let cx = global.get_cx(); + + unsafe { + rooted!(in(*cx) let reader = ReadableStreamGetReader( + *cx, + self.js_stream.handle(), + ReadableStreamReaderMode::Default, + )); + + // Note: the stream is locked to the reader. + self.js_reader.set(reader.get()); + } + + self.has_reader.set(true); + Ok(()) + } + + /// Read a chunk from the stream, + /// must be called after `start_reading`, + /// and before `stop_reading`. + #[allow(unsafe_code)] + pub fn read_a_chunk(&self) -> Rc<Promise> { + if !self.has_reader.get() { + panic!("Attempt to read stream chunk without having acquired a reader."); + } + + let global = self.global(); + let _ar = enter_realm(&*global); + let _aes = AutoEntryScript::new(&*global); + + let cx = global.get_cx(); + + unsafe { + rooted!(in(*cx) let promise_obj = ReadableStreamDefaultReaderRead( + *cx, + self.js_reader.handle(), + )); + Promise::new_with_js_promise(promise_obj.handle(), cx) + } + } + + /// Releases the lock on the reader, + /// must be done after `start_reading`. + #[allow(unsafe_code)] + pub fn stop_reading(&self) { + if !self.has_reader.get() { + panic!("ReadableStream::stop_reading called on a readerless stream."); + } + + self.has_reader.set(false); + + let global = self.global(); + let _ar = enter_realm(&*global); + let cx = global.get_cx(); + + unsafe { + ReadableStreamReaderReleaseLock(*cx, self.js_reader.handle()); + // Note: is this the way to nullify the Heap? + self.js_reader.set(ptr::null_mut()); + } + } + + #[allow(unsafe_code)] + pub fn is_locked(&self) -> bool { + // If we natively took a reader, we're locked. + if self.has_reader.get() { + return true; + } + + // Otherwise, still double-check that script didn't lock the stream. + let cx = self.global().get_cx(); + let mut locked_or_disturbed = false; + + unsafe { + ReadableStreamIsLocked(*cx, self.js_stream.handle(), &mut locked_or_disturbed); + } + + locked_or_disturbed + } + + #[allow(unsafe_code)] + pub fn is_disturbed(&self) -> bool { + // Check that script didn't disturb the stream. + let cx = self.global().get_cx(); + let mut locked_or_disturbed = false; + + unsafe { + ReadableStreamIsDisturbed(*cx, self.js_stream.handle(), &mut locked_or_disturbed); + } + + locked_or_disturbed + } +} + +#[allow(unsafe_code)] +unsafe extern "C" fn request_data( + source: *const c_void, + cx: *mut JSContext, + stream: HandleObject, + desired_size: usize, +) { + let source = &*(source as *const ExternalUnderlyingSourceController); + source.pull(SafeJSContext::from_ptr(cx), stream, desired_size); +} + +#[allow(unsafe_code)] +unsafe extern "C" fn write_into_read_request_buffer( + source: *const c_void, + _cx: *mut JSContext, + _stream: HandleObject, + buffer: *mut c_void, + length: usize, + bytes_written: *mut usize, +) { + let source = &*(source as *const ExternalUnderlyingSourceController); + let slice = slice::from_raw_parts_mut(buffer as *mut u8, length); + source.write_into_buffer(slice); + + // Currently we're always able to completely fulfill the write request. + *bytes_written = length; +} + +#[allow(unsafe_code)] +unsafe extern "C" fn cancel( + _source: *const c_void, + _cx: *mut JSContext, + _stream: HandleObject, + _reason: HandleValue, + _resolve_to: *mut JSVal, +) { +} + +#[allow(unsafe_code)] +unsafe extern "C" fn close(_source: *const c_void, _cx: *mut JSContext, _stream: HandleObject) {} + +#[allow(unsafe_code)] +unsafe extern "C" fn error( + _source: *const c_void, + _cx: *mut JSContext, + _stream: HandleObject, + _reason: HandleValue, +) { +} + +#[allow(unsafe_code)] +unsafe extern "C" fn finalize(source: *mut ReadableStreamUnderlyingSource) { + DeleteReadableStreamUnderlyingSource(source); +} + +pub enum ExternalUnderlyingSource { + /// Facilitate partial integration with sources + /// that are currently read into memory. + Memory(usize), + /// A blob as underlying source, with a known total size. + Blob(usize), + /// A fetch response as underlying source. + FetchResponse, +} + +#[derive(JSTraceable, MallocSizeOf)] +struct ExternalUnderlyingSourceController { + /// Loosely matches the underlying queue, + /// <https://streams.spec.whatwg.org/#internal-queues> + buffer: RefCell<Vec<u8>>, + /// Has the stream been closed by native code? + closed: Cell<bool>, +} + +impl ExternalUnderlyingSourceController { + fn new(source: ExternalUnderlyingSource) -> ExternalUnderlyingSourceController { + let buffer = match source { + ExternalUnderlyingSource::Blob(size) | ExternalUnderlyingSource::Memory(size) => { + Vec::with_capacity(size) + }, + ExternalUnderlyingSource::FetchResponse => vec![], + }; + ExternalUnderlyingSourceController { + buffer: RefCell::new(buffer), + closed: Cell::new(false), + } + } + + /// Signal available bytes if the stream is currently readable. + #[allow(unsafe_code)] + fn maybe_signal_available_bytes( + &self, + cx: SafeJSContext, + stream: HandleObject, + available: usize, + ) { + if available == 0 { + return; + } + unsafe { + let mut readable = false; + if !ReadableStreamIsReadable(*cx, stream, &mut readable) { + return; + } + if readable { + ReadableStreamUpdateDataAvailableFromSource(*cx, stream, available as u32); + } + } + } + + /// Close a currently readable js stream. + #[allow(unsafe_code)] + fn maybe_close_js_stream(&self, cx: SafeJSContext, stream: HandleObject) { + unsafe { + let mut readable = false; + if !ReadableStreamIsReadable(*cx, stream, &mut readable) { + return; + } + if readable { + ReadableStreamClose(*cx, stream); + } + } + } + + fn close(&self, cx: SafeJSContext, stream: HandleObject) { + self.closed.set(true); + self.maybe_close_js_stream(cx, stream); + } + + fn enqueue_chunk(&self, cx: SafeJSContext, stream: HandleObject, mut chunk: Vec<u8>) { + let available = { + let mut buffer = self.buffer.borrow_mut(); + chunk.append(&mut buffer); + *buffer = chunk; + buffer.len() + }; + self.maybe_signal_available_bytes(cx, stream, available); + } + + #[allow(unsafe_code)] + fn pull(&self, cx: SafeJSContext, stream: HandleObject, _desired_size: usize) { + // Note: for pull sources, + // this would be the time to ask for a chunk. + + if self.closed.get() { + return self.maybe_close_js_stream(cx, stream); + } + + let available = { + let buffer = self.buffer.borrow(); + buffer.len() + }; + + self.maybe_signal_available_bytes(cx, stream, available); + } + + fn get_chunk_with_length(&self, length: usize) -> Vec<u8> { + let mut buffer = self.buffer.borrow_mut(); + let buffer_len = buffer.len(); + assert!(buffer_len >= length as usize); + buffer.split_off(buffer_len - length) + } + + fn write_into_buffer(&self, dest: &mut [u8]) { + let length = dest.len(); + let chunk = self.get_chunk_with_length(length); + dest.copy_from_slice(chunk.as_slice()); + } +} + +#[allow(unsafe_code)] +/// Get the `done` property of an object that a read promise resolved to. +pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<bool, Error> { + unsafe { + rooted!(in(*cx) let object = v.to_object()); + rooted!(in(*cx) let mut done = UndefinedValue()); + match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut()) { + Ok(true) => match bool::from_jsval(*cx, done.handle(), ()) { + Ok(ConversionResult::Success(val)) => Ok(val), + Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), + _ => Err(Error::Type("Unknown format for done property.".to_string())), + }, + Ok(false) => Err(Error::Type("Promise has no done property.".to_string())), + Err(()) => Err(Error::JSFailed), + } + } +} + +#[allow(unsafe_code)] +/// Get the `value` property of an object that a read promise resolved to. +pub fn get_read_promise_bytes(cx: SafeJSContext, v: &SafeHandleValue) -> Result<Vec<u8>, Error> { + unsafe { + rooted!(in(*cx) let object = v.to_object()); + rooted!(in(*cx) let mut bytes = UndefinedValue()); + match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut()) { + Ok(true) => { + match Vec::<u8>::from_jsval(*cx, bytes.handle(), ConversionBehavior::EnforceRange) { + Ok(ConversionResult::Success(val)) => Ok(val), + Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())), + _ => Err(Error::Type("Unknown format for bytes read.".to_string())), + } + }, + Ok(false) => Err(Error::Type("Promise has no value property.".to_string())), + Err(()) => Err(Error::JSFailed), + } + } +} |