wrpc_transport/frame/tcp/
tokio.rs

1//! wRPC TCP transport using [tokio]
2
3use core::net::SocketAddr;
4
5use anyhow::{bail, Context as _};
6use bytes::Bytes;
7use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
8use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
9use tracing::instrument;
10
11use crate::frame::{invoke, Accept, Incoming, Outgoing};
12use crate::Invoke;
13
14/// [Invoke] implementation in terms of a single [`TcpStream`]
15///
16/// [`Invoke::invoke`] can only be called once on [Invocation],
17/// repeated calls with return an error
18pub struct Invocation(std::sync::Mutex<Option<TcpStream>>);
19
/// [Invoke] implementation of a TCP transport using [tokio]
///
/// Wraps an address value `T`; every [`Invoke::invoke`] call opens a new
/// [`TcpStream`] connection to a clone of that address.
#[derive(Debug, Clone)]
pub struct Client<T>(T);
23
24impl<T> From<T> for Client<T>
25where
26    T: ToSocketAddrs + Clone,
27{
28    fn from(addr: T) -> Self {
29        Self(addr)
30    }
31}
32
33impl From<TcpStream> for Invocation {
34    fn from(stream: TcpStream) -> Self {
35        Self(std::sync::Mutex::new(Some(stream)))
36    }
37}
38
39impl<T> Invoke for Client<T>
40where
41    T: ToSocketAddrs + Clone + Send + Sync,
42{
43    type Context = ();
44    type Outgoing = Outgoing;
45    type Incoming = Incoming;
46
47    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
48    async fn invoke<P>(
49        &self,
50        (): Self::Context,
51        instance: &str,
52        func: &str,
53        params: Bytes,
54        paths: impl AsRef<[P]> + Send,
55    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
56    where
57        P: AsRef<[Option<usize>]> + Send + Sync,
58    {
59        let stream = TcpStream::connect(self.0.clone()).await?;
60        let (rx, tx) = stream.into_split();
61        invoke(tx, rx, instance, func, params, paths).await
62    }
63}
64
65impl Invoke for Invocation {
66    type Context = ();
67    type Outgoing = Outgoing;
68    type Incoming = Incoming;
69
70    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
71    async fn invoke<P>(
72        &self,
73        (): Self::Context,
74        instance: &str,
75        func: &str,
76        params: Bytes,
77        paths: impl AsRef<[P]> + Send,
78    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
79    where
80        P: AsRef<[Option<usize>]> + Send + Sync,
81    {
82        let stream = match self.0.lock() {
83            Ok(mut stream) => stream
84                .take()
85                .context("stream was already used for an invocation")?,
86            Err(_) => bail!("stream lock poisoned"),
87        };
88        let (rx, tx) = stream.into_split();
89        invoke(tx, rx, instance, func, params, paths).await
90    }
91}
92
93impl Accept for TcpListener {
94    type Context = SocketAddr;
95    type Outgoing = OwnedWriteHalf;
96    type Incoming = OwnedReadHalf;
97
98    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
99        (&self).accept().await
100    }
101}
102
103impl Accept for &TcpListener {
104    type Context = SocketAddr;
105    type Outgoing = OwnedWriteHalf;
106    type Incoming = OwnedReadHalf;
107
108    #[instrument(level = "trace")]
109    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
110        let (stream, addr) = TcpListener::accept(self).await?;
111        let (rx, tx) = stream.into_split();
112        Ok((addr, tx, rx))
113    }
114}