1use std::path::{Path, PathBuf};
4
5use bytes::Bytes;
6use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf, SocketAddr};
7use tokio::net::{UnixListener, UnixStream};
8use tracing::instrument;
9
10use crate::frame::{invoke, Accept, Incoming, Outgoing};
11use crate::Invoke;
12
13pub type Oneshot = super::Oneshot<OwnedReadHalf, OwnedWriteHalf>;
18
19impl Oneshot {
20 pub fn unix_pair() -> std::io::Result<(Oneshot, Oneshot)> {
22 let (a, b) = UnixStream::pair()?;
23 Ok((a.into(), b.into()))
24 }
25}
26
27impl From<UnixStream> for Oneshot {
28 fn from(stream: UnixStream) -> Self {
29 stream.into_split().into()
30 }
31}
32
/// Unix-domain-socket client, generic over the address value `T` it wraps.
///
/// In this file it is constructed via `From` for filesystem paths
/// (`PathBuf`, `&Path`) and for `std::os::unix::net::SocketAddr`
/// (owned or borrowed).
#[derive(Debug, Clone)]
pub struct Client<T>(T);
36
37impl From<PathBuf> for Client<PathBuf> {
38 fn from(path: PathBuf) -> Self {
39 Self(path)
40 }
41}
42
43impl<'a> From<&'a Path> for Client<&'a Path> {
44 fn from(path: &'a Path) -> Self {
45 Self(path)
46 }
47}
48
49impl<'a> From<&'a std::os::unix::net::SocketAddr> for Client<&'a std::os::unix::net::SocketAddr> {
50 fn from(addr: &'a std::os::unix::net::SocketAddr) -> Self {
51 Self(addr)
52 }
53}
54
55impl From<std::os::unix::net::SocketAddr> for Client<std::os::unix::net::SocketAddr> {
56 fn from(addr: std::os::unix::net::SocketAddr) -> Self {
57 Self(addr)
58 }
59}
60
61impl Invoke for Client<PathBuf> {
62 type Context = ();
63 type Outgoing = Outgoing;
64 type Incoming = Incoming;
65
66 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
67 async fn invoke<P>(
68 &self,
69 (): Self::Context,
70 instance: &str,
71 func: &str,
72 params: Bytes,
73 paths: impl AsRef<[P]> + Send,
74 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
75 where
76 P: AsRef<[Option<usize>]> + Send + Sync,
77 {
78 let stream = UnixStream::connect(&self.0).await?;
79 let (rx, tx) = stream.into_split();
80 invoke(tx, rx, instance, func, params, paths).await
81 }
82}
83
84impl Invoke for Client<&Path> {
85 type Context = ();
86 type Outgoing = Outgoing;
87 type Incoming = Incoming;
88
89 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
90 async fn invoke<P>(
91 &self,
92 (): Self::Context,
93 instance: &str,
94 func: &str,
95 params: Bytes,
96 paths: impl AsRef<[P]> + Send,
97 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
98 where
99 P: AsRef<[Option<usize>]> + Send + Sync,
100 {
101 let stream = UnixStream::connect(self.0).await?;
102 let (rx, tx) = stream.into_split();
103 invoke(tx, rx, instance, func, params, paths).await
104 }
105}
106
107impl Invoke for Client<&std::os::unix::net::SocketAddr> {
108 type Context = ();
109 type Outgoing = Outgoing;
110 type Incoming = Incoming;
111
112 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
113 async fn invoke<P>(
114 &self,
115 (): Self::Context,
116 instance: &str,
117 func: &str,
118 params: Bytes,
119 paths: impl AsRef<[P]> + Send,
120 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
121 where
122 P: AsRef<[Option<usize>]> + Send + Sync,
123 {
124 let stream = std::os::unix::net::UnixStream::connect_addr(self.0)?;
125 let stream = UnixStream::from_std(stream)?;
126 let (rx, tx) = stream.into_split();
127 invoke(tx, rx, instance, func, params, paths).await
128 }
129}
130
131impl Invoke for Client<std::os::unix::net::SocketAddr> {
132 type Context = ();
133 type Outgoing = Outgoing;
134 type Incoming = Incoming;
135
136 #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
137 async fn invoke<P>(
138 &self,
139 (): Self::Context,
140 instance: &str,
141 func: &str,
142 params: Bytes,
143 paths: impl AsRef<[P]> + Send,
144 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
145 where
146 P: AsRef<[Option<usize>]> + Send + Sync,
147 {
148 let stream = std::os::unix::net::UnixStream::connect_addr(&self.0)?;
149 let stream = UnixStream::from_std(stream)?;
150 let (rx, tx) = stream.into_split();
151 invoke(tx, rx, instance, func, params, paths).await
152 }
153}
154
155impl Accept for UnixListener {
156 type Context = SocketAddr;
157 type Outgoing = OwnedWriteHalf;
158 type Incoming = OwnedReadHalf;
159
160 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
161 (&self).accept().await
162 }
163}
164
165impl Accept for &UnixListener {
166 type Context = SocketAddr;
167 type Outgoing = OwnedWriteHalf;
168 type Incoming = OwnedReadHalf;
169
170 #[instrument(level = "trace")]
171 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
172 let (stream, addr) = UnixListener::accept(self).await?;
173 let (rx, tx) = stream.into_split();
174 Ok((addr, tx, rx))
175 }
176}