wrpc_transport/frame/
unix.rs

1//! Unix domain socket transport
2
3use std::path::{Path, PathBuf};
4
5use bytes::Bytes;
6use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf, SocketAddr};
7use tokio::net::{UnixListener, UnixStream};
8use tracing::instrument;
9
10use crate::frame::{invoke, Accept, Incoming, Outgoing};
11use crate::Invoke;
12
13/// [Invoke] and [Accept] implementation in terms of a single [`UnixStream`].
14///
15/// Either [`Invoke::invoke`] or [`Accept::accept`] can only be called at most once
16/// on [Oneshot], repeated calls with return an error
17pub type Oneshot = super::Oneshot<OwnedReadHalf, OwnedWriteHalf>;
18
19impl Oneshot {
20    /// Creates a pair of connected [Oneshot] using [UnixStream::pair].
21    pub fn unix_pair() -> std::io::Result<(Oneshot, Oneshot)> {
22        let (a, b) = UnixStream::pair()?;
23        Ok((a.into(), b.into()))
24    }
25}
26
27impl From<UnixStream> for Oneshot {
28    fn from(stream: UnixStream) -> Self {
29        stream.into_split().into()
30    }
31}
32
/// [Invoke] implementation of a Unix domain socket transport
///
/// The wrapped value `T` identifies the socket to connect to. Conversions are
/// provided from owned and borrowed filesystem paths and from owned and
/// borrowed [`std::os::unix::net::SocketAddr`] values.
#[derive(Clone, Debug)]
pub struct Client<T>(T);

impl From<PathBuf> for Client<PathBuf> {
    /// Wraps an owned socket path.
    fn from(path: PathBuf) -> Self {
        Client(path)
    }
}

impl<'a> From<&'a Path> for Client<&'a Path> {
    /// Wraps a borrowed socket path.
    fn from(path: &'a Path) -> Self {
        Client(path)
    }
}

impl<'a> From<&'a std::os::unix::net::SocketAddr> for Client<&'a std::os::unix::net::SocketAddr> {
    /// Wraps a borrowed Unix socket address.
    fn from(addr: &'a std::os::unix::net::SocketAddr) -> Self {
        Client(addr)
    }
}

impl From<std::os::unix::net::SocketAddr> for Client<std::os::unix::net::SocketAddr> {
    /// Wraps an owned Unix socket address.
    fn from(addr: std::os::unix::net::SocketAddr) -> Self {
        Client(addr)
    }
}
60
61impl Invoke for Client<PathBuf> {
62    type Context = ();
63    type Outgoing = Outgoing;
64    type Incoming = Incoming;
65
66    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
67    async fn invoke<P>(
68        &self,
69        (): Self::Context,
70        instance: &str,
71        func: &str,
72        params: Bytes,
73        paths: impl AsRef<[P]> + Send,
74    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
75    where
76        P: AsRef<[Option<usize>]> + Send + Sync,
77    {
78        let stream = UnixStream::connect(&self.0).await?;
79        let (rx, tx) = stream.into_split();
80        invoke(tx, rx, instance, func, params, paths).await
81    }
82}
83
84impl Invoke for Client<&Path> {
85    type Context = ();
86    type Outgoing = Outgoing;
87    type Incoming = Incoming;
88
89    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
90    async fn invoke<P>(
91        &self,
92        (): Self::Context,
93        instance: &str,
94        func: &str,
95        params: Bytes,
96        paths: impl AsRef<[P]> + Send,
97    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
98    where
99        P: AsRef<[Option<usize>]> + Send + Sync,
100    {
101        let stream = UnixStream::connect(self.0).await?;
102        let (rx, tx) = stream.into_split();
103        invoke(tx, rx, instance, func, params, paths).await
104    }
105}
106
107impl Invoke for Client<&std::os::unix::net::SocketAddr> {
108    type Context = ();
109    type Outgoing = Outgoing;
110    type Incoming = Incoming;
111
112    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
113    async fn invoke<P>(
114        &self,
115        (): Self::Context,
116        instance: &str,
117        func: &str,
118        params: Bytes,
119        paths: impl AsRef<[P]> + Send,
120    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
121    where
122        P: AsRef<[Option<usize>]> + Send + Sync,
123    {
124        let stream = std::os::unix::net::UnixStream::connect_addr(self.0)?;
125        let stream = UnixStream::from_std(stream)?;
126        let (rx, tx) = stream.into_split();
127        invoke(tx, rx, instance, func, params, paths).await
128    }
129}
130
131impl Invoke for Client<std::os::unix::net::SocketAddr> {
132    type Context = ();
133    type Outgoing = Outgoing;
134    type Incoming = Incoming;
135
136    #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
137    async fn invoke<P>(
138        &self,
139        (): Self::Context,
140        instance: &str,
141        func: &str,
142        params: Bytes,
143        paths: impl AsRef<[P]> + Send,
144    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
145    where
146        P: AsRef<[Option<usize>]> + Send + Sync,
147    {
148        let stream = std::os::unix::net::UnixStream::connect_addr(&self.0)?;
149        let stream = UnixStream::from_std(stream)?;
150        let (rx, tx) = stream.into_split();
151        invoke(tx, rx, instance, func, params, paths).await
152    }
153}
154
155impl Accept for UnixListener {
156    type Context = SocketAddr;
157    type Outgoing = OwnedWriteHalf;
158    type Incoming = OwnedReadHalf;
159
160    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
161        (&self).accept().await
162    }
163}
164
165impl Accept for &UnixListener {
166    type Context = SocketAddr;
167    type Outgoing = OwnedWriteHalf;
168    type Incoming = OwnedReadHalf;
169
170    #[instrument(level = "trace")]
171    async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
172        let (stream, addr) = UnixListener::accept(self).await?;
173        let (rx, tx) = stream.into_split();
174        Ok((addr, tx, rx))
175    }
176}