1use crate::varint::VarInt;
2use octets::Octets;
3use octets::OctetsMut;
4use std::fmt::Debug;
5use std::ops::Deref;
6use std::ops::DerefMut;
7
8#[derive(Debug, thiserror::Error)]
11#[error("end of buffer has been reached")]
12pub struct EndOfBuffer;
13
14pub trait BytesReader<'a> {
16 fn get_varint(&mut self) -> Option<VarInt>;
21
22 fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]>;
27}
28
29impl<'a> BytesReader<'a> for &'a [u8] {
30 fn get_varint(&mut self) -> Option<VarInt> {
31 let varint_size = VarInt::parse_size(*self.first()?);
32 let buffer = self.get(..varint_size)?;
33 let varint = BufferReader::new(buffer)
34 .get_varint()
35 .expect("Varint parsable");
36 *self = &self[varint_size..];
37 Some(varint)
38 }
39
40 fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]> {
41 let buffer = self.get(..len)?;
42 *self = &self[len..];
43 Some(buffer)
44 }
45}
46
47pub trait BytesWriter {
49 fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer>;
54
55 fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer>;
59}
60
61impl BytesWriter for Vec<u8> {
62 fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer> {
63 let offset = self.len();
64
65 self.resize(offset + varint.size(), 0);
66
67 BufferWriter::new(&mut self[offset..])
68 .put_varint(varint)
69 .expect("Enough capacity pre-allocated");
70
71 Ok(())
72 }
73
74 fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer> {
75 self.extend_from_slice(bytes);
76 Ok(())
77 }
78}
79
80pub struct BufferReader<'a>(Octets<'a>);
84
85impl<'a> BufferReader<'a> {
86 #[inline(always)]
90 pub fn new(buffer: &'a [u8]) -> Self {
91 Self(Octets::with_slice(buffer))
92 }
93
94 #[inline(always)]
96 pub fn capacity(&self) -> usize {
97 self.0.cap()
98 }
99
100 #[inline(always)]
102 pub fn offset(&self) -> usize {
103 self.0.off()
104 }
105
106 #[inline(always)]
110 pub fn skip(&mut self, len: usize) -> Result<(), EndOfBuffer> {
111 self.0
112 .skip(len)
113 .map_err(|octets::BufferTooShortError| EndOfBuffer)
114 }
115
116 #[inline(always)]
120 pub fn buffer(&self) -> &'a [u8] {
121 self.0.buf()
122 }
123
124 #[inline(always)]
126 pub fn buffer_remaining(&mut self) -> &'a [u8] {
127 &self.buffer()[self.offset()..]
128 }
129
130 #[inline(always)]
132 pub fn child(&mut self) -> BufferReaderChild<'a, '_> {
133 BufferReaderChild::with_parent(self)
134 }
135}
136
137impl<'a> BytesReader<'a> for BufferReader<'a> {
138 #[inline(always)]
139 fn get_varint(&mut self) -> Option<VarInt> {
140 match self.0.get_varint() {
141 Ok(value) => {
142 Some(unsafe {
144 debug_assert!(value <= VarInt::MAX.into_inner());
145 VarInt::from_u64_unchecked(value)
146 })
147 }
148 Err(octets::BufferTooShortError) => None,
149 }
150 }
151
152 #[inline(always)]
153 fn get_bytes(&mut self, len: usize) -> Option<&'a [u8]> {
154 self.0.get_bytes(len).ok().map(|o| o.buf())
155 }
156}
157
158impl Debug for BufferReader<'_> {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct("BufferReader")
161 .field("buf", &self.0.buf())
162 .field("off", &self.offset())
163 .finish()
164 }
165}
166
167pub struct BufferReaderChild<'a, 'b> {
177 reader: BufferReader<'a>,
178 parent: &'b mut BufferReader<'a>,
179}
180
181impl<'a, 'b> BufferReaderChild<'a, 'b> {
182 #[inline(always)]
184 pub fn commit(self) {
185 self.parent
186 .skip(self.reader.offset())
187 .expect("Child offset is bounded to parent");
188 }
189
190 #[inline(always)]
191 fn with_parent(parent: &'b mut BufferReader<'a>) -> Self {
192 Self {
193 reader: BufferReader::new(parent.buffer_remaining()),
194 parent,
195 }
196 }
197}
198
199impl<'a> Deref for BufferReaderChild<'a, '_> {
200 type Target = BufferReader<'a>;
201
202 #[inline(always)]
203 fn deref(&self) -> &Self::Target {
204 &self.reader
205 }
206}
207
208impl DerefMut for BufferReaderChild<'_, '_> {
209 #[inline(always)]
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 &mut self.reader
212 }
213}
214
215pub struct BufferWriter<'a>(OctetsMut<'a>);
217
218impl<'a> BufferWriter<'a> {
219 #[inline(always)]
223 pub fn new(bytes: &'a mut [u8]) -> Self {
224 Self(OctetsMut::with_slice(bytes))
225 }
226
227 #[inline(always)]
229 pub fn capacity(&self) -> usize {
230 self.0.cap()
231 }
232
233 #[inline(always)]
235 pub fn offset(&self) -> usize {
236 self.0.off()
237 }
238
239 #[inline(always)]
241 pub fn buffer_written(&self) -> &[u8] {
242 &self.0.buf()[..self.offset()]
243 }
244}
245
246impl BytesWriter for BufferWriter<'_> {
247 #[inline(always)]
248 fn put_varint(&mut self, varint: VarInt) -> Result<(), EndOfBuffer> {
249 self.0
250 .put_varint(varint.into_inner())
251 .map_err(|octets::BufferTooShortError| EndOfBuffer)?;
252
253 Ok(())
254 }
255
256 #[inline(always)]
257 fn put_bytes(&mut self, bytes: &[u8]) -> Result<(), EndOfBuffer> {
258 self.0
259 .put_bytes(bytes)
260 .map_err(|octets::BufferTooShortError| EndOfBuffer)
261 }
262}
263
264#[cfg(feature = "async")]
266#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
267pub mod r#async {
268 use super::*;
269 use std::future::Future;
270 use std::io::ErrorKind as IoErrorKind;
271 use std::pin::Pin;
272 use std::task::ready;
273 use std::task::Context;
274 use std::task::Poll;
275
276 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
278 #[derive(Debug, thiserror::Error)]
279 pub enum IoReadError {
280 #[error("read operation failed because EOF reached on first byte")]
284 ImmediateFin,
285
286 #[error("read operation failed because EOF reached after first byte")]
290 UnexpectedFin,
291
292 #[error("read operation failed because interrupted")]
296 Reset,
297
298 #[error("read operation failed because not connected")]
302 NotConnected,
303 }
304
305 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
307 #[derive(Debug, thiserror::Error)]
308 pub enum IoWriteError {
309 #[error("write operation failed because operation has been stopped")]
313 Stopped,
314
315 #[error("write operation failed because not connected")]
319 NotConnected,
320 }
321
322 impl From<std::io::Error> for IoReadError {
323 fn from(error: std::io::Error) -> Self {
324 match error.kind() {
325 IoErrorKind::ConnectionReset => IoReadError::Reset,
326 _ => IoReadError::NotConnected,
327 }
328 }
329 }
330
331 impl From<std::io::Error> for IoWriteError {
332 fn from(error: std::io::Error) -> Self {
333 match error.kind() {
334 IoErrorKind::ConnectionReset => IoWriteError::Stopped,
335 _ => IoWriteError::NotConnected,
336 }
337 }
338 }
339
340 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
342 pub trait AsyncRead {
343 fn poll_read(
360 self: Pin<&mut Self>,
361 cx: &mut Context<'_>,
362 buf: &mut [u8],
363 ) -> Poll<std::io::Result<usize>>;
364 }
365
366 impl AsyncRead for &[u8] {
367 fn poll_read(
368 mut self: Pin<&mut Self>,
369 _cx: &mut Context<'_>,
370 buf: &mut [u8],
371 ) -> Poll<std::io::Result<usize>> {
372 let amt = std::cmp::min(self.len(), buf.len());
373 let (a, b) = self.split_at(amt);
374 buf[..amt].copy_from_slice(a);
375 *self = b;
376 Poll::Ready(Ok(amt))
377 }
378 }
379
380 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
382 pub trait AsyncWrite {
383 fn poll_write(
400 self: Pin<&mut Self>,
401 cx: &mut Context<'_>,
402 buf: &[u8],
403 ) -> Poll<std::io::Result<usize>>;
404 }
405
406 impl AsyncWrite for Vec<u8> {
407 fn poll_write(
408 mut self: Pin<&mut Self>,
409 _cx: &mut Context<'_>,
410 buf: &[u8],
411 ) -> Poll<std::io::Result<usize>> {
412 self.extend_from_slice(buf);
413 Poll::Ready(std::io::Result::Ok(buf.len()))
414 }
415 }
416
417 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
419 pub trait BytesReaderAsync {
420 fn get_varint(&mut self) -> GetVarint<Self>;
422
423 fn get_buffer<'a>(&'a mut self, buffer: &'a mut [u8]) -> GetBuffer<'a, Self>;
425 }
426
427 impl<T> BytesReaderAsync for T
428 where
429 T: AsyncRead + ?Sized,
430 {
431 fn get_varint(&mut self) -> GetVarint<Self> {
432 GetVarint::new(self)
433 }
434
435 fn get_buffer<'a>(&'a mut self, buffer: &'a mut [u8]) -> GetBuffer<'a, Self> {
436 GetBuffer::new(self, buffer)
437 }
438 }
439
440 impl<T> BytesWriterAsync for T
441 where
442 T: AsyncWrite + ?Sized,
443 {
444 fn put_varint(&mut self, varint: VarInt) -> PutVarint<Self> {
445 PutVarint::new(self, varint)
446 }
447
448 fn put_buffer<'a>(&'a mut self, buffer: &'a [u8]) -> PutBuffer<'a, Self> {
449 PutBuffer::new(self, buffer)
450 }
451 }
452
453 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
455 pub trait BytesWriterAsync {
456 fn put_varint(&mut self, varint: VarInt) -> PutVarint<Self>;
459
460 fn put_buffer<'a>(&'a mut self, buffer: &'a [u8]) -> PutBuffer<'a, Self>;
462 }
463
464 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
468 pub struct GetVarint<'a, R: ?Sized> {
469 reader: &'a mut R,
470 buffer: [u8; VarInt::MAX_SIZE],
471 offset: usize,
472 varint_size: usize,
473 }
474
475 impl<'a, R> GetVarint<'a, R>
476 where
477 R: AsyncRead + ?Sized,
478 {
479 fn new(reader: &'a mut R) -> Self {
480 Self {
481 reader,
482 buffer: [0; VarInt::MAX_SIZE],
483 offset: 0,
484 varint_size: 0,
485 }
486 }
487 }
488
489 impl<R> Future for GetVarint<'_, R>
490 where
491 R: AsyncRead + Unpin + ?Sized,
492 {
493 type Output = Result<VarInt, IoReadError>;
494
495 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
496 let this = self.get_mut();
497
498 if this.offset == 0 {
499 debug_assert_eq!(this.varint_size, 0);
500
501 let read = ready!(AsyncRead::poll_read(
502 Pin::new(this.reader),
503 cx,
504 &mut this.buffer[0..1]
505 ))?;
506
507 debug_assert!(read == 0 || read == 1);
508
509 if read == 1 {
510 this.offset = 1;
511 this.varint_size = VarInt::parse_size(this.buffer[0]);
512 debug_assert_ne!(this.varint_size, 0);
513 } else {
514 return Poll::Ready(Err(IoReadError::ImmediateFin));
515 }
516 }
517
518 while this.offset < this.varint_size {
519 let read = ready!(AsyncRead::poll_read(
520 Pin::new(this.reader),
521 cx,
522 &mut this.buffer[this.offset..this.varint_size]
523 ))?;
524
525 debug_assert!(read <= this.varint_size - this.offset);
526
527 if read > 0 {
528 this.offset += read;
529 } else {
530 return Poll::Ready(Err(IoReadError::UnexpectedFin));
531 }
532 }
533
534 let varint = BufferReader::new(&this.buffer[..this.varint_size])
535 .get_varint()
536 .expect("Varint is parsable");
537
538 Poll::Ready(Ok(varint))
539 }
540 }
541
542 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
546 pub struct GetBuffer<'a, R: ?Sized> {
547 reader: &'a mut R,
548 buffer: &'a mut [u8],
549 offset: usize,
550 }
551
552 impl<'a, R> GetBuffer<'a, R>
553 where
554 R: AsyncRead + ?Sized,
555 {
556 fn new(reader: &'a mut R, buffer: &'a mut [u8]) -> Self {
557 Self {
558 reader,
559 buffer,
560 offset: 0,
561 }
562 }
563 }
564
565 impl<R> Future for GetBuffer<'_, R>
566 where
567 R: AsyncRead + Unpin + ?Sized,
568 {
569 type Output = Result<(), IoReadError>;
570
571 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
572 let this = self.get_mut();
573
574 while this.offset < this.buffer.len() {
575 let read = ready!(AsyncRead::poll_read(
576 Pin::new(this.reader),
577 cx,
578 &mut this.buffer[this.offset..],
579 ))?;
580
581 debug_assert!(read <= this.buffer.len() - this.offset);
582
583 if read > 0 {
584 this.offset += read;
585 } else if this.offset > 0 {
586 return Poll::Ready(Err(IoReadError::UnexpectedFin));
587 } else {
588 return Poll::Ready(Err(IoReadError::ImmediateFin));
589 }
590 }
591
592 Poll::Ready(Ok(()))
593 }
594 }
595
596 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
600 pub struct PutVarint<'a, W: ?Sized> {
601 writer: &'a mut W,
602 buffer: [u8; VarInt::MAX_SIZE],
603 offset: usize,
604 varint_size: usize,
605 }
606
607 impl<'a, W> PutVarint<'a, W>
608 where
609 W: AsyncWrite + ?Sized,
610 {
611 fn new(writer: &'a mut W, varint: VarInt) -> Self {
612 let mut this = Self {
613 writer,
614 buffer: [0; VarInt::MAX_SIZE],
615 offset: 0,
616 varint_size: 0,
617 };
618
619 let mut buffer_writer = BufferWriter::new(&mut this.buffer);
620 buffer_writer
621 .put_varint(varint)
622 .expect("Inner buffer is enough for max varint");
623
624 this.varint_size = buffer_writer.offset();
625
626 this
627 }
628 }
629
630 impl<W> Future for PutVarint<'_, W>
631 where
632 W: AsyncWrite + Unpin + ?Sized,
633 {
634 type Output = Result<(), IoWriteError>;
635
636 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637 let this = self.get_mut();
638
639 while this.offset < this.varint_size {
640 let written = ready!(AsyncWrite::poll_write(
641 Pin::new(this.writer),
642 cx,
643 &this.buffer[this.offset..this.varint_size]
644 ))?;
645
646 debug_assert!(written > 0);
647
648 this.offset += written;
649 }
650
651 Poll::Ready(Ok(()))
652 }
653 }
654
655 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
659 pub struct PutBuffer<'a, W: ?Sized> {
660 writer: &'a mut W,
661 buffer: &'a [u8],
662 offset: usize,
663 }
664
665 impl<'a, W> PutBuffer<'a, W>
666 where
667 W: AsyncWrite + ?Sized,
668 {
669 fn new(writer: &'a mut W, buffer: &'a [u8]) -> Self {
670 Self {
671 writer,
672 buffer,
673 offset: 0,
674 }
675 }
676 }
677
678 impl<W> Future for PutBuffer<'_, W>
679 where
680 W: AsyncWrite + Unpin + ?Sized,
681 {
682 type Output = Result<(), IoWriteError>;
683
684 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
685 let this = self.get_mut();
686
687 while this.offset < this.buffer.len() {
688 let written = ready!(AsyncWrite::poll_write(
689 Pin::new(this.writer),
690 cx,
691 &this.buffer[this.offset..]
692 ))?;
693
694 debug_assert!(written > 0);
695
696 this.offset += written;
697 }
698
699 Poll::Ready(Ok(()))
700 }
701 }
702}
703
704#[cfg(feature = "async")]
705pub use r#async::AsyncRead;
706
707#[cfg(feature = "async")]
708pub use r#async::AsyncWrite;
709
710#[cfg(feature = "async")]
711pub use r#async::BytesReaderAsync;
712
713#[cfg(feature = "async")]
714pub use r#async::BytesWriterAsync;
715
716#[cfg(feature = "async")]
717pub use r#async::IoReadError;
718
719#[cfg(feature = "async")]
720pub use r#async::IoWriteError;
721
722#[cfg(test)]
723mod tests {
724 use super::*;
725
726 #[test]
727 fn read_varint() {
728 for (varint_buffer, value_expect) in utils::VARINT_TEST_CASES {
729 let mut buffer_reader = BufferReader::new(varint_buffer);
730
731 assert_eq!(buffer_reader.offset(), 0);
732 assert_eq!(buffer_reader.capacity(), varint_buffer.len());
733
734 let value = buffer_reader.get_varint().unwrap();
735
736 assert_eq!(value, value_expect);
737 assert_eq!(buffer_reader.offset(), varint_buffer.len());
738 assert_eq!(buffer_reader.capacity(), 0);
739 }
740 }
741
742 #[tokio::test]
743 async fn read_varint_async() {
744 for (varint_buffer, value_expect) in utils::VARINT_TEST_CASES {
745 let mut reader = utils::StepReader::new(varint_buffer);
746 let value = reader.get_varint().await.unwrap();
747 assert_eq!(value, value_expect);
748 }
749 }
750
751 #[test]
752 fn read_buffer() {
753 let mut buffer_reader = BufferReader::new(utils::BUFFER_TEST);
754 let value = buffer_reader.get_bytes(utils::BUFFER_TEST.len()).unwrap();
755 assert_eq!(value, utils::BUFFER_TEST);
756 }
757
758 #[tokio::test]
759 async fn read_buffer_async() {
760 let mut value = [0; utils::BUFFER_TEST.len()];
761 let mut reader = utils::StepReader::new(utils::BUFFER_TEST);
762 reader.get_buffer(&mut value).await.unwrap();
763 assert_eq!(value, utils::BUFFER_TEST);
764 }
765
766 #[test]
767 fn write_varint() {
768 let mut buffer = [0; VarInt::MAX_SIZE];
769 for (varint_buffer, value) in utils::VARINT_TEST_CASES {
770 let mut buffer_writer = BufferWriter::new(&mut buffer);
771
772 assert_eq!(buffer_writer.offset(), 0);
773 assert_eq!(buffer_writer.capacity(), VarInt::MAX_SIZE);
774
775 buffer_writer.put_varint(value).unwrap();
776
777 assert_eq!(buffer_writer.offset(), varint_buffer.len());
778 assert_eq!(buffer_writer.buffer_written(), varint_buffer);
779 }
780 }
781
782 #[tokio::test]
783 async fn write_varint_async() {
784 for (varint_buffer, value) in utils::VARINT_TEST_CASES {
785 let mut writer = utils::StepWriter::new(Some(8));
786
787 writer.put_varint(value).await.unwrap();
788 assert_eq!(value.size(), writer.written().len());
789 assert_eq!(writer.written(), varint_buffer);
790 }
791 }
792
793 #[test]
794 fn child_commit() {
795 let mut buffer_reader = BufferReader::new(&[0x1, 0x2]);
796
797 buffer_reader.skip(1).unwrap();
798 assert_eq!(buffer_reader.offset(), 1);
799 assert_eq!(buffer_reader.capacity(), 1);
800
801 let mut buffer_reader_child = buffer_reader.child();
802 assert_eq!(buffer_reader_child.offset(), 0);
803 assert_eq!(buffer_reader_child.capacity(), 1);
804
805 assert!(matches!(buffer_reader_child.get_bytes(1), Some([0x2])));
806 assert_eq!(buffer_reader_child.offset(), 1);
807
808 buffer_reader_child.commit();
809
810 assert_eq!(buffer_reader.offset(), 2);
811 assert_eq!(buffer_reader.capacity(), 0);
812 }
813
814 #[test]
815 fn child_drop() {
816 let mut buffer_reader = BufferReader::new(&[0x1, 0x2]);
817
818 buffer_reader.skip(1).unwrap();
819 assert_eq!(buffer_reader.offset(), 1);
820 assert_eq!(buffer_reader.capacity(), 1);
821
822 let mut buffer_reader_child = buffer_reader.child();
823 assert_eq!(buffer_reader_child.offset(), 0);
824 assert_eq!(buffer_reader_child.capacity(), 1);
825
826 assert!(matches!(buffer_reader_child.get_bytes(1), Some([0x2])));
827 assert_eq!(buffer_reader_child.offset(), 1);
828
829 assert_eq!(buffer_reader.offset(), 1);
830 assert_eq!(buffer_reader.capacity(), 1);
831 }
832
833 #[test]
834 fn none() {
835 let mut buffer_reader = BufferReader::new(&[]);
836 assert!(buffer_reader.get_varint().is_none());
837 assert!(buffer_reader.get_bytes(1).is_none());
838
839 let mut buffer_writer = BufferWriter::new(&mut []);
840 assert!(buffer_writer.put_varint(VarInt::from_u32(0)).is_err());
841 assert!(buffer_writer.put_bytes(&[0x0]).is_err());
842 }
843
844 #[tokio::test]
845 async fn none_async() {
846 let mut reader = utils::StepReader::new(vec![]);
847 assert!(reader.get_varint().await.is_err());
848 assert!(reader.get_buffer(&mut [0x0]).await.is_err());
849
850 let mut writer = utils::StepWriter::new(Some(0));
851 assert!(writer.put_varint(VarInt::from_u32(0)).await.is_err());
852 assert!(writer.put_buffer(&[0x0]).await.is_err());
853 }
854
855 #[tokio::test]
856 async fn fin_varint() {
857 for (buffer, _) in utils::VARINT_TEST_CASES {
858 for len in 0..buffer.len() {
859 let result = BytesReaderAsync::get_varint(&mut &buffer[..len]).await;
860
861 match len {
862 0 => assert!(matches!(result, Err(IoReadError::ImmediateFin))),
863 _ => assert!(matches!(result, Err(IoReadError::UnexpectedFin))),
864 }
865 }
866 }
867 }
868
869 #[tokio::test]
870 async fn fin_buffer() {
871 let mut buffer = [0; utils::BUFFER_TEST.len()];
872
873 for len in 0..utils::BUFFER_TEST.len() {
874 let result = (&mut &utils::BUFFER_TEST[..len])
875 .get_buffer(&mut buffer)
876 .await;
877
878 match len {
879 0 => assert!(matches!(result, Err(IoReadError::ImmediateFin))),
880 _ => assert!(matches!(result, Err(IoReadError::UnexpectedFin))),
881 }
882 }
883 }
884
885 mod utils {
886 use super::*;
887
888 pub const VARINT_TEST_CASES: [(&[u8], VarInt); 4] = [
889 (&[0xc2, 0x19, 0x7c, 0x5e, 0xff, 0x14, 0xe8, 0x8c], unsafe {
890 VarInt::from_u64_unchecked(151_288_809_941_952_652)
891 }),
892 (&[0x9d, 0x7f, 0x3e, 0x7d], VarInt::from_u32(494_878_333)),
893 (&[0x7b, 0xbd], VarInt::from_u32(15_293)),
894 (&[0x25], VarInt::from_u32(37)),
895 ];
896
897 pub const BUFFER_TEST: &[u8] = b"WebTransport";
898
899 #[cfg(feature = "async")]
900 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
901 pub mod r#async {
902 use super::*;
903 use std::pin::Pin;
904 use std::task::Context;
905 use std::task::Poll;
906
907 pub struct StepReader {
908 data: Box<[u8]>,
909 offset: usize,
910 to_pending: bool,
911 }
912
913 impl StepReader {
914 pub fn new<T>(data: T) -> Self
915 where
916 T: Into<Box<[u8]>>,
917 {
918 Self {
919 data: data.into(),
920 offset: 0,
921 to_pending: true,
922 }
923 }
924 }
925
926 impl AsyncRead for StepReader {
927 fn poll_read(
928 mut self: Pin<&mut Self>,
929 cx: &mut Context<'_>,
930 buf: &mut [u8],
931 ) -> Poll<std::io::Result<usize>> {
932 let new_pending = !self.to_pending;
933 let to_pending = std::mem::replace(&mut self.to_pending, new_pending);
934
935 if buf.is_empty() {
936 return Poll::Ready(Ok(0));
937 }
938
939 if to_pending {
940 cx.waker().wake_by_ref();
941 Poll::Pending
942 } else if let Some(&byte) = self.data.get(self.offset) {
943 buf[0] = byte;
944 self.offset += 1;
945 Poll::Ready(Ok(1))
946 } else {
947 Poll::Ready(Ok(0))
948 }
949 }
950 }
951
952 pub struct StepWriter {
953 buffer: Vec<u8>,
954 max_len: Option<usize>,
955 to_pending: bool,
956 }
957
958 impl StepWriter {
959 pub fn new(max_len: Option<usize>) -> Self {
960 Self {
961 buffer: Vec::new(),
962 max_len,
963 to_pending: true,
964 }
965 }
966
967 pub fn written(&self) -> &[u8] {
968 &self.buffer
969 }
970 }
971
972 impl AsyncWrite for StepWriter {
973 fn poll_write(
974 mut self: Pin<&mut Self>,
975 cx: &mut Context<'_>,
976 buf: &[u8],
977 ) -> Poll<Result<usize, std::io::Error>> {
978 let new_pending = !self.to_pending;
979 let to_pending = std::mem::replace(&mut self.to_pending, new_pending);
980
981 if buf.is_empty() {
982 return Poll::Ready(Ok(0));
983 }
984
985 if to_pending {
986 cx.waker().wake_by_ref();
987 Poll::Pending
988 } else if self.buffer.len() < self.max_len.unwrap_or(usize::MAX) {
989 let byte = buf[0];
990 self.buffer.push(byte);
991 Poll::Ready(Ok(1))
992 } else {
993 Poll::Ready(Err(std::io::Error::new(
994 std::io::ErrorKind::ConnectionReset,
995 "Reached max len",
996 )))
997 }
998 }
999 }
1000 }
1001
1002 #[cfg(feature = "async")]
1003 pub use r#async::StepReader;
1004
1005 #[cfg(feature = "async")]
1006 pub use r#async::StepWriter;
1007 }
1008}