wtransport_proto/
frame.rs

1use crate::bytes::BufferReader;
2use crate::bytes::BufferWriter;
3use crate::bytes::BytesReader;
4use crate::bytes::BytesWriter;
5use crate::bytes::EndOfBuffer;
6use crate::ids::InvalidSessionId;
7use crate::ids::SessionId;
8use crate::varint::VarInt;
9use std::borrow::Cow;
10
11#[cfg(feature = "async")]
12use crate::bytes::AsyncRead;
13
14#[cfg(feature = "async")]
15use crate::bytes::AsyncWrite;
16
17#[cfg(feature = "async")]
18use crate::bytes;
19
20/// Error frame parsing.
21#[derive(Debug, thiserror::Error)]
22pub enum ParseError {
23    /// Error for unknown frame ID.
24    #[error("cannot parse HTTP3 frame as ID is unknown")]
25    UnknownFrame,
26
27    /// Error for invalid session ID.
28    #[error("cannot parse HTTP3 frame as session ID is invalid")]
29    InvalidSessionId,
30
31    /// Payload required too big.
32    #[error("cannot parse HTTP3 frame as payload limit is reached")]
33    PayloadTooBig,
34}
35
36/// An error during frame I/O read operation.
37#[cfg(feature = "async")]
38#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
39#[derive(Debug, thiserror::Error)]
40pub enum IoReadError {
41    /// Error during parsing a frame.
42    #[error(transparent)]
43    Parse(ParseError),
44
45    /// Error due to I/O operation.
46    #[error(transparent)]
47    IO(bytes::IoReadError),
48}
49
50#[cfg(feature = "async")]
51impl From<bytes::IoReadError> for IoReadError {
52    #[inline(always)]
53    fn from(io_error: bytes::IoReadError) -> Self {
54        IoReadError::IO(io_error)
55    }
56}
57
58/// An error during frame I/O write operation.
59#[cfg(feature = "async")]
60pub type IoWriteError = bytes::IoWriteError;
61
62/// Alias for [`Frame<'static>`](Frame);
63pub type FrameOwned = Frame<'static>;
64
65/// An HTTP3 [`Frame`] type.
66#[derive(Copy, Clone, Debug)]
67pub enum FrameKind {
68    /// DATA frame type.
69    Data,
70
71    /// HEADERS frame type.
72    Headers,
73
74    /// SETTINGS frame type.
75    Settings,
76
77    /// WebTransport frame type.
78    WebTransport,
79
80    /// Exercise frame.
81    Exercise(VarInt),
82}
83
84impl FrameKind {
85    /// Checks whether an `id` is valid for a [`FrameKind::Exercise`].
86    #[inline(always)]
87    pub const fn is_id_exercise(id: VarInt) -> bool {
88        id.into_inner() >= 0x21 && ((id.into_inner() - 0x21) % 0x1f == 0)
89    }
90
91    const fn parse(id: VarInt) -> Option<Self> {
92        match id {
93            frame_kind_ids::DATA => Some(FrameKind::Data),
94            frame_kind_ids::HEADERS => Some(FrameKind::Headers),
95            frame_kind_ids::SETTINGS => Some(FrameKind::Settings),
96            frame_kind_ids::WEBTRANSPORT_STREAM => Some(FrameKind::WebTransport),
97            id if FrameKind::is_id_exercise(id) => Some(FrameKind::Exercise(id)),
98            _ => None,
99        }
100    }
101
102    const fn id(self) -> VarInt {
103        match self {
104            FrameKind::Data => frame_kind_ids::DATA,
105            FrameKind::Headers => frame_kind_ids::HEADERS,
106            FrameKind::Settings => frame_kind_ids::SETTINGS,
107            FrameKind::WebTransport => frame_kind_ids::WEBTRANSPORT_STREAM,
108            FrameKind::Exercise(id) => id,
109        }
110    }
111}
112
113/// An HTTP3 frame.
114#[derive(Debug)]
115pub struct Frame<'a> {
116    kind: FrameKind,
117    payload: Cow<'a, [u8]>,
118    session_id: Option<SessionId>,
119}
120
121impl<'a> Frame<'a> {
122    const MAX_PARSE_PAYLOAD_ALLOWED: usize = 4096;
123
124    /// Creates a new frame of type [`FrameKind::Headers`].
125    ///
126    /// # Panics
127    ///
128    /// Panics if the `payload` size if greater than [`VarInt::MAX`].
129    #[inline(always)]
130    pub fn new_headers(payload: Cow<'a, [u8]>) -> Self {
131        Self::new(FrameKind::Headers, payload, None)
132    }
133
134    /// Creates a new frame of type [`FrameKind::Settings`].
135    ///
136    /// # Panics
137    ///
138    /// Panics if the `payload` size if greater than [`VarInt::MAX`].
139    #[inline(always)]
140    pub fn new_settings(payload: Cow<'a, [u8]>) -> Self {
141        Self::new(FrameKind::Settings, payload, None)
142    }
143
144    /// Creates a new frame of type [`FrameKind::WebTransport`].
145    #[inline(always)]
146    pub fn new_webtransport(session_id: SessionId) -> Self {
147        Self::new(
148            FrameKind::WebTransport,
149            Cow::Owned(Default::default()),
150            Some(session_id),
151        )
152    }
153
154    /// Creates a new frame of type [`FrameKind::Data`].
155    ///
156    /// # Panics
157    ///
158    /// Panics if the `payload` size if greater than [`VarInt::MAX`].
159    #[inline(always)]
160    pub fn new_data(payload: Cow<'a, [u8]>) -> Self {
161        Self::new(FrameKind::Data, payload, None)
162    }
163
164    /// Creates a new frame of type [`FrameKind::Exercise`].
165    ///
166    /// # Panics
167    ///
168    /// * Panics if the `payload` size if greater than [`VarInt::MAX`].
169    /// * Panics if `id` is not a valid exercise (see [`FrameKind::is_id_exercise`]).
170    #[inline(always)]
171    pub fn new_exercise(id: VarInt, payload: Cow<'a, [u8]>) -> Self {
172        assert!(FrameKind::is_id_exercise(id));
173        Self::new(FrameKind::Exercise(id), payload, None)
174    }
175
176    /// Reads a [`Frame`] from a [`BytesReader`].
177    ///
178    /// It returns [`None`] if the `bytes_reader` does not contain enough bytes
179    /// to parse an entire frame.
180    ///
181    /// In case [`None`] or [`Err`], `bytes_reader` might be partially read.
182    pub fn read<R>(bytes_reader: &mut R) -> Result<Option<Self>, ParseError>
183    where
184        R: BytesReader<'a>,
185    {
186        let kind = match bytes_reader.get_varint() {
187            Some(kind_id) => FrameKind::parse(kind_id).ok_or(ParseError::UnknownFrame)?,
188            None => return Ok(None),
189        };
190
191        if matches!(kind, FrameKind::WebTransport) {
192            let session_id = match bytes_reader.get_varint() {
193                Some(session_id) => SessionId::try_from_varint(session_id)
194                    .map_err(|InvalidSessionId| ParseError::InvalidSessionId)?,
195                None => return Ok(None),
196            };
197
198            Ok(Some(Self::new_webtransport(session_id)))
199        } else {
200            let payload_len = match bytes_reader.get_varint() {
201                Some(payload_len) => payload_len.into_inner() as usize,
202                None => return Ok(None),
203            };
204
205            if payload_len > Self::MAX_PARSE_PAYLOAD_ALLOWED {
206                return Err(ParseError::PayloadTooBig);
207            }
208
209            let Some(payload) = bytes_reader.get_bytes(payload_len) else {
210                return Ok(None);
211            };
212
213            Ok(Some(Self::new(kind, Cow::Borrowed(payload), None)))
214        }
215    }
216
217    /// Reads a [`Frame`] from a `reader`.
218    #[cfg(feature = "async")]
219    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
220    pub async fn read_async<R>(reader: &mut R) -> Result<Frame<'a>, IoReadError>
221    where
222        R: AsyncRead + Unpin + ?Sized,
223    {
224        use crate::bytes::BytesReaderAsync;
225
226        let kind_id = reader.get_varint().await?;
227        let kind = FrameKind::parse(kind_id).ok_or(IoReadError::Parse(ParseError::UnknownFrame))?;
228
229        if matches!(kind, FrameKind::WebTransport) {
230            let session_id =
231                SessionId::try_from_varint(reader.get_varint().await.map_err(|e| match e {
232                    bytes::IoReadError::ImmediateFin => bytes::IoReadError::UnexpectedFin,
233                    _ => e,
234                })?)
235                .map_err(|InvalidSessionId| IoReadError::Parse(ParseError::InvalidSessionId))?;
236
237            Ok(Self::new_webtransport(session_id))
238        } else {
239            let payload_len = reader
240                .get_varint()
241                .await
242                .map_err(|e| match e {
243                    bytes::IoReadError::ImmediateFin => bytes::IoReadError::UnexpectedFin,
244                    _ => e,
245                })?
246                .into_inner() as usize;
247
248            if payload_len > Self::MAX_PARSE_PAYLOAD_ALLOWED {
249                return Err(IoReadError::Parse(ParseError::PayloadTooBig));
250            }
251
252            let mut payload = vec![0; payload_len];
253
254            reader.get_buffer(&mut payload).await.map_err(|e| match e {
255                bytes::IoReadError::ImmediateFin => bytes::IoReadError::UnexpectedFin,
256                _ => e,
257            })?;
258
259            payload.shrink_to_fit();
260
261            Ok(Self::new(kind, Cow::Owned(payload), None))
262        }
263    }
264
265    /// Reads a [`Frame`] from a [`BufferReader`].
266    ///
267    /// It returns [`None`] if the `buffer_reader` does not contain enough bytes
268    /// to parse an entire frame.
269    ///
270    /// In case [`None`] or [`Err`], `buffer_reader` offset if not advanced.
271    pub fn read_from_buffer(
272        buffer_reader: &mut BufferReader<'a>,
273    ) -> Result<Option<Self>, ParseError> {
274        let mut buffer_reader_child = buffer_reader.child();
275
276        match Self::read(&mut *buffer_reader_child)? {
277            Some(frame) => {
278                buffer_reader_child.commit();
279                Ok(Some(frame))
280            }
281            None => Ok(None),
282        }
283    }
284
285    /// Writes a [`Frame`] into a [`BytesWriter`].
286    ///
287    /// It returns [`Err`] if the `bytes_writer` does not have enough capacity
288    /// to write the entire frame.
289    /// See [`Self::write_size`] to retrieve the exact amount of required capacity.
290    ///
291    /// In case [`Err`], `bytes_writer` might be partially written.
292    ///
293    /// # Panics
294    ///
295    /// Panics if the payload size if greater than [`VarInt::MAX`].
296    pub fn write<W>(&self, bytes_writer: &mut W) -> Result<(), EndOfBuffer>
297    where
298        W: BytesWriter,
299    {
300        bytes_writer.put_varint(self.kind.id())?;
301
302        if let Some(session_id) = self.session_id() {
303            bytes_writer.put_varint(session_id.into_varint())?;
304        } else {
305            bytes_writer.put_varint(
306                VarInt::try_from(self.payload.len() as u64)
307                    .expect("Payload cannot be larger than varint max"),
308            )?;
309            bytes_writer.put_bytes(&self.payload)?;
310        }
311
312        Ok(())
313    }
314
315    /// Writes a [`Frame`] into a `writer`.
316    ///
317    /// # Panics
318    ///
319    /// Panics if the payload size if greater than [`VarInt::MAX`].
320    #[cfg(feature = "async")]
321    #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
322    pub async fn write_async<W>(&self, writer: &mut W) -> Result<(), IoWriteError>
323    where
324        W: AsyncWrite + Unpin + ?Sized,
325    {
326        use crate::bytes::BytesWriterAsync;
327
328        writer.put_varint(self.kind.id()).await?;
329
330        if let Some(session_id) = self.session_id() {
331            writer.put_varint(session_id.into_varint()).await?;
332        } else {
333            writer
334                .put_varint(
335                    VarInt::try_from(self.payload.len() as u64)
336                        .expect("Payload cannot be larger than varint max"),
337                )
338                .await?;
339            writer.put_buffer(&self.payload).await?;
340        }
341
342        Ok(())
343    }
344
345    /// Writes this [`Frame`] into a buffer via [`BufferWriter`].
346    ///
347    /// In case [`Err`], `buffer_writer` is not advanced.
348    ///
349    /// # Panics
350    ///
351    /// Panics if the payload size if greater than [`VarInt::MAX`].
352    pub fn write_to_buffer(&self, buffer_writer: &mut BufferWriter) -> Result<(), EndOfBuffer> {
353        if buffer_writer.capacity() < self.write_size() {
354            return Err(EndOfBuffer);
355        }
356
357        self.write(buffer_writer)
358            .expect("Enough capacity for frame");
359
360        Ok(())
361    }
362
363    /// Returns the needed capacity to write this frame into a buffer.
364    pub fn write_size(&self) -> usize {
365        if let Some(session_id) = self.session_id() {
366            self.kind.id().size() + session_id.into_varint().size()
367        } else {
368            self.kind.id().size()
369                + VarInt::try_from(self.payload.len() as u64)
370                    .expect("Payload cannot be larger than varint max")
371                    .size()
372                + self.payload.len()
373        }
374    }
375
376    /// Returns the [`FrameKind`] of this [`Frame`].
377    #[inline(always)]
378    pub const fn kind(&self) -> FrameKind {
379        self.kind
380    }
381
382    /// Returns the payload of this [`Frame`].
383    #[inline(always)]
384    pub fn payload(&self) -> &[u8] {
385        &self.payload
386    }
387
388    /// Returns the [`SessionId`] if frame is [`FrameKind::WebTransport`],
389    /// otherwise returns [`None`].
390    #[inline(always)]
391    pub fn session_id(&self) -> Option<SessionId> {
392        matches!(self.kind, FrameKind::WebTransport).then(|| {
393            self.session_id
394                .expect("WebTransport frame contains session id")
395        })
396    }
397
398    /// # Panics
399    ///
400    /// Panics if the `payload` size if greater than [`VarInt::MAX`].
401    fn new(kind: FrameKind, payload: Cow<'a, [u8]>, session_id: Option<SessionId>) -> Self {
402        if let FrameKind::Exercise(id) = kind {
403            debug_assert!(FrameKind::is_id_exercise(id));
404        } else if let FrameKind::WebTransport = kind {
405            debug_assert!(payload.is_empty());
406            debug_assert!(session_id.is_some());
407        }
408
409        assert!(payload.len() <= VarInt::MAX.into_inner() as usize);
410
411        Self {
412            kind,
413            payload,
414            session_id,
415        }
416    }
417
418    #[cfg(test)]
419    pub(crate) fn into_owned<'b>(self) -> Frame<'b> {
420        Frame {
421            kind: self.kind,
422            payload: Cow::Owned(self.payload.into_owned()),
423            session_id: self.session_id,
424        }
425    }
426
427    #[cfg(test)]
428    pub(crate) fn serialize_any(kind: VarInt, payload: &[u8]) -> Vec<u8> {
429        let mut buffer = Vec::new();
430
431        Self {
432            kind: FrameKind::Exercise(kind),
433            payload: Cow::Owned(payload.to_vec()),
434            session_id: None,
435        }
436        .write(&mut buffer)
437        .unwrap();
438
439        buffer
440    }
441
442    #[cfg(test)]
443    pub(crate) fn serialize_webtransport(session_id: SessionId) -> Vec<u8> {
444        let mut buffer = Vec::new();
445
446        Self {
447            kind: FrameKind::WebTransport,
448            payload: Cow::Owned(Default::default()),
449            session_id: Some(session_id),
450        }
451        .write(&mut buffer)
452        .unwrap();
453
454        buffer
455    }
456}
457
458mod frame_kind_ids {
459    use crate::varint::VarInt;
460
461    pub const DATA: VarInt = VarInt::from_u32(0x00);
462    pub const HEADERS: VarInt = VarInt::from_u32(0x01);
463    pub const SETTINGS: VarInt = VarInt::from_u32(0x04);
464    pub const WEBTRANSPORT_STREAM: VarInt = VarInt::from_u32(0x41);
465}
466
467#[cfg(test)]
468mod tests {
469    use super::*;
470    use crate::headers::Headers;
471    use crate::settings::Settings;
472
473    #[test]
474    fn settings() {
475        let settings = Settings::builder()
476            .qpack_blocked_streams(VarInt::from_u32(1))
477            .qpack_max_table_capacity(VarInt::from_u32(2))
478            .enable_h3_datagrams()
479            .enable_webtransport()
480            .webtransport_max_sessions(VarInt::from_u32(3))
481            .build();
482
483        let frame = settings.generate_frame();
484        assert!(frame.session_id().is_none());
485        assert!(matches!(frame.kind(), FrameKind::Settings));
486
487        let frame = utils::assert_serde(frame);
488        Settings::with_frame(&frame).unwrap();
489    }
490
491    #[tokio::test]
492    async fn settings_async() {
493        let settings = Settings::builder()
494            .qpack_blocked_streams(VarInt::from_u32(1))
495            .qpack_max_table_capacity(VarInt::from_u32(2))
496            .enable_h3_datagrams()
497            .enable_webtransport()
498            .webtransport_max_sessions(VarInt::from_u32(3))
499            .build();
500
501        let frame = settings.generate_frame();
502        assert!(frame.session_id().is_none());
503        assert!(matches!(frame.kind(), FrameKind::Settings));
504
505        let frame = utils::assert_serde_async(frame).await;
506        Settings::with_frame(&frame).unwrap();
507    }
508
509    #[test]
510    fn headers() {
511        let headers = Headers::from_iter([("key1", "value1")]);
512
513        let frame = headers.generate_frame();
514        assert!(frame.session_id().is_none());
515        assert!(matches!(frame.kind(), FrameKind::Headers));
516
517        let frame = utils::assert_serde(frame);
518        Headers::with_frame(&frame).unwrap();
519    }
520
521    #[tokio::test]
522    async fn headers_async() {
523        let headers = Headers::from_iter([("key1", "value1")]);
524
525        let frame = headers.generate_frame();
526        assert!(frame.session_id().is_none());
527        assert!(matches!(frame.kind(), FrameKind::Headers));
528
529        let frame = utils::assert_serde_async(frame).await;
530        Headers::with_frame(&frame).unwrap();
531    }
532
533    #[test]
534    fn webtransport() {
535        let session_id = SessionId::try_from_varint(VarInt::from_u32(0)).unwrap();
536        let frame = Frame::new_webtransport(session_id);
537
538        assert!(frame.payload().is_empty());
539        assert!(matches!(frame.session_id(), Some(x) if x == session_id));
540        assert!(matches!(frame.kind(), FrameKind::WebTransport));
541
542        let frame = utils::assert_serde(frame);
543
544        assert!(frame.payload().is_empty());
545        assert!(matches!(frame.session_id(), Some(x) if x == session_id));
546        assert!(matches!(frame.kind(), FrameKind::WebTransport));
547    }
548
549    #[tokio::test]
550    async fn webtransport_async() {
551        let session_id = SessionId::try_from_varint(VarInt::from_u32(0)).unwrap();
552        let frame = Frame::new_webtransport(session_id);
553
554        assert!(frame.payload().is_empty());
555        assert!(matches!(frame.session_id(), Some(x) if x == session_id));
556        assert!(matches!(frame.kind(), FrameKind::WebTransport));
557
558        let frame = utils::assert_serde_async(frame).await;
559
560        assert!(frame.payload().is_empty());
561        assert!(matches!(frame.session_id(), Some(x) if x == session_id));
562        assert!(matches!(frame.kind(), FrameKind::WebTransport));
563    }
564
565    #[test]
566    fn read_eof() {
567        let buffer = Frame::serialize_any(FrameKind::Data.id(), b"This is a test payload");
568        assert!(Frame::read(&mut &buffer[..buffer.len() - 1])
569            .unwrap()
570            .is_none());
571    }
572
573    #[tokio::test]
574    async fn read_eof_async() {
575        let buffer = Frame::serialize_any(FrameKind::Data.id(), b"This is a test payload");
576
577        for len in 0..buffer.len() {
578            let result = Frame::read_async(&mut &buffer[..len]).await;
579
580            match len {
581                0 => assert!(matches!(
582                    result,
583                    Err(IoReadError::IO(bytes::IoReadError::ImmediateFin))
584                )),
585                _ => assert!(matches!(
586                    result,
587                    Err(IoReadError::IO(bytes::IoReadError::UnexpectedFin))
588                )),
589            }
590        }
591    }
592
593    #[tokio::test]
594    async fn read_eof_webtransport_async() {
595        let session_id = SessionId::try_from_varint(VarInt::from_u32(0)).unwrap();
596        let buffer = Frame::serialize_webtransport(session_id);
597
598        for len in 0..buffer.len() {
599            let result = Frame::read_async(&mut &buffer[..len]).await;
600
601            match len {
602                0 => assert!(matches!(
603                    result,
604                    Err(IoReadError::IO(bytes::IoReadError::ImmediateFin))
605                )),
606                _ => assert!(matches!(
607                    result,
608                    Err(IoReadError::IO(bytes::IoReadError::UnexpectedFin))
609                )),
610            }
611        }
612    }
613
614    #[test]
615    fn unknown_frame() {
616        let buffer = Frame::serialize_any(VarInt::from_u32(0x0042_4242), b"This is a test payload");
617
618        assert!(matches!(
619            Frame::read(&mut buffer.as_slice()),
620            Err(ParseError::UnknownFrame)
621        ));
622    }
623
624    #[tokio::test]
625    async fn unknown_frame_async() {
626        let buffer = Frame::serialize_any(VarInt::from_u32(0x0042_4242), b"This is a test payload");
627
628        assert!(matches!(
629            Frame::read_async(&mut buffer.as_slice()).await,
630            Err(IoReadError::Parse(ParseError::UnknownFrame))
631        ));
632    }
633
634    #[test]
635    fn invalid_session_id() {
636        let invalid_session_id = SessionId::maybe_invalid(VarInt::from_u32(1));
637        let buffer = Frame::serialize_webtransport(invalid_session_id);
638
639        assert!(matches!(
640            Frame::read(&mut buffer.as_slice()),
641            Err(ParseError::InvalidSessionId)
642        ));
643    }
644
645    #[tokio::test]
646    async fn invalid_session_id_async() {
647        let invalid_session_id = SessionId::maybe_invalid(VarInt::from_u32(1));
648        let buffer = Frame::serialize_webtransport(invalid_session_id);
649
650        assert!(matches!(
651            Frame::read_async(&mut buffer.as_slice()).await,
652            Err(IoReadError::Parse(ParseError::InvalidSessionId))
653        ));
654    }
655
656    #[test]
657    fn payload_too_big() {
658        let mut buffer = Vec::new();
659        buffer.put_varint(FrameKind::Data.id()).unwrap();
660        buffer
661            .put_varint(VarInt::from_u32(
662                Frame::MAX_PARSE_PAYLOAD_ALLOWED as u32 + 1,
663            ))
664            .unwrap();
665
666        assert!(matches!(
667            Frame::read_from_buffer(&mut BufferReader::new(&buffer)),
668            Err(ParseError::PayloadTooBig)
669        ));
670    }
671
672    #[tokio::test]
673    async fn payload_too_big_async() {
674        let mut buffer = Vec::new();
675        buffer.put_varint(FrameKind::Data.id()).unwrap();
676        buffer
677            .put_varint(VarInt::from_u32(
678                Frame::MAX_PARSE_PAYLOAD_ALLOWED as u32 + 1,
679            ))
680            .unwrap();
681
682        assert!(matches!(
683            Frame::read_async(&mut &*buffer).await,
684            Err(IoReadError::Parse(ParseError::PayloadTooBig)),
685        ));
686    }
687
688    mod utils {
689        use super::*;
690
691        pub fn assert_serde(frame: Frame) -> Frame {
692            let mut buffer = Vec::new();
693
694            frame.write(&mut buffer).unwrap();
695            assert_eq!(buffer.len(), frame.write_size());
696
697            let mut buffer = buffer.as_slice();
698            let frame = Frame::read(&mut buffer).unwrap().unwrap();
699            assert!(buffer.is_empty());
700
701            frame.into_owned()
702        }
703
704        #[cfg(feature = "async")]
705        pub async fn assert_serde_async(frame: Frame<'_>) -> Frame<'_> {
706            let mut buffer = Vec::new();
707
708            frame.write_async(&mut buffer).await.unwrap();
709            assert_eq!(buffer.len(), frame.write_size());
710
711            let mut buffer = buffer.as_slice();
712            let frame = Frame::read_async(&mut buffer).await.unwrap();
713            assert!(buffer.is_empty());
714
715            frame.into_owned()
716        }
717    }
718}