wrpc_transport_web/
lib.rs1use core::ops::{Deref, DerefMut};
4
5use anyhow::Context as _;
6use bytes::Bytes;
7use quinn::VarInt;
8use tracing::{debug, error, trace, warn};
9use wrpc_transport::frame::{invoke, Accept, Incoming, Outgoing};
10use wrpc_transport::Invoke;
11use wtransport::{Connection, RecvStream, SendStream};
12
/// [`wrpc_transport::Server`] specialised for this transport.
///
/// The type parameters are, in order: `()`, [`RecvStream`], [`SendStream`]
/// and [`ConnHandler`] — the same context and stream types used by the
/// `Accept` implementations for [`Client`] in this file.
pub type Server = wrpc_transport::Server<(), RecvStream, SendStream, ConnHandler>;
15
/// Newtype wrapper around a [`Connection`].
///
/// The wrapped connection is reachable through `Deref`/`DerefMut`, and the
/// wrapper (as well as a shared reference to it) implements `Invoke` and
/// `Accept` by opening or accepting bidirectional streams on it.
#[derive(Clone, Debug)]
pub struct Client(Connection);
19
20impl From<Connection> for Client {
21 fn from(session: Connection) -> Self {
22 Self(session)
23 }
24}
25
26impl Deref for Client {
27 type Target = Connection;
28
29 fn deref(&self) -> &Self::Target {
30 &self.0
31 }
32}
33
34impl DerefMut for Client {
35 fn deref_mut(&mut self) -> &mut Self::Target {
36 &mut self.0
37 }
38}
39
/// Stateless marker type that implements
/// `wrpc_transport::frame::ConnHandler` for [`RecvStream`] / [`SendStream`]
/// pairs; it is the handler parameter of the [`Server`] alias.
pub struct ConnHandler;
42
43impl wrpc_transport::frame::ConnHandler<RecvStream, SendStream> for ConnHandler {
44 async fn on_ingress(mut rx: RecvStream, res: std::io::Result<()>) {
45 if let Err(err) = res {
46 error!(?err, "ingress failed");
47 } else {
48 debug!("ingress successfully complete");
49 }
50 if let Ok(code) = VarInt::from_u64(0x52e4a40fa8db) {
51 if let Err(err) = rx.quic_stream_mut().stop(code) {
52 debug!(?err, "failed to close incoming stream");
53 }
54 }
55 }
56
57 async fn on_egress(mut tx: SendStream, res: std::io::Result<()>) {
58 if let Err(err) = res {
59 error!(?err, "egress failed");
60 } else {
61 debug!("egress successfully complete");
62 }
63 match tx.quic_stream_mut().stopped().await {
64 Ok(None) => {
65 trace!("stream successfully closed")
66 }
67 Ok(Some(code)) => {
68 if u64::from(code) == 0x52e4a40fa8db {
69 trace!("stream successfully closed")
70 } else {
71 warn!(?code, "stream closed with code")
72 }
73 }
74 Err(err) => {
75 error!(?err, "failed to await stream close");
76 }
77 }
78 }
79}
80
81impl Invoke for &Client {
82 type Context = ();
83 type Outgoing = Outgoing;
84 type Incoming = Incoming;
85
86 async fn invoke<P>(
87 &self,
88 (): Self::Context,
89 instance: &str,
90 func: &str,
91 params: Bytes,
92 paths: impl AsRef<[P]> + Send,
93 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
94 where
95 P: AsRef<[Option<usize>]> + Send + Sync,
96 {
97 let stream = self
98 .0
99 .open_bi()
100 .await
101 .context("failed to initialize parameter stream")?;
102 let (tx, rx) = stream.await.context("failed to open parameter stream")?;
103 invoke(tx, rx, instance, func, params, paths).await
104 }
105}
106
107impl Invoke for Client {
108 type Context = ();
109 type Outgoing = Outgoing;
110 type Incoming = Incoming;
111
112 async fn invoke<P>(
113 &self,
114 (): Self::Context,
115 instance: &str,
116 func: &str,
117 params: Bytes,
118 paths: impl AsRef<[P]> + Send,
119 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
120 where
121 P: AsRef<[Option<usize>]> + Send + Sync,
122 {
123 (&self).invoke((), instance, func, params, paths).await
124 }
125}
126
127impl Accept for &Client {
128 type Context = ();
129 type Outgoing = SendStream;
130 type Incoming = RecvStream;
131
132 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
133 let (tx, rx) = self.0.accept_bi().await.map_err(std::io::Error::other)?;
134 Ok(((), tx, rx))
135 }
136}
137
138impl Accept for Client {
139 type Context = ();
140 type Outgoing = SendStream;
141 type Incoming = RecvStream;
142
143 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
144 (&self).accept().await
145 }
146}