wtransport/
connection.rs

1//! # WebTransport Connection
2//!
3//! [`Connection`] provides an essential building block for managing WebTransport
4//! connections. It allows you to initiate, accept, and control data *streams*, send and receive
5//! *datagrams*, monitor connection status, and interact with various aspects of your WebTransport
6//! communication.
7//!
8//! WebTransport exchanges data either via [*streams*](crate#streams) or [*datagrams*](crate#datagrams).
9//!
10//! ## Streams
11//! WebTransport streams provide a lightweight, ordered byte-stream abstraction.
12//!
13//! There are two fundamental types of streams:
14//!  - *Unidirectional* streams carry data in a single direction, from the stream initiator to its peer.
15//!  - *Bidirectional* streams allow for data to be sent in both directions.
16//!
17//! Both server and client endpoints have the capability to create an arbitrary number of streams to
18//! operate concurrently.
19//!
//! Each stream can be independently cancelled by either side.
21//!
22//! ### Examples
23//! #### Open a stream
24//! ```no_run
25//! # use anyhow::Result;
26//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
27//! use wtransport::Connection;
28//!
29//! // Open a bi-directional stream
30//! let (mut send_stream, mut recv_stream) = connection.open_bi().await?.await?;
31//!
32//! // Send data on the stream
33//! send_stream.write_all(b"Hello, wtransport!").await?;
34//!
35//! // Receive data from the stream
36//! let mut buffer = vec![0; 1024];
37//! let bytes_read = recv_stream.read(&mut buffer).await?;
38//!
//! // Open a uni-directional stream (can only send data)
40//! let mut send_stream = connection.open_uni().await?.await?;
41//!
42//! // Send data on the stream
43//! send_stream.write_all(b"Hello, wtransport!").await?;
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! #### Accept a stream
49//! ```no_run
50//! # use anyhow::Result;
51//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
52//! use wtransport::Connection;
53//!
//! // Wait for the peer to open a bi-directional stream
55//! let (mut send_stream, mut recv_stream) = connection.accept_bi().await?;
56//!
57//! // Can send and receive data on peer's stream
58//! send_stream.write_all(b"Hello, wtransport!").await?;
59//! # let mut buffer = vec![0; 1024];
60//! let bytes_read = recv_stream.read(&mut buffer).await?;
61//!
//! // Wait for the peer to open a uni-directional stream (can only receive data)
63//! let mut recv_stream = connection.accept_uni().await?;
64//!
65//! // Receive data on the stream
66//! let bytes_read = recv_stream.read(&mut buffer).await?;
67//! # Ok(())
68//! # }
69//! ```
70//!
71//! ## Datagrams
72//! WebTransport datagrams are similar to UDP datagrams but come with an
73//! added layer of security through *encryption* and *congestion control*.
74//! Datagrams can arrive out of order or might not arrive at all, offering
75//! flexibility in data exchange scenarios.
76//!
77//! Unlike streams, which operate as byte-stream abstractions, WebTransport
78//! datagrams act more like messages.
79//!
80//! ### Examples
81//! ```no_run
82//! # use anyhow::Result;
83//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
84//! use wtransport::Connection;
85//!
86//! // Send datagram message
87//! connection.send_datagram(b"Hello, wtransport!")?;
88//!
89//! // Receive a datagram message
90//! let message = connection.receive_datagram().await?;
91//! # Ok(())
92//! # }
93//! ```
94
95use crate::datagram::Datagram;
96use crate::driver::utils::varint_w2q;
97use crate::driver::Driver;
98use crate::error::ConnectionError;
99use crate::error::ExportKeyingMaterialError;
100use crate::error::SendDatagramError;
101use crate::stream::OpeningBiStream;
102use crate::stream::OpeningUniStream;
103use crate::stream::RecvStream;
104use crate::stream::SendStream;
105use crate::tls::Certificate;
106use crate::tls::CertificateChain;
107use crate::tls::HandshakeData;
108use crate::SessionId;
109use crate::VarInt;
110use std::net::SocketAddr;
111use std::sync::Arc;
112use std::time::Duration;
113
/// A WebTransport session connection.
///
/// For more details, see the [module documentation](crate::connection).
///
/// May be cloned to obtain another handle to the same connection.
#[derive(Clone, Debug)]
pub struct Connection {
    // Underlying QUIC connection; used for closing, peer address, RTT and
    // TLS-related queries.
    quic_connection: quinn::Connection,
    // Driver behind an `Arc`, so cloning this handle clones the pointer rather
    // than the driver. Stream and datagram operations go through it.
    driver: Arc<Driver>,
    // Identifier of the WebTransport session, passed to every driver call.
    session_id: SessionId,
}
125
126impl Connection {
127    pub(crate) fn new(
128        quic_connection: quinn::Connection,
129        driver: Driver,
130        session_id: SessionId,
131    ) -> Self {
132        Self {
133            quic_connection,
134            driver: Arc::new(driver),
135            session_id,
136        }
137    }
138
139    /// Asynchronously accepts a unidirectional stream.
140    ///
141    /// This method is used to accept incoming unidirectional streams that have been initiated
142    /// by the remote peer.
143    /// It waits for the next unidirectional stream to be available, then wraps it in a
144    /// [`RecvStream`] that can be used to read data from the stream.
145    ///
146    /// # Cancel safety
147    ///
148    /// This method is cancel safe.
149    pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
150        let stream = self
151            .driver
152            .accept_uni(self.session_id)
153            .await
154            .map_err(|driver_error| {
155                ConnectionError::with_driver_error(driver_error, &self.quic_connection)
156            })?
157            .into_stream();
158
159        Ok(RecvStream::new(stream))
160    }
161
162    /// Asynchronously accepts a bidirectional stream.
163    ///
164    /// This method is used to accept incoming bidirectional streams that have been initiated
165    /// by the remote peer.
166    /// It waits for the next bidirectional stream to be available, then wraps it in a
167    /// tuple containing a [`SendStream`] for sending data and a [`RecvStream`] for receiving
168    /// data on the stream.
169    ///
170    /// # Cancel safety
171    ///
172    /// This method is cancel safe.
173    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
174        let stream = self
175            .driver
176            .accept_bi(self.session_id)
177            .await
178            .map_err(|driver_error| {
179                ConnectionError::with_driver_error(driver_error, &self.quic_connection)
180            })?
181            .into_stream();
182
183        Ok((SendStream::new(stream.0), RecvStream::new(stream.1)))
184    }
185
186    /// Asynchronously opens a new unidirectional stream.
187    ///
188    /// This method is used to initiate the opening of a new unidirectional stream.
189    ///
190    /// # Asynchronous Behavior
191    ///
192    /// This method is asynchronous and involves two `await` points:
193    ///
194    /// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
195    ///    the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
196    ///    It is safe to cancel this `await` point if needed.
197    ///
198    /// 2. The second `await` is internal to the returned [`OpeningUniStream`] object when it is used to initialize
199    ///    the WebTransport stream. Cancelling this latter future before it completes may result in the stream
200    ///    being closed during initialization.
201    ///
202    /// # Example
203    ///
204    /// ```no_run
205    /// # use wtransport::Connection;
206    /// # use anyhow::Result;
207    /// # async fn run(connection: Connection) -> Result<()> {
208    /// let send_stream = connection.open_uni().await?.await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn open_uni(&self) -> Result<OpeningUniStream, ConnectionError> {
213        self.driver
214            .open_uni(self.session_id)
215            .await
216            .map_err(|driver_error| {
217                ConnectionError::with_driver_error(driver_error, &self.quic_connection)
218            })
219    }
220
221    /// Asynchronously opens a new bidirectional stream.
222    ///
223    /// This method is used to initiate the opening of a new bidirectional stream.
224    ///
225    /// # Asynchronous Behavior
226    ///
227    /// This method is asynchronous and involves two `await` points:
228    ///
229    /// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
230    ///    the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
231    ///    It is safe to cancel this `await` point if needed.
232    ///
233    /// 2. The second `await` is internal to the returned [`OpeningBiStream`] object when it is used to initialize
234    ///    the WebTransport stream. Cancelling this latter future before it completes may result in the stream
235    ///    being closed during initialization.
236    ///
237    /// # Example
238    ///
239    /// ```no_run
240    /// # use wtransport::Connection;
241    /// # use anyhow::Result;
242    /// # async fn run(connection: Connection) -> Result<()> {
243    /// let (send_stream, recv_stream) = connection.open_bi().await?.await?;
244    /// # Ok(())
245    /// # }
246    /// ```
247    pub async fn open_bi(&self) -> Result<OpeningBiStream, ConnectionError> {
248        self.driver
249            .open_bi(self.session_id)
250            .await
251            .map_err(|driver_error| {
252                ConnectionError::with_driver_error(driver_error, &self.quic_connection)
253            })
254    }
255
256    /// Asynchronously receives an application datagram from the remote peer.
257    ///
258    /// This method is used to receive an application datagram sent by the remote
259    /// peer over the connection.
260    /// It waits for a datagram to become available and returns the received [`Datagram`].
261    ///
262    /// # Example
263    ///
264    /// ```no_run
265    /// # use wtransport::Connection;
266    /// # use anyhow::Result;
267    /// # async fn run(connection: Connection) -> Result<()> {
268    /// let datagram = connection.receive_datagram().await?;
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub async fn receive_datagram(&self) -> Result<Datagram, ConnectionError> {
273        self.driver
274            .receive_datagram(self.session_id)
275            .await
276            .map_err(|driver_error| {
277                ConnectionError::with_driver_error(driver_error, &self.quic_connection)
278            })
279    }
280
281    /// Sends an application datagram to the remote peer.
282    ///
283    /// This method is used to send an application datagram to the remote peer
284    /// over the connection.
285    /// The datagram payload is provided as a reference to a slice of bytes.
286    ///
287    /// # Example
288    ///
289    /// ```no_run
290    /// # use wtransport::Connection;
291    /// # use anyhow::Result;
292    /// # async fn run(connection: Connection) -> Result<()> {
293    /// connection.send_datagram(b"Hello, wtransport!")?;
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub fn send_datagram<D>(&self, payload: D) -> Result<(), SendDatagramError>
298    where
299        D: AsRef<[u8]>,
300    {
301        self.driver.send_datagram(self.session_id, payload.as_ref())
302    }
303
304    /// Closes the connection immediately.
305    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
306        self.quic_connection.close(varint_w2q(error_code), reason);
307    }
308
309    /// Waits for the connection to be closed for any reason.
310    pub async fn closed(&self) -> ConnectionError {
311        self.quic_connection.closed().await.into()
312    }
313
    /// Returns the WebTransport session identifier.
    ///
    /// This is the identifier the handle was constructed with; it is returned
    /// by value.
    #[inline(always)]
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }
319
320    /// Returns the peer's UDP address.
321    ///
322    /// **Note**: as QUIC supports migration, remote address may change
323    /// during connection. Furthermore, when IPv6 support is enabled, IPv4
324    /// addresses may be mapped to IPv6.
325    #[inline(always)]
326    pub fn remote_address(&self) -> SocketAddr {
327        self.quic_connection.remote_address()
328    }
329
330    /// A stable identifier for this connection.
331    ///
332    /// Peer addresses and connection IDs can change, but this value will remain
333    /// fixed for the lifetime of the connection.
334    #[inline(always)]
335    pub fn stable_id(&self) -> usize {
336        self.quic_connection.stable_id()
337    }
338
339    /// Computes the maximum size of datagrams that may be passed to
340    /// [`send_datagram`](Self::send_datagram).
341    ///
342    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
343    ///
344    /// This may change over the lifetime of a connection according to variation in the path MTU
345    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
346    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
347    ///
348    /// Not necessarily the maximum size of received datagrams.
349    #[inline(always)]
350    pub fn max_datagram_size(&self) -> Option<usize> {
351        self.quic_connection
352            .max_datagram_size()
353            .map(|quic_max_size| quic_max_size - Datagram::header_size(self.session_id))
354    }
355
356    /// Current best estimate of this connection's latency (round-trip-time).
357    #[inline(always)]
358    pub fn rtt(&self) -> Duration {
359        self.quic_connection.rtt()
360    }
361
362    /// Derive keying material from this connection's TLS session secrets.
363    ///
364    /// When both peers call this method with the same `label` and `context`
365    /// arguments and `output` buffers of equal length, they will get the
366    /// same sequence of bytes in `output`. These bytes are cryptographically
367    /// strong and pseudorandom, and are suitable for use as keying material.
368    ///
369    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
370    pub fn export_keying_material(
371        &self,
372        output: &mut [u8],
373        label: &[u8],
374        context: &[u8],
375    ) -> Result<(), ExportKeyingMaterialError> {
376        self.quic_connection
377            .export_keying_material(output, label, context)
378            .map_err(|_: quinn::crypto::ExportKeyingMaterialError| ExportKeyingMaterialError)
379    }
380
381    /// Returns the peer's identity as a certificate chain if available.
382    ///
383    /// This function returns an `Option` containing a [`CertificateChain`]. If the peer's identity
384    /// is available, it is converted into a `CertificateChain` and returned. If the peer's identity
385    /// is not available, `None` is returned.
386    pub fn peer_identity(&self) -> Option<CertificateChain> {
387        self.quic_connection.peer_identity().map(|any| {
388            any.downcast::<Vec<rustls_pki_types::CertificateDer<'static>>>()
389                .expect("rustls certificate vector")
390                .into_iter()
391                .map(Certificate::from_rustls_pki)
392                .collect()
393        })
394    }
395
396    /// Retrieves handshake data associated with the connection.
397    pub fn handshake_data(&self) -> HandshakeData {
398        let hd = self
399            .quic_connection
400            .handshake_data()
401            .expect("fully established connection")
402            .downcast::<quinn::crypto::rustls::HandshakeData>()
403            .expect("valid downcast");
404
405        HandshakeData {
406            alpn: hd.protocol,
407            server_name: hd.server_name,
408        }
409    }
410
    /// Returns a reference to the inner QUIC connection.
    ///
    /// Only compiled when the `quinn` feature is enabled.
    #[cfg(feature = "quinn")]
    #[cfg_attr(docsrs, doc(cfg(feature = "quinn")))]
    #[inline(always)]
    pub fn quic_connection(&self) -> &quinn::Connection {
        &self.quic_connection
    }
418
    /// Returns a mutable reference to the inner QUIC connection.
    ///
    /// Only compiled when the `quinn` feature is enabled.
    #[cfg(feature = "quinn")]
    #[cfg_attr(docsrs, doc(cfg(feature = "quinn")))]
    #[inline(always)]
    pub fn quic_connection_mut(&mut self) -> &mut quinn::Connection {
        &mut self.quic_connection
    }
426}