// wrpc_transport/frame/tcp/tokio.rs
use core::net::SocketAddr;
4
5use anyhow::{bail, Context as _};
6use bytes::Bytes;
7use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
8use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
9use tracing::instrument;
10
11use crate::frame::{invoke, Accept, Incoming, Outgoing};
12use crate::Invoke;
13
14pub struct Invocation(std::sync::Mutex<Option<TcpStream>>);
19
/// TCP client that stores an address value and connects anew for each invocation.
///
/// The [`Invoke`] implementation clones the stored `T` and passes the clone to
/// [`TcpStream::connect`] on every call.
#[derive(Debug, Clone)]
pub struct Client<T>(T);
23
24impl<T> From<T> for Client<T>
25where
26 T: ToSocketAddrs + Clone,
27{
28 fn from(addr: T) -> Self {
29 Self(addr)
30 }
31}
32
33impl From<TcpStream> for Invocation {
34 fn from(stream: TcpStream) -> Self {
35 Self(std::sync::Mutex::new(Some(stream)))
36 }
37}
38
39impl<T> Invoke for Client<T>
40where
41 T: ToSocketAddrs + Clone + Send + Sync,
42{
43 type Context = ();
44 type Outgoing = Outgoing;
45 type Incoming = Incoming;
46
47 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
48 async fn invoke<P>(
49 &self,
50 (): Self::Context,
51 instance: &str,
52 func: &str,
53 params: Bytes,
54 paths: impl AsRef<[P]> + Send,
55 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
56 where
57 P: AsRef<[Option<usize>]> + Send + Sync,
58 {
59 let stream = TcpStream::connect(self.0.clone()).await?;
60 let (rx, tx) = stream.into_split();
61 invoke(tx, rx, instance, func, params, paths).await
62 }
63}
64
65impl Invoke for Invocation {
66 type Context = ();
67 type Outgoing = Outgoing;
68 type Incoming = Incoming;
69
70 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
71 async fn invoke<P>(
72 &self,
73 (): Self::Context,
74 instance: &str,
75 func: &str,
76 params: Bytes,
77 paths: impl AsRef<[P]> + Send,
78 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
79 where
80 P: AsRef<[Option<usize>]> + Send + Sync,
81 {
82 let stream = match self.0.lock() {
83 Ok(mut stream) => stream
84 .take()
85 .context("stream was already used for an invocation")?,
86 Err(_) => bail!("stream lock poisoned"),
87 };
88 let (rx, tx) = stream.into_split();
89 invoke(tx, rx, instance, func, params, paths).await
90 }
91}
92
93impl Accept for TcpListener {
94 type Context = SocketAddr;
95 type Outgoing = OwnedWriteHalf;
96 type Incoming = OwnedReadHalf;
97
98 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
99 (&self).accept().await
100 }
101}
102
103impl Accept for &TcpListener {
104 type Context = SocketAddr;
105 type Outgoing = OwnedWriteHalf;
106 type Incoming = OwnedReadHalf;
107
108 #[instrument(level = "trace")]
109 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
110 let (stream, addr) = TcpListener::accept(self).await?;
111 let (rx, tx) = stream.into_split();
112 Ok((addr, tx, rx))
113 }
114}