aboutsummaryrefslogtreecommitdiffstats
path: root/components/net/decoder.rs
diff options
context:
space:
mode:
Diffstat (limited to 'components/net/decoder.rs')
-rw-r--r--components/net/decoder.rs198
1 files changed, 119 insertions, 79 deletions
diff --git a/components/net/decoder.rs b/components/net/decoder.rs
index b54035fa4d9..d151e08ceec 100644
--- a/components/net/decoder.rs
+++ b/components/net/decoder.rs
@@ -29,20 +29,24 @@ The following types directly support the gzip compression case:
use crate::connector::BUF_SIZE;
use brotli::Decompressor;
-use bytes::{Buf, BufMut, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
use flate2::read::DeflateDecoder;
-use futures::{Async, Future, Poll, Stream};
+use futures::{task::Context, task::Poll, Future, Stream};
use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING};
-use hyper::{self, Body, Chunk, Response};
+use hyper::{self, Body, Response};
use libflate::non_blocking::gzip;
use std::cmp;
use std::fmt;
use std::io::{self, Read};
use std::mem;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::Waker;
+#[derive(Debug)]
pub enum Error {
Io(io::Error),
- Hyper(hyper::error::Error),
+ Hyper(hyper::Error),
}
impl From<io::Error> for Error {
@@ -51,8 +55,8 @@ impl From<io::Error> for Error {
}
}
-impl From<hyper::error::Error> for Error {
- fn from(err: hyper::error::Error) -> Error {
+impl From<hyper::Error> for Error {
+ fn from(err: hyper::Error) -> Error {
Error::Hyper(err)
}
}
@@ -93,10 +97,11 @@ struct Pending {
}
/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
-/// as a `Chunk`.
+/// as a `Bytes`.
struct Gzip {
inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
+ reader: Arc<Mutex<ReadableChunks<Body>>>,
}
impl fmt::Debug for Decoder {
@@ -162,37 +167,36 @@ impl Decoder {
}
impl Stream for Decoder {
- type Item = Chunk;
- type Error = Error;
+ type Item = Result<Bytes, Error>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
let new_value = match self.inner {
- Inner::Pending(ref mut future) => match future.poll() {
- Ok(Async::Ready(inner)) => inner,
- Ok(Async::NotReady) => return Ok(Async::NotReady),
- Err(e) => return Err(e.into()),
+ Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
+ Poll::Ready(inner) => inner,
+ Poll::Pending => return Poll::Pending,
},
- Inner::PlainText(ref mut body) => return body.poll().map_err(|e| e.into()),
- Inner::Gzip(ref mut decoder) => return decoder.poll(),
- Inner::Brotli(ref mut decoder) => return decoder.poll(),
- Inner::Deflate(ref mut decoder) => return decoder.poll(),
+ Inner::PlainText(ref mut body) => {
+ return Pin::new(body).poll_next(cx).map_err(|e| e.into())
+ },
+ Inner::Gzip(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
+ Inner::Brotli(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
+ Inner::Deflate(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
};
+ //
self.inner = new_value;
- self.poll()
+ self.poll_next(cx)
}
}
impl Future for Pending {
- type Item = Inner;
- type Error = hyper::error::Error;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let body_state = match self.body.poll_stream() {
- Ok(Async::Ready(state)) => state,
- Ok(Async::NotReady) => return Ok(Async::NotReady),
- Err(e) => return Err(e),
+ type Output = Inner;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let body_state = match self.body.poll_stream(cx) {
+ Poll::Ready(state) => state,
+ Poll::Pending => return Poll::Pending,
};
let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
@@ -200,110 +204,133 @@ impl Future for Pending {
// if the stream was empty, or truly had an UnexpectedEof.
// Therefore, we need to check for EOF first.
match body_state {
- StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))),
- StreamState::HasMore => Ok(Async::Ready(match self.type_ {
+ StreamState::Eof => Poll::Ready(Inner::PlainText(Body::empty())),
+ StreamState::HasMore => Poll::Ready(match self.type_ {
DecoderType::Gzip => Inner::Gzip(Gzip::new(body)),
DecoderType::Brotli => Inner::Brotli(Brotli::new(body)),
DecoderType::Deflate => Inner::Deflate(Deflate::new(body)),
- })),
+ }),
}
}
}
impl Gzip {
fn new(stream: ReadableChunks<Body>) -> Self {
+ let stream = Arc::new(Mutex::new(stream));
+ let reader = stream.clone();
Gzip {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(gzip::Decoder::new(Peeked::new(stream))),
+ reader: reader,
}
}
}
#[allow(unsafe_code)]
-fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Chunk>, Error> {
- if buf.remaining_mut() == 0 {
- buf.reserve(INIT_BUFFER_SIZE);
- }
+fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Result<Bytes, Error>>> {
+ // Ensure a full size buffer is available.
+ // `reserve` is optimized to reclaim space over allocating.
+ buf.reserve(INIT_BUFFER_SIZE);
// The buffer contains uninitialised memory so getting a readable slice is unsafe.
// We trust the reader not to read from the memory given.
//
// To be safe, this memory could be zeroed before passing to the reader.
// Otherwise we might need to deal with the case where the reader panics.
+
let read = {
- let mut buf = unsafe { buf.bytes_mut() };
- reader.read(&mut buf)
+ let buf = unsafe {
+ let ptr = buf.chunk_mut().as_mut_ptr();
+ std::slice::from_raw_parts_mut(ptr, buf.capacity())
+ };
+ reader.read(&mut *buf)
};
match read {
- Ok(read) if read == 0 => Ok(Async::Ready(None)),
+ Ok(read) if read == 0 => Poll::Ready(None),
Ok(read) => {
unsafe { buf.advance_mut(read) };
- let chunk = Chunk::from(buf.split_to(read).freeze());
-
- Ok(Async::Ready(Some(chunk)))
+ let chunk = buf.split_to(read).freeze();
+ Poll::Ready(Some(Ok(chunk)))
},
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
- Err(e) => Err(e.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
+ Err(e) => Poll::Ready(Some(Err(e.into()))),
}
}
impl Stream for Gzip {
- type Item = Chunk;
- type Error = Error;
+ type Item = Result<Bytes, Error>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- poll_with_read(&mut self.inner, &mut self.buf)
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut buf = self.buf.clone();
+ if let Ok(mut reader) = self.reader.lock() {
+ reader.waker = Some(cx.waker().clone());
+ }
+ poll_with_read(&mut self.inner, &mut buf)
}
}
/// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results
-/// as a `Chunk`.
+/// as a `Bytes`.
struct Brotli {
inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
+ reader: Arc<Mutex<ReadableChunks<Body>>>,
}
impl Brotli {
fn new(stream: ReadableChunks<Body>) -> Self {
+ let stream = Arc::new(Mutex::new(stream));
+ let reader = stream.clone();
Self {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)),
+ reader: reader,
}
}
}
impl Stream for Brotli {
- type Item = Chunk;
- type Error = Error;
+ type Item = Result<Bytes, Error>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- poll_with_read(&mut self.inner, &mut self.buf)
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut buf = self.buf.clone();
+ if let Ok(mut reader) = self.reader.lock() {
+ reader.waker = Some(cx.waker().clone());
+ }
+ poll_with_read(&mut self.inner, &mut buf)
}
}
/// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results
-/// as a `Chunk`.
+/// as a `Bytes`.
struct Deflate {
inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
+ reader: Arc<Mutex<ReadableChunks<Body>>>,
}
impl Deflate {
fn new(stream: ReadableChunks<Body>) -> Self {
+ let stream = Arc::new(Mutex::new(stream));
+ let reader = stream.clone();
Self {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(DeflateDecoder::new(Peeked::new(stream))),
+ reader: reader,
}
}
}
impl Stream for Deflate {
- type Item = Chunk;
- type Error = Error;
+ type Item = Result<Bytes, Error>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- poll_with_read(&mut self.inner, &mut self.buf)
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut buf = self.buf.clone();
+ if let Ok(mut reader) = self.reader.lock() {
+ reader.waker = Some(cx.waker().clone());
+ }
+ poll_with_read(&mut self.inner, &mut buf)
}
}
@@ -311,17 +338,21 @@ impl Stream for Deflate {
pub struct ReadableChunks<S> {
state: ReadState,
stream: S,
+ waker: Option<Waker>,
}
enum ReadState {
/// A chunk is ready to be read from.
- Ready(Chunk),
+ Ready(Bytes),
/// The next chunk isn't ready yet.
NotReady,
/// The stream has finished.
Eof,
+ /// Stream is in err
+ Error(hyper::Error),
}
+#[derive(Debug)]
enum StreamState {
/// More bytes can be read from the stream.
HasMore,
@@ -334,7 +365,7 @@ struct Peeked<R> {
state: PeekedState,
peeked_buf: [u8; 10],
pos: usize,
- inner: R,
+ inner: Arc<Mutex<R>>,
}
enum PeekedState {
@@ -346,7 +377,7 @@ enum PeekedState {
impl<R> Peeked<R> {
#[inline]
- fn new(inner: R) -> Self {
+ fn new(inner: Arc<Mutex<R>>) -> Self {
Peeked {
state: PeekedState::NotReady,
peeked_buf: [0; 10],
@@ -383,11 +414,13 @@ impl<R: Read> Read for Peeked<R> {
if self.pos == peeked_buf_len {
self.not_ready();
}
-
return Ok(len);
},
PeekedState::NotReady => {
- let read = self.inner.read(&mut self.peeked_buf[self.pos..]);
+ let mut buf = &mut self.peeked_buf[self.pos..];
+ let stream = self.inner.clone();
+ let mut reader = stream.lock().unwrap();
+ let read = reader.read(&mut buf);
match read {
Ok(0) => self.ready(),
@@ -411,6 +444,7 @@ impl<S> ReadableChunks<S> {
ReadableChunks {
state: ReadState::NotReady,
stream: stream,
+ waker: None,
}
}
}
@@ -423,9 +457,12 @@ impl<S> fmt::Debug for ReadableChunks<S> {
impl<S> Read for ReadableChunks<S>
where
- S: Stream<Item = Chunk, Error = hyper::error::Error>,
+ S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let waker = self.waker.as_ref().unwrap().clone();
+ let mut cx = Context::from_waker(&waker);
+
loop {
let ret;
match self.state {
@@ -440,15 +477,15 @@ where
return Ok(len);
}
},
- ReadState::NotReady => match self.poll_stream() {
- Ok(Async::Ready(StreamState::HasMore)) => continue,
- Ok(Async::Ready(StreamState::Eof)) => return Ok(0),
- Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
- Err(e) => {
- return Err(io::Error::new(io::ErrorKind::Other, e));
- },
+ ReadState::NotReady => match self.poll_stream(&mut cx) {
+ Poll::Ready(StreamState::HasMore) => continue,
+ Poll::Ready(StreamState::Eof) => return Ok(0),
+ Poll::Pending => return Err(io::ErrorKind::WouldBlock.into()),
},
ReadState::Eof => return Ok(0),
+ ReadState::Error(ref err) => {
+ return Err(io::Error::new(io::ErrorKind::Other, err.to_string()))
+ },
}
self.state = ReadState::NotReady;
return Ok(ret);
@@ -458,26 +495,29 @@ where
impl<S> ReadableChunks<S>
where
- S: Stream<Item = Chunk, Error = hyper::error::Error>,
+ S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
{
/// Poll the readiness of the inner reader.
///
/// This function will update the internal state and return a simplified
/// version of the `ReadState`.
- fn poll_stream(&mut self) -> Poll<StreamState, hyper::error::Error> {
- match self.stream.poll() {
- Ok(Async::Ready(Some(chunk))) => {
+ fn poll_stream(&mut self, cx: &mut Context<'_>) -> Poll<StreamState> {
+ match Pin::new(&mut self.stream).poll_next(cx) {
+ Poll::Ready(Some(Ok(chunk))) => {
self.state = ReadState::Ready(chunk);
- Ok(Async::Ready(StreamState::HasMore))
+ Poll::Ready(StreamState::HasMore)
},
- Ok(Async::Ready(None)) => {
- self.state = ReadState::Eof;
+ Poll::Ready(Some(Err(err))) => {
+ self.state = ReadState::Error(err);
- Ok(Async::Ready(StreamState::Eof))
+ Poll::Ready(StreamState::Eof)
+ },
+ Poll::Ready(None) => {
+ self.state = ReadState::Eof;
+ Poll::Ready(StreamState::Eof)
},
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Err(e) => Err(e),
+ Poll::Pending => Poll::Pending,
}
}
}