diff options
Diffstat (limited to 'components/net/decoder.rs')
-rw-r--r-- | components/net/decoder.rs | 198 |
1 files changed, 119 insertions, 79 deletions
diff --git a/components/net/decoder.rs b/components/net/decoder.rs index b54035fa4d9..d151e08ceec 100644 --- a/components/net/decoder.rs +++ b/components/net/decoder.rs @@ -29,20 +29,24 @@ The following types directly support the gzip compression case: use crate::connector::BUF_SIZE; use brotli::Decompressor; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use flate2::read::DeflateDecoder; -use futures::{Async, Future, Poll, Stream}; +use futures::{task::Context, task::Poll, Future, Stream}; use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING}; -use hyper::{self, Body, Chunk, Response}; +use hyper::{self, Body, Response}; use libflate::non_blocking::gzip; use std::cmp; use std::fmt; use std::io::{self, Read}; use std::mem; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Waker; +#[derive(Debug)] pub enum Error { Io(io::Error), - Hyper(hyper::error::Error), + Hyper(hyper::Error), } impl From<io::Error> for Error { @@ -51,8 +55,8 @@ impl From<io::Error> for Error { } } -impl From<hyper::error::Error> for Error { - fn from(err: hyper::error::Error) -> Error { +impl From<hyper::Error> for Error { + fn from(err: hyper::Error) -> Error { Error::Hyper(err) } } @@ -93,10 +97,11 @@ struct Pending { } /// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Gzip { inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl fmt::Debug for Decoder { @@ -162,37 +167,36 @@ impl Decoder { } impl Stream for Decoder { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Do a read or poll for a pending decoder value. let new_value = match self.inner { - Inner::Pending(ref mut future) => match future.poll() { - Ok(Async::Ready(inner)) => inner, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(e.into()), + Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { + Poll::Ready(inner) => inner, + Poll::Pending => return Poll::Pending, }, - Inner::PlainText(ref mut body) => return body.poll().map_err(|e| e.into()), - Inner::Gzip(ref mut decoder) => return decoder.poll(), - Inner::Brotli(ref mut decoder) => return decoder.poll(), - Inner::Deflate(ref mut decoder) => return decoder.poll(), + Inner::PlainText(ref mut body) => { + return Pin::new(body).poll_next(cx).map_err(|e| e.into()) + }, + Inner::Gzip(ref mut decoder) => return Pin::new(decoder).poll_next(cx), + Inner::Brotli(ref mut decoder) => return Pin::new(decoder).poll_next(cx), + Inner::Deflate(ref mut decoder) => return Pin::new(decoder).poll_next(cx), }; + // self.inner = new_value; - self.poll() + self.poll_next(cx) } } impl Future for Pending { - type Item = Inner; - type Error = hyper::error::Error; - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let body_state = match self.body.poll_stream() { - Ok(Async::Ready(state)) => state, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(e), + type Output = Inner; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let body_state = match self.body.poll_stream(cx) { + Poll::Ready(state) => state, + Poll::Pending => return Poll::Pending, }; let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty())); @@ -200,110 +204,133 @@ impl Future for Pending { // if the stream was empty, or truly had an UnexpectedEof. // Therefore, we need to check for EOF first. match body_state { - StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))), - StreamState::HasMore => Ok(Async::Ready(match self.type_ { + StreamState::Eof => Poll::Ready(Inner::PlainText(Body::empty())), + StreamState::HasMore => Poll::Ready(match self.type_ { DecoderType::Gzip => Inner::Gzip(Gzip::new(body)), DecoderType::Brotli => Inner::Brotli(Brotli::new(body)), DecoderType::Deflate => Inner::Deflate(Deflate::new(body)), - })), + }), } } } impl Gzip { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Gzip { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(gzip::Decoder::new(Peeked::new(stream))), + reader: reader, } } } #[allow(unsafe_code)] -fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Chunk>, Error> { - if buf.remaining_mut() == 0 { - buf.reserve(INIT_BUFFER_SIZE); - } +fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Result<Bytes, Error>>> { + // Ensure a full size buffer is available. + // `reserve` is optimized to reclaim space over allocating. + buf.reserve(INIT_BUFFER_SIZE); // The buffer contains uninitialised memory so getting a readable slice is unsafe. // We trust the reader not to read from the memory given. // // To be safe, this memory could be zeroed before passing to the reader. // Otherwise we might need to deal with the case where the reader panics. + let read = { - let mut buf = unsafe { buf.bytes_mut() }; - reader.read(&mut buf) + let buf = unsafe { + let ptr = buf.chunk_mut().as_mut_ptr(); + std::slice::from_raw_parts_mut(ptr, buf.capacity()) + }; + reader.read(&mut *buf) }; match read { - Ok(read) if read == 0 => Ok(Async::Ready(None)), + Ok(read) if read == 0 => Poll::Ready(None), Ok(read) => { unsafe { buf.advance_mut(read) }; - let chunk = Chunk::from(buf.split_to(read).freeze()); - - Ok(Async::Ready(Some(chunk))) + let chunk = buf.split_to(read).freeze(); + Poll::Ready(Some(Ok(chunk))) }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady), - Err(e) => Err(e.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + Err(e) => Poll::Ready(Some(Err(e.into()))), } } impl Stream for Gzip { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } /// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Brotli { inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl Brotli { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Self { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)), + reader: reader, } } } impl Stream for Brotli { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } /// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results -/// as a `Chunk`. +/// as a `Bytes`. struct Deflate { inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>, buf: BytesMut, + reader: Arc<Mutex<ReadableChunks<Body>>>, } impl Deflate { fn new(stream: ReadableChunks<Body>) -> Self { + let stream = Arc::new(Mutex::new(stream)); + let reader = stream.clone(); Self { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), inner: Box::new(DeflateDecoder::new(Peeked::new(stream))), + reader: reader, } } } impl Stream for Deflate { - type Item = Chunk; - type Error = Error; + type Item = Result<Bytes, Error>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - poll_with_read(&mut self.inner, &mut self.buf) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut buf = self.buf.clone(); + if let Ok(mut reader) = self.reader.lock() { + reader.waker = Some(cx.waker().clone()); + } + poll_with_read(&mut self.inner, &mut buf) } } @@ -311,17 +338,21 @@ impl Stream for Deflate { pub struct ReadableChunks<S> { state: ReadState, stream: S, + waker: Option<Waker>, } enum ReadState { /// A chunk is ready to be read from. - Ready(Chunk), + Ready(Bytes), /// The next chunk isn't ready yet. NotReady, /// The stream has finished. Eof, + /// Stream is in err + Error(hyper::Error), } +#[derive(Debug)] enum StreamState { /// More bytes can be read from the stream. HasMore, @@ -334,7 +365,7 @@ struct Peeked<R> { state: PeekedState, peeked_buf: [u8; 10], pos: usize, - inner: R, + inner: Arc<Mutex<R>>, } enum PeekedState { @@ -346,7 +377,7 @@ enum PeekedState { impl<R> Peeked<R> { #[inline] - fn new(inner: R) -> Self { + fn new(inner: Arc<Mutex<R>>) -> Self { Peeked { state: PeekedState::NotReady, peeked_buf: [0; 10], @@ -383,11 +414,13 @@ impl<R: Read> Read for Peeked<R> { if self.pos == peeked_buf_len { self.not_ready(); } - return Ok(len); }, PeekedState::NotReady => { - let read = self.inner.read(&mut self.peeked_buf[self.pos..]); + let mut buf = &mut self.peeked_buf[self.pos..]; + let stream = self.inner.clone(); + let mut reader = stream.lock().unwrap(); + let read = reader.read(&mut buf); match read { Ok(0) => self.ready(), @@ -411,6 +444,7 @@ impl<S> ReadableChunks<S> { ReadableChunks { state: ReadState::NotReady, stream: stream, + waker: None, } } } @@ -423,9 +457,12 @@ impl<S> fmt::Debug for ReadableChunks<S> { impl<S> Read for ReadableChunks<S> where - S: Stream<Item = Chunk, Error = hyper::error::Error>, + S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin, { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let waker = self.waker.as_ref().unwrap().clone(); + let mut cx = Context::from_waker(&waker); + loop { let ret; match self.state { @@ -440,15 +477,15 @@ where return Ok(len); } }, - ReadState::NotReady => match self.poll_stream() { - Ok(Async::Ready(StreamState::HasMore)) => continue, - Ok(Async::Ready(StreamState::Eof)) => return Ok(0), - Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), - Err(e) => { - return Err(io::Error::new(io::ErrorKind::Other, e)); - }, + ReadState::NotReady => match self.poll_stream(&mut cx) { + Poll::Ready(StreamState::HasMore) => continue, + Poll::Ready(StreamState::Eof) => return Ok(0), + Poll::Pending => return Err(io::ErrorKind::WouldBlock.into()), }, ReadState::Eof => return Ok(0), + ReadState::Error(ref err) => { + return Err(io::Error::new(io::ErrorKind::Other, err.to_string())) + }, } self.state = ReadState::NotReady; return Ok(ret); @@ -458,26 +495,29 @@ where impl<S> ReadableChunks<S> where - S: Stream<Item = Chunk, Error = hyper::error::Error>, + S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin, { /// Poll the readiness of the inner reader. /// /// This function will update the internal state and return a simplified /// version of the `ReadState`. - fn poll_stream(&mut self) -> Poll<StreamState, hyper::error::Error> { - match self.stream.poll() { - Ok(Async::Ready(Some(chunk))) => { + fn poll_stream(&mut self, cx: &mut Context<'_>) -> Poll<StreamState> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { self.state = ReadState::Ready(chunk); - Ok(Async::Ready(StreamState::HasMore)) + Poll::Ready(StreamState::HasMore) }, - Ok(Async::Ready(None)) => { - self.state = ReadState::Eof; + Poll::Ready(Some(Err(err))) => { + self.state = ReadState::Error(err); - Ok(Async::Ready(StreamState::Eof)) + Poll::Ready(StreamState::Eof) + }, + Poll::Ready(None) => { + self.state = ReadState::Eof; + Poll::Ready(StreamState::Eof) }, - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e), + Poll::Pending => Poll::Pending, } } } |