wrpc_runtime_wasmtime/
polyfill.rs

1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use anyhow::{bail, ensure, Context as _};
8use bytes::BytesMut;
9use futures::future::try_join_all;
10use tokio::io::AsyncWriteExt as _;
11use tokio::time::Instant;
12use tokio::try_join;
13use tokio_util::codec::Encoder;
14use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
15use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
16use wasmtime::{AsContextMut, Engine, StoreContextMut};
17use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
18
19use crate::rpc::Error;
20use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
21
22/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
23#[instrument(level = "trace", skip_all)]
24pub fn link_item<V>(
25    engine: &Engine,
26    linker: &mut LinkerInstance<V>,
27    guest_resources: impl Into<Arc<[ResourceType]>>,
28    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
29    ty: types::ComponentItem,
30    instance: impl Into<Arc<str>>,
31    name: impl Into<Arc<str>>,
32) -> wasmtime::Result<()>
33where
34    V: WrpcView,
35{
36    let instance = instance.into();
37    let guest_resources = guest_resources.into();
38    let host_resources = host_resources.into();
39    match ty {
40        types::ComponentItem::ComponentFunc(ty) => {
41            let name = name.into();
42            debug!(?instance, ?name, "linking function");
43            link_function(
44                linker,
45                Arc::clone(&guest_resources),
46                Arc::clone(&host_resources),
47                ty,
48                instance,
49                name,
50            )?;
51        }
52        types::ComponentItem::CoreFunc(_) => {
53            bail!("polyfilling core functions not supported yet")
54        }
55        types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
56        types::ComponentItem::Component(ty) => {
57            for (name, ty) in ty.imports(engine) {
58                debug!(?instance, name, "linking component item");
59                link_item(
60                    engine,
61                    linker,
62                    Arc::clone(&guest_resources),
63                    Arc::clone(&host_resources),
64                    ty,
65                    "",
66                    name,
67                )?;
68            }
69        }
70        types::ComponentItem::ComponentInstance(ty) => {
71            let name = name.into();
72            let mut linker = linker
73                .instance(&name)
74                .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
75            debug!(?instance, ?name, "linking instance");
76            link_instance(
77                engine,
78                &mut linker,
79                guest_resources,
80                host_resources,
81                ty,
82                name,
83            )?;
84        }
85        types::ComponentItem::Type(_) => {}
86        types::ComponentItem::Resource(ty) => {
87            let name = name.into();
88            let Some((guest_ty, host_ty)) = host_resources
89                .get(&*instance)
90                .and_then(|instance| instance.get(&*name))
91            else {
92                bail!("resource type for {instance}/{name} not defined");
93            };
94            ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
95
96            debug!(?instance, ?name, "linking resource");
97            linker.resource(&name, *host_ty, |_, _| Ok(()))?;
98        }
99    }
100    Ok(())
101}
102
103/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
104#[instrument(level = "trace", skip_all)]
105pub fn link_instance<V>(
106    engine: &Engine,
107    linker: &mut LinkerInstance<V>,
108    guest_resources: impl Into<Arc<[ResourceType]>>,
109    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
110    ty: types::ComponentInstance,
111    name: impl Into<Arc<str>>,
112) -> wasmtime::Result<()>
113where
114    V: WrpcView,
115{
116    let instance = name.into();
117    let guest_resources = guest_resources.into();
118    let host_resources = host_resources.into();
119    for (name, ty) in ty.exports(engine) {
120        debug!(name, "linking instance item");
121        link_item(
122            engine,
123            linker,
124            Arc::clone(&guest_resources),
125            Arc::clone(&host_resources),
126            ty,
127            Arc::clone(&instance),
128            name,
129        )?;
130    }
131    Ok(())
132}
133
134#[allow(clippy::too_many_arguments)]
135async fn invoke<T: WrpcView>(
136    mut store: &mut StoreContextMut<'_, T>,
137    params: &[Val],
138    results: &mut [Val],
139    guest_resources: Arc<[ResourceType]>,
140    params_ty: impl IntoIterator<Item = (&str, Type)>,
141    results_ty: impl IntoIterator<Item = Type>,
142    instance: Arc<str>,
143    name: Arc<str>,
144) -> wasmtime::Result<anyhow::Result<()>> {
145    let mut buf = BytesMut::default();
146    let mut deferred = vec![];
147    for (v, (name, ref ty)) in zip(params, params_ty) {
148        let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources);
149        enc.encode(v, &mut buf)
150            .with_context(|| format!("failed to encode parameter `{name}`"))?;
151        deferred.push(enc.deferred);
152    }
153    let view = store.data_mut().wrpc();
154    let clt = view.ctx.client();
155    let cx = view.ctx.context();
156    let timeout = view.ctx.timeout();
157    let buf = buf.freeze();
158    // TODO: set paths
159    let paths = &[[]; 0];
160    let rpc_name = rpc_func_name(&name);
161    let start = Instant::now();
162    let invocation = if let Some(timeout) = timeout {
163        clt.timeout(timeout)
164            .invoke(cx, &instance, rpc_name, buf, paths)
165            .await
166    } else {
167        clt.invoke(cx, &instance, rpc_name, buf, paths).await
168    }
169    .with_context(|| format!("failed to invoke `{instance}.{name}` polyfill via wRPC"));
170    let (outgoing, incoming) = match invocation {
171        Ok((outgoing, incoming)) => (outgoing, incoming),
172        Err(err) => return Ok(Err(err)),
173    };
174    let tx = async {
175        try_join_all(
176            zip(0.., deferred)
177                .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
178                .map(|(w, f)| async move {
179                    let w = w?;
180                    f(w).await
181                }),
182        )
183        .await
184        .context("failed to write asynchronous parameters")?;
185        let mut outgoing = pin!(outgoing);
186        outgoing
187            .flush()
188            .await
189            .context("failed to flush outgoing stream")?;
190        if let Err(err) = outgoing.shutdown().await {
191            trace!(?err, "failed to shutdown outgoing stream");
192        }
193        anyhow::Ok(())
194    };
195    let rx = async {
196        let mut incoming = pin!(incoming);
197        for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
198            read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i])
199                .await
200                .with_context(|| format!("failed to decode return value {i}"))?;
201        }
202        Ok(())
203    };
204    let res = if let Some(timeout) = timeout {
205        let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
206        try_join!(
207            async {
208                tokio::time::timeout(timeout, tx)
209                    .await
210                    .context("data transmission timed out")?
211            },
212            async {
213                tokio::time::timeout(timeout, rx)
214                    .await
215                    .context("data receipt timed out")?
216            },
217        )
218    } else {
219        try_join!(tx, rx)
220    };
221    match res {
222        Ok(((), ())) => Ok(Ok(())),
223        Err(err) => Ok(Err(err)),
224    }
225}
226
227/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
228#[instrument(level = "trace", skip_all)]
229pub fn link_function<V>(
230    linker: &mut LinkerInstance<V>,
231    guest_resources: impl Into<Arc<[ResourceType]>>,
232    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
233    ty: types::ComponentFunc,
234    instance: impl Into<Arc<str>>,
235    name: impl Into<Arc<str>>,
236) -> wasmtime::Result<()>
237where
238    V: WrpcView,
239{
240    let span = Span::current();
241    let instance = instance.into();
242    let name = name.into();
243    let guest_resources = guest_resources.into();
244    let host_resources = host_resources.into();
245    match rpc_result_type(&host_resources, ty.results()) {
246        None => linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
247            let instance = Arc::clone(&instance);
248            let name = Arc::clone(&name);
249            let resources = Arc::clone(&guest_resources);
250            Box::new(
251                async move {
252                    match invoke(
253                        &mut store,
254                        params,
255                        results,
256                        resources,
257                        ty.params(),
258                        ty.results(),
259                        instance,
260                        name,
261                    )
262                    .await
263                    {
264                        Ok(Ok(())) => Ok(()),
265                        Ok(Err(err)) => Err(err),
266                        Err(err) => Err(err),
267                    }
268                }
269                .instrument(span.clone()),
270            )
271        }),
272        // `result<_, rpc-eror>`
273        Some(None) => {
274            linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
275                let instance = Arc::clone(&instance);
276                let name = Arc::clone(&name);
277                let resources = Arc::clone(&guest_resources);
278                Box::new(
279                    async move {
280                        let [result] = results else {
281                            bail!("result type mismatch");
282                        };
283                        match invoke(
284                            &mut store,
285                            params,
286                            &mut [],
287                            resources,
288                            ty.params(),
289                            None,
290                            instance,
291                            name,
292                        )
293                        .await?
294                        {
295                            Ok(()) => {
296                                *result = Val::Result(Ok(None));
297                            }
298                            Err(err) => {
299                                let err = store.data_mut().push_error(Error::Invoke(err))?;
300                                let err = err
301                                    .try_into_resource_any(&mut store)
302                                    .context("failed to lower error resource")?;
303                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
304                            }
305                        }
306                        Ok(())
307                    }
308                    .instrument(span.clone()),
309                )
310            })
311        }
312        // `result<T, rpc-eror>`
313        Some(Some(result_ty)) => {
314            linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
315                let instance = Arc::clone(&instance);
316                let name = Arc::clone(&name);
317                let resources = Arc::clone(&guest_resources);
318                let result_ty = result_ty.clone();
319                Box::new(
320                    async move {
321                        let [result] = results else {
322                            bail!("result type mismatch");
323                        };
324                        let mut ok = [Val::Bool(false); 1];
325                        match invoke(
326                            &mut store,
327                            params,
328                            ok.as_mut_slice(),
329                            resources,
330                            ty.params(),
331                            [result_ty],
332                            instance,
333                            name,
334                        )
335                        .await?
336                        {
337                            Ok(()) => {
338                                let [ok] = ok;
339                                *result = Val::Result(Ok(Some(Box::new(ok))));
340                            }
341                            Err(err) => {
342                                let err = store.data_mut().push_error(Error::Invoke(err))?;
343                                let err = err
344                                    .try_into_resource_any(&mut store)
345                                    .context("failed to lower error resource")?;
346                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
347                            }
348                        }
349                        Ok(())
350                    }
351                    .instrument(span.clone()),
352                )
353            })
354        }
355    }
356}