wtransport_proto/
bytes.rs

1use crate::varint::VarInt;
2use octets::Octets;
3use octets::OctetsMut;
4use std::fmt::Debug;
5use std::ops::Deref;
6use std::ops::DerefMut;
7
8/// An error indicating write operation was not able to complete because
9/// end of buffer has been reached.
10#[derive(Debug, thiserror::Error)]
11#[error("end of buffer has been reached")]
12pub struct EndOfBuffer;
13
14/// Reads bytes or varint from a source.
15pub trait BytesReader<'a> {
16    /// Reads an unsigned variable-length integer in network byte-order from
17    /// the current offset and advances the offset.
18    ///
19    /// Returns [`None`] if not enough capacity (offset is not advanced in that case).
20    fn get_varint(&mut self) -> Option<VarInt>;
21
22    /// Reads `len` bytes from the current offset **without copying** and advances
23    /// the offset.
24    ///
25    /// Returns [`None`] if not enough capacity (offset is not advanced in that case).
26    fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]>;
27}
28
29impl<'a> BytesReader<'a> for &'a [u8] {
30    fn get_varint(&mut self) -> Option<VarInt> {
31        let varint_size = VarInt::parse_size(*self.first()?);
32        let buffer = self.get(..varint_size)?;
33        let varint = BufferReader::new(buffer)
34            .get_varint()
35            .expect("Varint parsable");
36        *self = &self[varint_size..];
37        Some(varint)
38    }
39
40    fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]> {
41        let buffer = self.get(..len)?;
42        *self = &self[len..];
43        Some(buffer)
44    }
45}
46
47/// Writes bytes or varint on a source.
48pub trait BytesWriter {
49    /// Writes an unsigned variable-length integer in network byte-order at the
50    /// current offset and advances the offset.
51    ///
52    /// Returns [`Err`] if source is exhausted and no space is available.
53    fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer>;
54
55    /// Writes (by **copy**) all `bytes` at the current offset and advances it.
56    ///
57    /// Returns [`Err`] if source is exhausted and no space is available.
58    fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer>;
59}
60
61impl BytesWriter for Vec<u8> {
62    fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer> {
63        let offset = self.len();
64
65        self.resize(offset + varint.size(), 0);
66
67        BufferWriter::new(&mut self[offset..])
68            .put_varint(varint)
69            .expect("Enough capacity pre-allocated");
70
71        Ok(())
72    }
73
74    fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer> {
75        self.extend_from_slice(bytes);
76        Ok(())
77    }
78}
79
80/// A zero-copy immutable byte-buffer reader.
81///
82/// Internally, it stores an offset that is increased during reading.
83pub struct BufferReader<'a>(Octets<'a>);
84
85impl<'a> BufferReader<'a> {
86    /// Creates a [`BufferReader`] from the given slice, without copying.
87    ///
88    /// Inner offset is initialized to zero.
89    #[inline(always)]
90    pub fn new(buffer: &'a [u8]) -> Self {
91        Self(Octets::with_slice(buffer))
92    }
93
94    /// Returns the remaining capacity in the buffer.
95    #[inline(always)]
96    pub fn capacity(&self) -> usize {
97        self.0.cap()
98    }
99
100    /// Returns the current offset of the buffer.
101    #[inline(always)]
102    pub fn offset(&self) -> usize {
103        self.0.off()
104    }
105
106    /// Advances the offset.
107    ///
108    /// In case of [`Err`] the offset is not advanced.
109    #[inline(always)]
110    pub fn skip(&mut self, len: usize) -> Result<(), EndOfBuffer> {
111        self.0
112            .skip(len)
113            .map_err(|octets::BufferTooShortError| EndOfBuffer)
114    }
115
116    /// Returns a reference to the internal buffer.
117    ///
118    /// **Note**: this is the entire buffer (despite offset).
119    #[inline(always)]
120    pub fn buffer(&self) -> &'a [u8] {
121        self.0.buf()
122    }
123
124    /// Returns the inner buffer starting from the current offset.
125    #[inline(always)]
126    pub fn buffer_remaining(&mut self) -> &'a [u8] {
127        &self.buffer()[self.offset()..]
128    }
129
130    /// Creates a [`BufferReaderChild`] with this parent.
131    #[inline(always)]
132    pub fn child(&mut self) -> BufferReaderChild<'a, '_> {
133        BufferReaderChild::with_parent(self)
134    }
135}
136
137impl<'a> BytesReader<'a> for BufferReader<'a> {
138    #[inline(always)]
139    fn get_varint(&mut self) -> Option<VarInt> {
140        match self.0.get_varint() {
141            Ok(value) => {
142                // SAFETY: octets returns a legit varint
143                Some(unsafe {
144                    debug_assert!(value <= VarInt::MAX.into_inner());
145                    VarInt::from_u64_unchecked(value)
146                })
147            }
148            Err(octets::BufferTooShortError) => None,
149        }
150    }
151
152    #[inline(always)]
153    fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]> {
154        self.0.get_bytes(len).ok().map(|o| o.buf())
155    }
156}
157
158impl Debug for BufferReader<'_> {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct("BufferReader")
161            .field("buf", &self.0.buf())
162            .field("off", &self.offset())
163            .finish()
164    }
165}
166
167/// It acts like a copy of a parent [`BufferReader`].
168///
169/// You can create this from [`BufferReader::child`]. The child offset will be set
170/// to `0`, but its underlying buffer will start from the current parent's offset.
171///
172/// Having a copy it allows reading the buffer preserving the parent's original offset.
173///
174/// If you want to commit the number of bytes read to the parent, use [`BufferReaderChild::commit`].
175/// Instead, dropping this without committing, it will not alter the parent.
176pub struct BufferReaderChild<'a, 'b> {
177    reader: BufferReader<'a>,
178    parent: &'b mut BufferReader<'a>,
179}
180
181impl<'a, 'b> BufferReaderChild<'a, 'b> {
182    /// Advances the parent [`BufferReader`] offset of the amount read with this child.
183    #[inline(always)]
184    pub fn commit(self) {
185        self.parent
186            .skip(self.reader.offset())
187            .expect("Child offset is bounded to parent");
188    }
189
190    #[inline(always)]
191    fn with_parent(parent: &'b mut BufferReader<'a>) -> Self {
192        Self {
193            reader: BufferReader::new(parent.buffer_remaining()),
194            parent,
195        }
196    }
197}
198
199impl<'a> Deref for BufferReaderChild<'a, '_> {
200    type Target = BufferReader<'a>;
201
202    #[inline(always)]
203    fn deref(&self) -> &Self::Target {
204        &self.reader
205    }
206}
207
208impl DerefMut for BufferReaderChild<'_, '_> {
209    #[inline(always)]
210    fn deref_mut(&mut self) -> &mut Self::Target {
211        &mut self.reader
212    }
213}
214
215/// A zero-copy mutable buffer writer.
216pub struct BufferWriter<'a>(OctetsMut<'a>);
217
218impl<'a> BufferWriter<'a> {
219    /// Creates an [`BufferWriter`] by using `bytes` as inner buffer.
220    ///
221    /// Inner offset is initialized to zero.
222    #[inline(always)]
223    pub fn new(bytes: &'a mut [u8]) -> Self {
224        Self(OctetsMut::with_slice(bytes))
225    }
226
227    /// Returns the remaining capacity in the buffer.
228    #[inline(always)]
229    pub fn capacity(&self) -> usize {
230        self.0.cap()
231    }
232
233    /// Returns the current offset of the buffer.
234    #[inline(always)]
235    pub fn offset(&self) -> usize {
236        self.0.off()
237    }
238
239    /// Returns the portion of the inner buffer written so far.
240    #[inline(always)]
241    pub fn buffer_written(&self) -> &[u8] {
242        &self.0.buf()[..self.offset()]
243    }
244}
245
246impl BytesWriter for BufferWriter<'_> {
247    #[inline(always)]
248    fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer> {
249        self.0
250            .put_varint(varint.into_inner())
251            .map_err(|octets::BufferTooShortError| EndOfBuffer)?;
252
253        Ok(())
254    }
255
256    #[inline(always)]
257    fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer> {
258        self.0
259            .put_bytes(bytes)
260            .map_err(|octets::BufferTooShortError| EndOfBuffer)
261    }
262}
263
264/// Async operations.
265#[cfg(feature = "async")]
266#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
267pub mod r#async {
268    use super::*;
269    use std::future::Future;
270    use std::io::ErrorKind as IoErrorKind;
271    use std::pin::Pin;
272    use std::task::ready;
273    use std::task::Context;
274    use std::task::Poll;
275
276    /// Error during read operations.
277    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
278    #[derive(Debug, thiserror::Error)]
279    pub enum IoReadError {
280        /// Read failed because immediate EOF (attempt reading the first byte).
281        ///
282        /// In this case, *zero* bytes have been read during the operation.
283        #[error("read operation failed because EOF reached on first byte")]
284        ImmediateFin,
285
286        /// Read failed because EOF reached in the middle of operation.
287        ///
288        /// In this case, *at least* one byte has been read during the operation.
289        #[error("read operation failed because EOF reached after first byte")]
290        UnexpectedFin,
291
292        /// Read failed because peer interrupted operation (at any point).
293        ///
294        /// In this case, zero or more bytes might be have read during the operation.
295        #[error("read operation failed because interrupted")]
296        Reset,
297
298        /// Read failed because peer is not connected, or disconnected (at any point).
299        ///
300        /// In this case, zero or more bytes might be have read during the operation.
301        #[error("read operation failed because not connected")]
302        NotConnected,
303    }
304
305    /// Error during write operation.
306    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
307    #[derive(Debug, thiserror::Error)]
308    pub enum IoWriteError {
309        /// Write failed because peer stopped operation.
310        ///
311        /// In this case, zero or more bytes might be have written during the operation.
312        #[error("write operation failed because operation has been stopped")]
313        Stopped,
314
315        /// Write failed because peer not is not connected.
316        ///
317        /// In this case, zero or more bytes might be have written during the operation.
318        #[error("write operation failed because not connected")]
319        NotConnected,
320    }
321
322    impl From<std::io::Error> for IoReadError {
323        fn from(error: std::io::Error) -> Self {
324            match error.kind() {
325                IoErrorKind::ConnectionReset => IoReadError::Reset,
326                _ => IoReadError::NotConnected,
327            }
328        }
329    }
330
331    impl From<std::io::Error> for IoWriteError {
332        fn from(error: std::io::Error) -> Self {
333            match error.kind() {
334                IoErrorKind::ConnectionReset => IoWriteError::Stopped,
335                _ => IoWriteError::NotConnected,
336            }
337        }
338    }
339
340    /// Reads bytes from a source.
341    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
342    pub trait AsyncRead {
343        /// Attempt to read from the source into `buf`.
344        ///
345        /// Generally, an implementation will perform a **copy**.
346        ///
347        /// On success, it returns `Ok(num_bytes_read)`, that is the
348        /// length of bytes written into `buf`.
349        ///
350        /// It returns `0` if and only if:
351        ///   * `buf` is empty; or
352        ///   * The source reached its end (the stream is exhausted / EOF).
353        ///
354        /// An implementation SHOULD only generates the following errors:
355        ///   * [`std::io::ErrorKind::ConnectionReset`] if the read operation was explicitly truncated
356        ///      by the source.
357        ///   * [`std::io::ErrorKind::NotConnected`] if the read operation aborted at any point because
358        ///      lack of communication with the source.
359        fn poll_read(
360            self: Pin<&mut Self>,
361            cx: &mut Context<'_>,
362            buf: &mut [u8],
363        ) -> Poll<std::io::Result<usize>>;
364    }
365
366    impl AsyncRead for &[u8] {
367        fn poll_read(
368            mut self: Pin<&mut Self>,
369            _cx: &mut Context<'_>,
370            buf: &mut [u8],
371        ) -> Poll<std::io::Result<usize>> {
372            let amt = std::cmp::min(self.len(), buf.len());
373            let (a, b) = self.split_at(amt);
374            buf[..amt].copy_from_slice(a);
375            *self = b;
376            Poll::Ready(Ok(amt))
377        }
378    }
379
380    /// Writes bytes into a destination.
381    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
382    pub trait AsyncWrite {
383        /// Attempt to write `buf` into the destination.
384        ///
385        /// Generally, an implementation will perform a **copy**.
386        ///
387        /// On success, it returns `Ok(num_bytes_written)`, that is the number
388        /// of bytes written.
389        /// Note that, it is possible that not the entire `buf` will be written (for instance,
390        /// because of a mechanism of flow controller or limited capacity).
391        ///
392        /// An implementation SHOULD never return `Ok(0)` if `buf` is not empty.
393        ///
394        /// An implementation SHOULD only generates the following errors:
395        ///   * [`std::io::ErrorKind::ConnectionReset`] if the write operation was explicitly stopped
396        ///      by the destination.
397        ///   * [`std::io::ErrorKind::NotConnected`] if the write operation aborted at any point because
398        ///      lack of communication with the destination.
399        fn poll_write(
400            self: Pin<&mut Self>,
401            cx: &mut Context<'_>,
402            buf: &[u8],
403        ) -> Poll<std::io::Result<usize>>;
404    }
405
406    impl AsyncWrite for Vec<u8> {
407        fn poll_write(
408            mut self: Pin<&mut Self>,
409            _cx: &mut Context<'_>,
410            buf: &[u8],
411        ) -> Poll<std::io::Result<usize>> {
412            self.extend_from_slice(buf);
413            Poll::Ready(std::io::Result::Ok(buf.len()))
414        }
415    }
416
417    /// Reads bytes or varints asynchronously.
418    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
419    pub trait BytesReaderAsync {
420        /// Reads an unsigned variable-length integer in network byte-order from a source.
421        fn get_varint(&mut self) -> GetVarint<Self>;
422
423        /// Reads the source until `buffer` is completely filled.
424        fn get_buffer<'a>(&'a mut self, buffer: &'a mut [u8]) -> GetBuffer<'a, Self>;
425    }
426
427    impl<T> BytesReaderAsync for T
428    where
429        T: AsyncRead + ?Sized,
430    {
431        fn get_varint(&mut self) -> GetVarint<Self> {
432            GetVarint::new(self)
433        }
434
435        fn get_buffer<'a>(&'a mut self, buffer: &'a mut [u8]) -> GetBuffer<'a, Self> {
436            GetBuffer::new(self, buffer)
437        }
438    }
439
440    impl<T> BytesWriterAsync for T
441    where
442        T: AsyncWrite + ?Sized,
443    {
444        fn put_varint(&mut self, varint: VarInt) -> PutVarint<Self> {
445            PutVarint::new(self, varint)
446        }
447
448        fn put_buffer<'a>(&'a mut self, buffer: &'a [u8]) -> PutBuffer<'a, Self> {
449            PutBuffer::new(self, buffer)
450        }
451    }
452
453    /// Writes bytes or varints asynchronously.
454    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
455    pub trait BytesWriterAsync {
456        /// Writes an unsigned variable-length integer in network byte-order to
457        /// the source advancing the buffer's internal cursor.
458        fn put_varint(&mut self, varint: VarInt) -> PutVarint<Self>;
459
460        /// Pushes some bytes into the source advancing the buffer’s internal cursor.
461        fn put_buffer<'a>(&'a mut self, buffer: &'a [u8]) -> PutBuffer<'a, Self>;
462    }
463
464    /// [`Future`] for reading a varint.
465    ///
466    /// Created by [`BytesReaderAsync::get_varint`].
467    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
468    pub struct GetVarint<'a, R: ?Sized> {
469        reader: &'a mut R,
470        buffer: [u8; VarInt::MAX_SIZE],
471        offset: usize,
472        varint_size: usize,
473    }
474
475    impl<'a, R> GetVarint<'a, R>
476    where
477        R: AsyncRead + ?Sized,
478    {
479        fn new(reader: &'a mut R) -> Self {
480            Self {
481                reader,
482                buffer: [0; VarInt::MAX_SIZE],
483                offset: 0,
484                varint_size: 0,
485            }
486        }
487    }
488
489    impl<R> Future for GetVarint<'_, R>
490    where
491        R: AsyncRead + Unpin + ?Sized,
492    {
493        type Output = Result<VarInt, IoReadError>;
494
495        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
496            let this = self.get_mut();
497
498            if this.offset == 0 {
499                debug_assert_eq!(this.varint_size, 0);
500
501                let read = ready!(AsyncRead::poll_read(
502                    Pin::new(this.reader),
503                    cx,
504                    &mut this.buffer[0..1]
505                ))?;
506
507                debug_assert!(read == 0 || read == 1);
508
509                if read == 1 {
510                    this.offset = 1;
511                    this.varint_size = VarInt::parse_size(this.buffer[0]);
512                    debug_assert_ne!(this.varint_size, 0);
513                } else {
514                    return Poll::Ready(Err(IoReadError::ImmediateFin));
515                }
516            }
517
518            while this.offset < this.varint_size {
519                let read = ready!(AsyncRead::poll_read(
520                    Pin::new(this.reader),
521                    cx,
522                    &mut this.buffer[this.offset..this.varint_size]
523                ))?;
524
525                debug_assert!(read <= this.varint_size - this.offset);
526
527                if read > 0 {
528                    this.offset += read;
529                } else {
530                    return Poll::Ready(Err(IoReadError::UnexpectedFin));
531                }
532            }
533
534            let varint = BufferReader::new(&this.buffer[..this.varint_size])
535                .get_varint()
536                .expect("Varint is parsable");
537
538            Poll::Ready(Ok(varint))
539        }
540    }
541
542    /// [`Future`] for reading a buffer of bytes.
543    ///
544    /// Created by [`BytesReaderAsync::get_buffer`].
545    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
546    pub struct GetBuffer<'a, R: ?Sized> {
547        reader: &'a mut R,
548        buffer: &'a mut [u8],
549        offset: usize,
550    }
551
552    impl<'a, R> GetBuffer<'a, R>
553    where
554        R: AsyncRead + ?Sized,
555    {
556        fn new(reader: &'a mut R, buffer: &'a mut [u8]) -> Self {
557            Self {
558                reader,
559                buffer,
560                offset: 0,
561            }
562        }
563    }
564
565    impl<R> Future for GetBuffer<'_, R>
566    where
567        R: AsyncRead + Unpin + ?Sized,
568    {
569        type Output = Result<(), IoReadError>;
570
571        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
572            let this = self.get_mut();
573
574            while this.offset < this.buffer.len() {
575                let read = ready!(AsyncRead::poll_read(
576                    Pin::new(this.reader),
577                    cx,
578                    &mut this.buffer[this.offset..],
579                ))?;
580
581                debug_assert!(read <= this.buffer.len() - this.offset);
582
583                if read > 0 {
584                    this.offset += read;
585                } else if this.offset > 0 {
586                    return Poll::Ready(Err(IoReadError::UnexpectedFin));
587                } else {
588                    return Poll::Ready(Err(IoReadError::ImmediateFin));
589                }
590            }
591
592            Poll::Ready(Ok(()))
593        }
594    }
595
596    /// [`Future`] for writing a varint.
597    ///
598    /// Created by [`BytesWriterAsync::put_varint`].
599    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
600    pub struct PutVarint<'a, W: ?Sized> {
601        writer: &'a mut W,
602        buffer: [u8; VarInt::MAX_SIZE],
603        offset: usize,
604        varint_size: usize,
605    }
606
607    impl<'a, W> PutVarint<'a, W>
608    where
609        W: AsyncWrite + ?Sized,
610    {
611        fn new(writer: &'a mut W, varint: VarInt) -> Self {
612            let mut this = Self {
613                writer,
614                buffer: [0; VarInt::MAX_SIZE],
615                offset: 0,
616                varint_size: 0,
617            };
618
619            let mut buffer_writer = BufferWriter::new(&mut this.buffer);
620            buffer_writer
621                .put_varint(varint)
622                .expect("Inner buffer is enough for max varint");
623
624            this.varint_size = buffer_writer.offset();
625
626            this
627        }
628    }
629
630    impl<W> Future for PutVarint<'_, W>
631    where
632        W: AsyncWrite + Unpin + ?Sized,
633    {
634        type Output = Result<(), IoWriteError>;
635
636        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637            let this = self.get_mut();
638
639            while this.offset < this.varint_size {
640                let written = ready!(AsyncWrite::poll_write(
641                    Pin::new(this.writer),
642                    cx,
643                    &this.buffer[this.offset..this.varint_size]
644                ))?;
645
646                debug_assert!(written > 0);
647
648                this.offset += written;
649            }
650
651            Poll::Ready(Ok(()))
652        }
653    }
654
655    /// [`Future`] for writing a buffer of bytes.
656    ///
657    /// Created by [`BytesWriterAsync::put_buffer`].
658    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
659    pub struct PutBuffer<'a, W: ?Sized> {
660        writer: &'a mut W,
661        buffer: &'a [u8],
662        offset: usize,
663    }
664
665    impl<'a, W> PutBuffer<'a, W>
666    where
667        W: AsyncWrite + ?Sized,
668    {
669        fn new(writer: &'a mut W, buffer: &'a [u8]) -> Self {
670            Self {
671                writer,
672                buffer,
673                offset: 0,
674            }
675        }
676    }
677
678    impl<W> Future for PutBuffer<'_, W>
679    where
680        W: AsyncWrite + Unpin + ?Sized,
681    {
682        type Output = Result<(), IoWriteError>;
683
684        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
685            let this = self.get_mut();
686
687            while this.offset < this.buffer.len() {
688                let written = ready!(AsyncWrite::poll_write(
689                    Pin::new(this.writer),
690                    cx,
691                    &this.buffer[this.offset..]
692                ))?;
693
694                debug_assert!(written > 0);
695
696                this.offset += written;
697            }
698
699            Poll::Ready(Ok(()))
700        }
701    }
702}
703
704#[cfg(feature = "async")]
705pub use r#async::AsyncRead;
706
707#[cfg(feature = "async")]
708pub use r#async::AsyncWrite;
709
710#[cfg(feature = "async")]
711pub use r#async::BytesReaderAsync;
712
713#[cfg(feature = "async")]
714pub use r#async::BytesWriterAsync;
715
716#[cfg(feature = "async")]
717pub use r#async::IoReadError;
718
719#[cfg(feature = "async")]
720pub use r#async::IoWriteError;
721
722#[cfg(test)]
723mod tests {
724    use super::*;
725
726    #[test]
727    fn read_varint() {
728        for (varint_buffer, value_expect) in utils::VARINT_TEST_CASES {
729            let mut buffer_reader = BufferReader::new(varint_buffer);
730
731            assert_eq!(buffer_reader.offset(), 0);
732            assert_eq!(buffer_reader.capacity(), varint_buffer.len());
733
734            let value = buffer_reader.get_varint().unwrap();
735
736            assert_eq!(value, value_expect);
737            assert_eq!(buffer_reader.offset(), varint_buffer.len());
738            assert_eq!(buffer_reader.capacity(), 0);
739        }
740    }
741
742    #[tokio::test]
743    async fn read_varint_async() {
744        for (varint_buffer, value_expect) in utils::VARINT_TEST_CASES {
745            let mut reader = utils::StepReader::new(varint_buffer);
746            let value = reader.get_varint().await.unwrap();
747            assert_eq!(value, value_expect);
748        }
749    }
750
751    #[test]
752    fn read_buffer() {
753        let mut buffer_reader = BufferReader::new(utils::BUFFER_TEST);
754        let value = buffer_reader.get_bytes(utils::BUFFER_TEST.len()).unwrap();
755        assert_eq!(value, utils::BUFFER_TEST);
756    }
757
758    #[tokio::test]
759    async fn read_buffer_async() {
760        let mut value = [0; utils::BUFFER_TEST.len()];
761        let mut reader = utils::StepReader::new(utils::BUFFER_TEST);
762        reader.get_buffer(&mut value).await.unwrap();
763        assert_eq!(value, utils::BUFFER_TEST);
764    }
765
766    #[test]
767    fn write_varint() {
768        let mut buffer = [0; VarInt::MAX_SIZE];
769        for (varint_buffer, value) in utils::VARINT_TEST_CASES {
770            let mut buffer_writer = BufferWriter::new(&mut buffer);
771
772            assert_eq!(buffer_writer.offset(), 0);
773            assert_eq!(buffer_writer.capacity(), VarInt::MAX_SIZE);
774
775            buffer_writer.put_varint(value).unwrap();
776
777            assert_eq!(buffer_writer.offset(), varint_buffer.len());
778            assert_eq!(buffer_writer.buffer_written(), varint_buffer);
779        }
780    }
781
782    #[tokio::test]
783    async fn write_varint_async() {
784        for (varint_buffer, value) in utils::VARINT_TEST_CASES {
785            let mut writer = utils::StepWriter::new(Some(8));
786
787            writer.put_varint(value).await.unwrap();
788            assert_eq!(value.size(), writer.written().len());
789            assert_eq!(writer.written(), varint_buffer);
790        }
791    }
792
793    #[test]
794    fn child_commit() {
795        let mut buffer_reader = BufferReader::new(&[0x1, 0x2]);
796
797        buffer_reader.skip(1).unwrap();
798        assert_eq!(buffer_reader.offset(), 1);
799        assert_eq!(buffer_reader.capacity(), 1);
800
801        let mut buffer_reader_child = buffer_reader.child();
802        assert_eq!(buffer_reader_child.offset(), 0);
803        assert_eq!(buffer_reader_child.capacity(), 1);
804
805        assert!(matches!(buffer_reader_child.get_bytes(1), Some([0x2])));
806        assert_eq!(buffer_reader_child.offset(), 1);
807
808        buffer_reader_child.commit();
809
810        assert_eq!(buffer_reader.offset(), 2);
811        assert_eq!(buffer_reader.capacity(), 0);
812    }
813
814    #[test]
815    fn child_drop() {
816        let mut buffer_reader = BufferReader::new(&[0x1, 0x2]);
817
818        buffer_reader.skip(1).unwrap();
819        assert_eq!(buffer_reader.offset(), 1);
820        assert_eq!(buffer_reader.capacity(), 1);
821
822        let mut buffer_reader_child = buffer_reader.child();
823        assert_eq!(buffer_reader_child.offset(), 0);
824        assert_eq!(buffer_reader_child.capacity(), 1);
825
826        assert!(matches!(buffer_reader_child.get_bytes(1), Some([0x2])));
827        assert_eq!(buffer_reader_child.offset(), 1);
828
829        assert_eq!(buffer_reader.offset(), 1);
830        assert_eq!(buffer_reader.capacity(), 1);
831    }
832
833    #[test]
834    fn none() {
835        let mut buffer_reader = BufferReader::new(&[]);
836        assert!(buffer_reader.get_varint().is_none());
837        assert!(buffer_reader.get_bytes(1).is_none());
838
839        let mut buffer_writer = BufferWriter::new(&mut []);
840        assert!(buffer_writer.put_varint(VarInt::from_u32(0)).is_err());
841        assert!(buffer_writer.put_bytes(&[0x0]).is_err());
842    }
843
844    #[tokio::test]
845    async fn none_async() {
846        let mut reader = utils::StepReader::new(vec![]);
847        assert!(reader.get_varint().await.is_err());
848        assert!(reader.get_buffer(&mut [0x0]).await.is_err());
849
850        let mut writer = utils::StepWriter::new(Some(0));
851        assert!(writer.put_varint(VarInt::from_u32(0)).await.is_err());
852        assert!(writer.put_buffer(&[0x0]).await.is_err());
853    }
854
855    #[tokio::test]
856    async fn fin_varint() {
857        for (buffer, _) in utils::VARINT_TEST_CASES {
858            for len in 0..buffer.len() {
859                let result = BytesReaderAsync::get_varint(&mut &buffer[..len]).await;
860
861                match len {
862                    0 => assert!(matches!(result, Err(IoReadError::ImmediateFin))),
863                    _ => assert!(matches!(result, Err(IoReadError::UnexpectedFin))),
864                }
865            }
866        }
867    }
868
869    #[tokio::test]
870    async fn fin_buffer() {
871        let mut buffer = [0; utils::BUFFER_TEST.len()];
872
873        for len in 0..utils::BUFFER_TEST.len() {
874            let result = (&mut &utils::BUFFER_TEST[..len])
875                .get_buffer(&mut buffer)
876                .await;
877
878            match len {
879                0 => assert!(matches!(result, Err(IoReadError::ImmediateFin))),
880                _ => assert!(matches!(result, Err(IoReadError::UnexpectedFin))),
881            }
882        }
883    }
884
885    mod utils {
886        use super::*;
887
888        pub const VARINT_TEST_CASES: [(&[u8], VarInt); 4] = [
889            (&[0xc2, 0x19, 0x7c, 0x5e, 0xff, 0x14, 0xe8, 0x8c], unsafe {
890                VarInt::from_u64_unchecked(151_288_809_941_952_652)
891            }),
892            (&[0x9d, 0x7f, 0x3e, 0x7d], VarInt::from_u32(494_878_333)),
893            (&[0x7b, 0xbd], VarInt::from_u32(15_293)),
894            (&[0x25], VarInt::from_u32(37)),
895        ];
896
897        pub const BUFFER_TEST: &[u8] = b"WebTransport";
898
899        #[cfg(feature = "async")]
900        #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
901        pub mod r#async {
902            use super::*;
903            use std::pin::Pin;
904            use std::task::Context;
905            use std::task::Poll;
906
907            pub struct StepReader {
908                data: Box<[u8]>,
909                offset: usize,
910                to_pending: bool,
911            }
912
913            impl StepReader {
914                pub fn new<T>(data: T) -> Self
915                where
916                    T: Into<Box<[u8]>>,
917                {
918                    Self {
919                        data: data.into(),
920                        offset: 0,
921                        to_pending: true,
922                    }
923                }
924            }
925
926            impl AsyncRead for StepReader {
927                fn poll_read(
928                    mut self: Pin<&mut Self>,
929                    cx: &mut Context<'_>,
930                    buf: &mut [u8],
931                ) -> Poll<std::io::Result<usize>> {
932                    let new_pending = !self.to_pending;
933                    let to_pending = std::mem::replace(&mut self.to_pending, new_pending);
934
935                    if buf.is_empty() {
936                        return Poll::Ready(Ok(0));
937                    }
938
939                    if to_pending {
940                        cx.waker().wake_by_ref();
941                        Poll::Pending
942                    } else if let Some(&byte) = self.data.get(self.offset) {
943                        buf[0] = byte;
944                        self.offset += 1;
945                        Poll::Ready(Ok(1))
946                    } else {
947                        Poll::Ready(Ok(0))
948                    }
949                }
950            }
951
952            pub struct StepWriter {
953                buffer: Vec<u8>,
954                max_len: Option<usize>,
955                to_pending: bool,
956            }
957
958            impl StepWriter {
959                pub fn new(max_len: Option<usize>) -> Self {
960                    Self {
961                        buffer: Vec::new(),
962                        max_len,
963                        to_pending: true,
964                    }
965                }
966
967                pub fn written(&self) -> &[u8] {
968                    &self.buffer
969                }
970            }
971
972            impl AsyncWrite for StepWriter {
973                fn poll_write(
974                    mut self: Pin<&mut Self>,
975                    cx: &mut Context<'_>,
976                    buf: &[u8],
977                ) -> Poll<Result<usize, std::io::Error>> {
978                    let new_pending = !self.to_pending;
979                    let to_pending = std::mem::replace(&mut self.to_pending, new_pending);
980
981                    if buf.is_empty() {
982                        return Poll::Ready(Ok(0));
983                    }
984
985                    if to_pending {
986                        cx.waker().wake_by_ref();
987                        Poll::Pending
988                    } else if self.buffer.len() < self.max_len.unwrap_or(usize::MAX) {
989                        let byte = buf[0];
990                        self.buffer.push(byte);
991                        Poll::Ready(Ok(1))
992                    } else {
993                        Poll::Ready(Err(std::io::Error::new(
994                            std::io::ErrorKind::ConnectionReset,
995                            "Reached max len",
996                        )))
997                    }
998                }
999            }
1000        }
1001
1002        #[cfg(feature = "async")]
1003        pub use r#async::StepReader;
1004
1005        #[cfg(feature = "async")]
1006        pub use r#async::StepWriter;
1007    }
1008}