wrpc_transport_web/
lib.rs

1//! wRPC WebTransport transport
2
3use core::ops::{Deref, DerefMut};
4
5use anyhow::Context as _;
6use bytes::Bytes;
7use quinn::VarInt;
8use tracing::{debug, error, trace, warn};
9use wrpc_transport::frame::{invoke, Accept, Incoming, Outgoing};
10use wrpc_transport::Invoke;
11use wtransport::{Connection, RecvStream, SendStream};
12
13/// WebTransport server with graceful stream shutdown handling
14pub type Server = wrpc_transport::Server<(), RecvStream, SendStream, ConnHandler>;
15
16/// WebTransport wRPC client
17#[derive(Clone, Debug)]
18pub struct Client(Connection);
19
20impl From<Connection> for Client {
21    fn from(session: Connection) -> Self {
22        Self(session)
23    }
24}
25
26impl Deref for Client {
27    type Target = Connection;
28
29    fn deref(&self) -> &Self::Target {
30        &self.0
31    }
32}
33
34impl DerefMut for Client {
35    fn deref_mut(&mut self) -> &mut Self::Target {
36        &mut self.0
37    }
38}
39
/// Graceful stream shutdown handler.
///
/// Stateless marker type; its behavior lives in its
/// `wrpc_transport::frame::ConnHandler` implementation for WebTransport streams.
#[derive(Clone, Copy, Debug, Default)]
pub struct ConnHandler;
42
43impl wrpc_transport::frame::ConnHandler<RecvStream, SendStream> for ConnHandler {
44    async fn on_ingress(mut rx: RecvStream, res: std::io::Result<()>) {
45        if let Err(err) = res {
46            error!(?err, "ingress failed");
47        } else {
48            debug!("ingress successfully complete");
49        }
50        if let Ok(code) = VarInt::from_u64(0x52e4a40fa8db) {
51            if let Err(err) = rx.quic_stream_mut().stop(code) {
52                debug!(?err, "failed to close incoming stream");
53            }
54        }
55    }
56
57    async fn on_egress(mut tx: SendStream, res: std::io::Result<()>) {
58        if let Err(err) = res {
59            error!(?err, "egress failed");
60        } else {
61            debug!("egress successfully complete");
62        }
63        match tx.quic_stream_mut().stopped().await {
64            Ok(None) => {
65                trace!("stream successfully closed")
66            }
67            Ok(Some(code)) => {
68                if u64::from(code) == 0x52e4a40fa8db {
69                    trace!("stream successfully closed")
70                } else {
71                    warn!(?code, "stream closed with code")
72                }
73            }
74            Err(err) => {
75                error!(?err, "failed to await stream close");
76            }
77        }
78    }
79}
80
81impl Invoke for &Client {
82    type Context = ();
83    type Outgoing = Outgoing;
84    type Incoming = Incoming;
85
86    async fn invoke<P>(
87        &self,
88        (): Self::Context,
89        instance: &str,
90        func: &str,
91        params: Bytes,
92        paths: impl AsRef<[P]> + Send,
93    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
94    where
95        P: AsRef<[Option<usize>]> + Send + Sync,
96    {
97        let stream = self
98            .0
99            .open_bi()
100            .await
101            .context("failed to initialize parameter stream")?;
102        let (tx, rx) = stream.await.context("failed to open parameter stream")?;
103        invoke(tx, rx, instance, func, params, paths).await
104    }
105}
106
107impl Invoke for Client {
108    type Context = ();
109    type Outgoing = Outgoing;
110    type Incoming = Incoming;
111
112    async fn invoke<P>(
113        &self,
114        (): Self::Context,
115        instance: &str,
116        func: &str,
117        params: Bytes,
118        paths: impl AsRef<[P]> + Send,
119    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
120    where
121        P: AsRef<[Option<usize>]> + Send + Sync,
122    {
123        (&self).invoke((), instance, func, params, paths).await
124    }
125}
126
127impl Accept for &Client {
128    type Context = ();
129    type Outgoing = SendStream;
130    type Incoming = RecvStream;
131
132    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
133        let (tx, rx) = self.0.accept_bi().await.map_err(std::io::Error::other)?;
134        Ok(((), tx, rx))
135    }
136}
137
138impl Accept for Client {
139    type Context = ();
140    type Outgoing = SendStream;
141    type Incoming = RecvStream;
142
143    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
144        (&self).accept().await
145    }
146}