wtransport/connection.rs
1//! # WebTransport Connection
2//!
3//! [`Connection`] provides an essential building block for managing WebTransport
4//! connections. It allows you to initiate, accept, and control data *streams*, send and receive
5//! *datagrams*, monitor connection status, and interact with various aspects of your WebTransport
6//! communication.
7//!
8//! WebTransport exchanges data either via [*streams*](crate#streams) or [*datagrams*](crate#datagrams).
9//!
10//! ## Streams
11//! WebTransport streams provide a lightweight, ordered byte-stream abstraction.
12//!
13//! There are two fundamental types of streams:
14//! - *Unidirectional* streams carry data in a single direction, from the stream initiator to its peer.
15//! - *Bidirectional* streams allow for data to be sent in both directions.
16//!
17//! Both server and client endpoints have the capability to create an arbitrary number of streams to
18//! operate concurrently.
19//!
//! Each stream can be independently cancelled by either side.
21//!
22//! ### Examples
23//! #### Open a stream
24//! ```no_run
25//! # use anyhow::Result;
26//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
27//! use wtransport::Connection;
28//!
29//! // Open a bi-directional stream
30//! let (mut send_stream, mut recv_stream) = connection.open_bi().await?.await?;
31//!
32//! // Send data on the stream
33//! send_stream.write_all(b"Hello, wtransport!").await?;
34//!
35//! // Receive data from the stream
36//! let mut buffer = vec![0; 1024];
37//! let bytes_read = recv_stream.read(&mut buffer).await?;
38//!
//! // Open a uni-directional stream (can only send data)
40//! let mut send_stream = connection.open_uni().await?.await?;
41//!
42//! // Send data on the stream
43//! send_stream.write_all(b"Hello, wtransport!").await?;
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! #### Accept a stream
49//! ```no_run
50//! # use anyhow::Result;
51//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
52//! use wtransport::Connection;
53//!
//! // Wait for the peer to open a bi-directional stream
55//! let (mut send_stream, mut recv_stream) = connection.accept_bi().await?;
56//!
57//! // Can send and receive data on peer's stream
58//! send_stream.write_all(b"Hello, wtransport!").await?;
59//! # let mut buffer = vec![0; 1024];
60//! let bytes_read = recv_stream.read(&mut buffer).await?;
61//!
//! // Wait for the peer to open a uni-directional stream (can only receive data)
63//! let mut recv_stream = connection.accept_uni().await?;
64//!
65//! // Receive data on the stream
66//! let bytes_read = recv_stream.read(&mut buffer).await?;
67//! # Ok(())
68//! # }
69//! ```
70//!
71//! ## Datagrams
72//! WebTransport datagrams are similar to UDP datagrams but come with an
73//! added layer of security through *encryption* and *congestion control*.
74//! Datagrams can arrive out of order or might not arrive at all, offering
75//! flexibility in data exchange scenarios.
76//!
77//! Unlike streams, which operate as byte-stream abstractions, WebTransport
78//! datagrams act more like messages.
79//!
80//! ### Examples
81//! ```no_run
82//! # use anyhow::Result;
83//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
84//! use wtransport::Connection;
85//!
86//! // Send datagram message
87//! connection.send_datagram(b"Hello, wtransport!")?;
88//!
89//! // Receive a datagram message
90//! let message = connection.receive_datagram().await?;
91//! # Ok(())
92//! # }
93//! ```
94
95use crate::datagram::Datagram;
96use crate::driver::utils::varint_w2q;
97use crate::driver::Driver;
98use crate::error::ConnectionError;
99use crate::error::ExportKeyingMaterialError;
100use crate::error::SendDatagramError;
101use crate::stream::OpeningBiStream;
102use crate::stream::OpeningUniStream;
103use crate::stream::RecvStream;
104use crate::stream::SendStream;
105use crate::tls::Certificate;
106use crate::tls::CertificateChain;
107use crate::tls::HandshakeData;
108use crate::SessionId;
109use crate::VarInt;
110use std::net::SocketAddr;
111use std::sync::Arc;
112use std::time::Duration;
113
114/// A WebTransport session connection.
115///
116/// For more details, see the [module documentation](crate::connection).
117///
118/// May be cloned to obtain another handle to the same connection.
119#[derive(Clone, Debug)]
120pub struct Connection {
121 quic_connection: quinn::Connection,
122 driver: Arc<Driver>,
123 session_id: SessionId,
124}
125
126impl Connection {
127 pub(crate) fn new(
128 quic_connection: quinn::Connection,
129 driver: Driver,
130 session_id: SessionId,
131 ) -> Self {
132 Self {
133 quic_connection,
134 driver: Arc::new(driver),
135 session_id,
136 }
137 }
138
139 /// Asynchronously accepts a unidirectional stream.
140 ///
141 /// This method is used to accept incoming unidirectional streams that have been initiated
142 /// by the remote peer.
143 /// It waits for the next unidirectional stream to be available, then wraps it in a
144 /// [`RecvStream`] that can be used to read data from the stream.
145 ///
146 /// # Cancel safety
147 ///
148 /// This method is cancel safe.
149 pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
150 let stream = self
151 .driver
152 .accept_uni(self.session_id)
153 .await
154 .map_err(|driver_error| {
155 ConnectionError::with_driver_error(driver_error, &self.quic_connection)
156 })?
157 .into_stream();
158
159 Ok(RecvStream::new(stream))
160 }
161
162 /// Asynchronously accepts a bidirectional stream.
163 ///
164 /// This method is used to accept incoming bidirectional streams that have been initiated
165 /// by the remote peer.
166 /// It waits for the next bidirectional stream to be available, then wraps it in a
167 /// tuple containing a [`SendStream`] for sending data and a [`RecvStream`] for receiving
168 /// data on the stream.
169 ///
170 /// # Cancel safety
171 ///
172 /// This method is cancel safe.
173 pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
174 let stream = self
175 .driver
176 .accept_bi(self.session_id)
177 .await
178 .map_err(|driver_error| {
179 ConnectionError::with_driver_error(driver_error, &self.quic_connection)
180 })?
181 .into_stream();
182
183 Ok((SendStream::new(stream.0), RecvStream::new(stream.1)))
184 }
185
186 /// Asynchronously opens a new unidirectional stream.
187 ///
188 /// This method is used to initiate the opening of a new unidirectional stream.
189 ///
190 /// # Asynchronous Behavior
191 ///
192 /// This method is asynchronous and involves two `await` points:
193 ///
194 /// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
195 /// the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
196 /// It is safe to cancel this `await` point if needed.
197 ///
198 /// 2. The second `await` is internal to the returned [`OpeningUniStream`] object when it is used to initialize
199 /// the WebTransport stream. Cancelling this latter future before it completes may result in the stream
200 /// being closed during initialization.
201 ///
202 /// # Example
203 ///
204 /// ```no_run
205 /// # use wtransport::Connection;
206 /// # use anyhow::Result;
207 /// # async fn run(connection: Connection) -> Result<()> {
208 /// let send_stream = connection.open_uni().await?.await?;
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub async fn open_uni(&self) -> Result<OpeningUniStream, ConnectionError> {
213 self.driver
214 .open_uni(self.session_id)
215 .await
216 .map_err(|driver_error| {
217 ConnectionError::with_driver_error(driver_error, &self.quic_connection)
218 })
219 }
220
221 /// Asynchronously opens a new bidirectional stream.
222 ///
223 /// This method is used to initiate the opening of a new bidirectional stream.
224 ///
225 /// # Asynchronous Behavior
226 ///
227 /// This method is asynchronous and involves two `await` points:
228 ///
229 /// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
230 /// the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
231 /// It is safe to cancel this `await` point if needed.
232 ///
233 /// 2. The second `await` is internal to the returned [`OpeningBiStream`] object when it is used to initialize
234 /// the WebTransport stream. Cancelling this latter future before it completes may result in the stream
235 /// being closed during initialization.
236 ///
237 /// # Example
238 ///
239 /// ```no_run
240 /// # use wtransport::Connection;
241 /// # use anyhow::Result;
242 /// # async fn run(connection: Connection) -> Result<()> {
243 /// let (send_stream, recv_stream) = connection.open_bi().await?.await?;
244 /// # Ok(())
245 /// # }
246 /// ```
247 pub async fn open_bi(&self) -> Result<OpeningBiStream, ConnectionError> {
248 self.driver
249 .open_bi(self.session_id)
250 .await
251 .map_err(|driver_error| {
252 ConnectionError::with_driver_error(driver_error, &self.quic_connection)
253 })
254 }
255
256 /// Asynchronously receives an application datagram from the remote peer.
257 ///
258 /// This method is used to receive an application datagram sent by the remote
259 /// peer over the connection.
260 /// It waits for a datagram to become available and returns the received [`Datagram`].
261 ///
262 /// # Example
263 ///
264 /// ```no_run
265 /// # use wtransport::Connection;
266 /// # use anyhow::Result;
267 /// # async fn run(connection: Connection) -> Result<()> {
268 /// let datagram = connection.receive_datagram().await?;
269 /// # Ok(())
270 /// # }
271 /// ```
272 pub async fn receive_datagram(&self) -> Result<Datagram, ConnectionError> {
273 self.driver
274 .receive_datagram(self.session_id)
275 .await
276 .map_err(|driver_error| {
277 ConnectionError::with_driver_error(driver_error, &self.quic_connection)
278 })
279 }
280
281 /// Sends an application datagram to the remote peer.
282 ///
283 /// This method is used to send an application datagram to the remote peer
284 /// over the connection.
285 /// The datagram payload is provided as a reference to a slice of bytes.
286 ///
287 /// # Example
288 ///
289 /// ```no_run
290 /// # use wtransport::Connection;
291 /// # use anyhow::Result;
292 /// # async fn run(connection: Connection) -> Result<()> {
293 /// connection.send_datagram(b"Hello, wtransport!")?;
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub fn send_datagram<D>(&self, payload: D) -> Result<(), SendDatagramError>
298 where
299 D: AsRef<[u8]>,
300 {
301 self.driver.send_datagram(self.session_id, payload.as_ref())
302 }
303
304 /// Closes the connection immediately.
305 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
306 self.quic_connection.close(varint_w2q(error_code), reason);
307 }
308
309 /// Waits for the connection to be closed for any reason.
310 pub async fn closed(&self) -> ConnectionError {
311 self.quic_connection.closed().await.into()
312 }
313
314 /// Returns the WebTransport session identifier.
315 #[inline(always)]
316 pub fn session_id(&self) -> SessionId {
317 self.session_id
318 }
319
320 /// Returns the peer's UDP address.
321 ///
322 /// **Note**: as QUIC supports migration, remote address may change
323 /// during connection. Furthermore, when IPv6 support is enabled, IPv4
324 /// addresses may be mapped to IPv6.
325 #[inline(always)]
326 pub fn remote_address(&self) -> SocketAddr {
327 self.quic_connection.remote_address()
328 }
329
330 /// A stable identifier for this connection.
331 ///
332 /// Peer addresses and connection IDs can change, but this value will remain
333 /// fixed for the lifetime of the connection.
334 #[inline(always)]
335 pub fn stable_id(&self) -> usize {
336 self.quic_connection.stable_id()
337 }
338
339 /// Computes the maximum size of datagrams that may be passed to
340 /// [`send_datagram`](Self::send_datagram).
341 ///
342 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
343 ///
344 /// This may change over the lifetime of a connection according to variation in the path MTU
345 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
346 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
347 ///
348 /// Not necessarily the maximum size of received datagrams.
349 #[inline(always)]
350 pub fn max_datagram_size(&self) -> Option<usize> {
351 self.quic_connection
352 .max_datagram_size()
353 .map(|quic_max_size| quic_max_size - Datagram::header_size(self.session_id))
354 }
355
356 /// Current best estimate of this connection's latency (round-trip-time).
357 #[inline(always)]
358 pub fn rtt(&self) -> Duration {
359 self.quic_connection.rtt()
360 }
361
362 /// Derive keying material from this connection's TLS session secrets.
363 ///
364 /// When both peers call this method with the same `label` and `context`
365 /// arguments and `output` buffers of equal length, they will get the
366 /// same sequence of bytes in `output`. These bytes are cryptographically
367 /// strong and pseudorandom, and are suitable for use as keying material.
368 ///
369 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
370 pub fn export_keying_material(
371 &self,
372 output: &mut [u8],
373 label: &[u8],
374 context: &[u8],
375 ) -> Result<(), ExportKeyingMaterialError> {
376 self.quic_connection
377 .export_keying_material(output, label, context)
378 .map_err(|_: quinn::crypto::ExportKeyingMaterialError| ExportKeyingMaterialError)
379 }
380
381 /// Returns the peer's identity as a certificate chain if available.
382 ///
383 /// This function returns an `Option` containing a [`CertificateChain`]. If the peer's identity
384 /// is available, it is converted into a `CertificateChain` and returned. If the peer's identity
385 /// is not available, `None` is returned.
386 pub fn peer_identity(&self) -> Option<CertificateChain> {
387 self.quic_connection.peer_identity().map(|any| {
388 any.downcast::<Vec<rustls_pki_types::CertificateDer<'static>>>()
389 .expect("rustls certificate vector")
390 .into_iter()
391 .map(Certificate::from_rustls_pki)
392 .collect()
393 })
394 }
395
396 /// Retrieves handshake data associated with the connection.
397 pub fn handshake_data(&self) -> HandshakeData {
398 let hd = self
399 .quic_connection
400 .handshake_data()
401 .expect("fully established connection")
402 .downcast::<quinn::crypto::rustls::HandshakeData>()
403 .expect("valid downcast");
404
405 HandshakeData {
406 alpn: hd.protocol,
407 server_name: hd.server_name,
408 }
409 }
410
/// Returns a shared reference to the underlying QUIC connection.
///
/// Only available when the `quinn` feature is enabled.
#[cfg(feature = "quinn")]
#[cfg_attr(docsrs, doc(cfg(feature = "quinn")))]
#[inline(always)]
pub fn quic_connection(&self) -> &quinn::Connection {
    &self.quic_connection
}
418
/// Returns an exclusive (mutable) reference to the underlying QUIC connection.
///
/// Only available when the `quinn` feature is enabled.
#[cfg(feature = "quinn")]
#[cfg_attr(docsrs, doc(cfg(feature = "quinn")))]
#[inline(always)]
pub fn quic_connection_mut(&mut self) -> &mut quinn::Connection {
    &mut self.quic_connection
}
425 }
426}