use crate::codec::encoder::Encoder;
use crate::codec::framed_impl::{FramedImpl, WriteFrame};
use futures_core::Stream;
use tokio::io::AsyncWrite;
use bytes::BytesMut;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
    /// A [`Sink`] of frames encoded to an `AsyncWrite`.
    ///
    /// [`Sink`]: futures_sink::Sink
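    ///
    /// # Examples
    ///
    /// A minimal sketch, not taken from the crate's own documentation. It
    /// assumes this type is exported as `tokio_util::codec::FramedWrite`, and
    /// that the `futures` crate (for `SinkExt`) and `tokio` with its `macros`
    /// and `rt` features are available. It frames two strings with
    /// `LinesCodec` and writes them into an in-memory `Vec<u8>`.
    ///
    /// ```rust
    /// use futures::SinkExt;
    /// use tokio_util::codec::{FramedWrite, LinesCodec};
    ///
    /// #[tokio::main(flavor = "current_thread")]
    /// async fn main() {
    ///     // `Vec<u8>` implements `AsyncWrite`, so it can stand in for a socket.
    ///     let mut framed = FramedWrite::new(Vec::<u8>::new(), LinesCodec::new());
    ///
    ///     // `send` encodes the item and then flushes it to the writer.
    ///     framed.send("hello").await.unwrap();
    ///     framed.send("world").await.unwrap();
    ///
    ///     assert_eq!(framed.get_ref(), b"hello\nworld\n");
    /// }
    /// ```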
    pub struct FramedWrite<T, E> {
        #[pin]
        inner: FramedImpl<T, E, WriteFrame>,
    }
}

impl<T, E> FramedWrite<T, E>
where
    T: AsyncWrite,
{
    /// Creates a new `FramedWrite` with the given `encoder`.
    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
        FramedWrite {
            inner: FramedImpl {
                inner,
                codec: encoder,
                state: WriteFrame::default(),
            },
        }
    }
}

impl<T, E> FramedWrite<T, E> {
    /// Returns a reference to the underlying I/O stream wrapped by
    /// `FramedWrite`.
    ///
    /// Take care not to write to the underlying stream directly, as doing so
    /// may corrupt the stream of frames being written.
    pub fn get_ref(&self) -> &T {
        &self.inner.inner
    }

    /// Returns a mutable reference to the underlying I/O stream wrapped by
    /// `FramedWrite`.
    ///
    /// Take care not to write to the underlying stream directly, as doing so
    /// may corrupt the stream of frames being written.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.inner.inner
    }

    /// Returns a pinned mutable reference to the underlying I/O stream wrapped
    /// by `FramedWrite`.
    ///
    /// Take care not to write to the underlying stream directly, as doing so
    /// may corrupt the stream of frames being written.
    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
        self.project().inner.project().inner
    }

    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
    ///
    /// Any frames still held in the write buffer are dropped along with the
    /// `FramedWrite`, so flush first if they must reach the stream.
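    ///
    /// # Examples
    ///
    /// A short sketch of recovering the writer once all frames are flushed.
    /// It assumes the `tokio_util::codec` export path, the `futures` crate
    /// for `SinkExt`, and `tokio` with its `macros` and `rt` features; none
    /// of these are confirmed by this file.
    ///
    /// ```rust
    /// use futures::SinkExt;
    /// use tokio_util::codec::{FramedWrite, LinesCodec};
    ///
    /// #[tokio::main(flavor = "current_thread")]
    /// async fn main() {
    ///     let mut framed = FramedWrite::new(Vec::<u8>::new(), LinesCodec::new());
    ///
    ///     // `send` flushes, so nothing is left in the write buffer afterwards.
    ///     framed.send("done").await.unwrap();
    ///
    ///     let written: Vec<u8> = framed.into_inner();
    ///     assert_eq!(written, b"done\n");
    /// }
    /// ```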
    pub fn into_inner(self) -> T {
        self.inner.inner
    }

    /// Returns a reference to the underlying encoder.
    pub fn encoder(&self) -> &E {
        &self.inner.codec
    }

    /// Returns a mutable reference to the underlying encoder.
    pub fn encoder_mut(&mut self) -> &mut E {
        &mut self.inner.codec
    }

    /// Returns a reference to the write buffer.
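    ///
    /// # Examples
    ///
    /// A hedged sketch of inspecting the buffer between `feed` and `flush`.
    /// It assumes the `tokio_util::codec` export path, the `futures` crate
    /// for `SinkExt`, and `tokio` with its `macros` and `rt` features. It
    /// also assumes that a small frame stays buffered until an explicit
    /// flush, which depends on the backpressure logic in `FramedImpl`.
    ///
    /// ```rust
    /// use futures::SinkExt;
    /// use tokio_util::codec::{FramedWrite, LinesCodec};
    ///
    /// #[tokio::main(flavor = "current_thread")]
    /// async fn main() {
    ///     let mut framed = FramedWrite::new(Vec::<u8>::new(), LinesCodec::new());
    ///
    ///     // `feed` encodes the frame into the write buffer without flushing.
    ///     framed.feed("hi").await.unwrap();
    ///     assert_eq!(&framed.write_buffer()[..], b"hi\n");
    ///     assert!(framed.get_ref().is_empty());
    ///
    ///     // The item type is spelled out because `LinesCodec` can encode
    ///     // several item types, so `flush` alone cannot infer it.
    ///     SinkExt::<&str>::flush(&mut framed).await.unwrap();
    ///     assert!(framed.write_buffer().is_empty());
    ///     assert_eq!(framed.get_ref(), b"hi\n");
    /// }
    /// ```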
    pub fn write_buffer(&self) -> &BytesMut {
        &self.inner.state.buffer
    }

    /// Returns a mutable reference to the write buffer.
    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
        &mut self.inner.state.buffer
    }
}

// This impl just defers to the underlying FramedImpl
impl<T, I, E> Sink<I> for FramedWrite<T, E>
where
    T: AsyncWrite,
    E: Encoder<I>,
    E::Error: From<io::Error>,
{
    type Error = E::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        self.project().inner.start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_close(cx)
    }
}

// This impl just defers to the underlying T: Stream
impl<T, D> Stream for FramedWrite<T, D>
where
    T: Stream,
{
    type Item = T::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().inner.project().inner.poll_next(cx)
    }
}

impl<T, U> fmt::Debug for FramedWrite<T, U>
where
    T: fmt::Debug,
    U: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FramedWrite")
            .field("inner", &self.get_ref())
            .field("encoder", &self.encoder())
            .field("buffer", &self.inner.state.buffer)
            .finish()
    }
}