alexcrichton opened issue #11342:
Currently there is liberal use of
Arc<...>in the WASIp3wasi:socketsimplementation which is used for sharing sockets between tasks which are used to implement streams. One example is that a stream of clients can be extracted from a TCP listener, meaning that the underlying socket is now shared between the original TCP listener resource and the stream that is reading clients. The consequence of this implementation is that when the resource is closed it does not mean that the actual socket is itself closed. For example the OS-level port remains reserved until the task for accepted clients also exits. While the tasks will "promptly exit" there is currently no reliable mechanism to await this happening.This leads to tests for REUSEADDR, for example, having a loop around re-binding a port and asserting it's successful. This test is being added in https://github.com/bytecodealliance/wasmtime/pull/11291.
alexcrichton added the wasi:impl label to Issue #11342.
alexcrichton added the wasm-proposal:component-model-async label to Issue #11342.
rvolosatovs commented on issue #11342:
One thing that is currently missing in the implementation is the receive/listen task cancellation on socket drop, i.e.
receiveandlistenshould store theAbortHandlein the socket state and abort these tasks once the socket is dropped.I spent some time on it just now, but I don't really see a nice way to integrate this with the
start_receiveetc. abstractions without makingTcpStatefield public again, @alexcrichton could you handle this?I don't know if that would necessarily fix this issue, but it's something we should do anyway. For reference, here's the documentation of
wasi:httpfor a similar construct: https://github.com/WebAssembly/wasi-http/blob/ad500ba30bdfd57e04bdabdcd0480111681bf017/wit-0.3.0-draft/types.wit#L420-L423
rvolosatovs edited a comment on issue #11342:
One thing that is currently missing in the implementation is the receive/listen task cancellation on socket drop, i.e.
receiveandlistenshould store theAbortHandlein the socket state and abort these tasks once the socket is dropped.I spent some time on it just now, but I don't really see a nice way to integrate this with the
start_receiveetc. abstractions without makingTcpStatefield public again, @alexcrichton could you handle this?I've implemented this for filesystem though: https://github.com/bytecodealliance/wasmtime/pull/11406/commits/40415b52f79a33c376a2a875ffecb1d0a01c7903
I don't know if that would necessarily fix this issue, but it's something we should do anyway. For reference, here's the documentation of
wasi:httpfor a similar construct: https://github.com/WebAssembly/wasi-http/blob/ad500ba30bdfd57e04bdabdcd0480111681bf017/wit-0.3.0-draft/types.wit#L420-L423
rvolosatovs edited a comment on issue #11342:
One thing that is currently missing in the implementation is the receive/listen task cancellation on socket drop, i.e.
receiveandlistenshould store theAbortHandlein the socket state and abort these tasks once the socket is dropped.I spent some time on it just now, but I don't really see a nice way to integrate this with the
start_receiveetc. abstractions without makingTcpStatefield public again, @alexcrichton could you handle this?I've implemented this for filesystem though: https://github.com/bytecodealliance/wasmtime/pull/11406/commits/40415b52f79a33c376a2a875ffecb1d0a01c7903
I don't know if that would necessarily fix this issue, but it's something we should do anyway. For reference, here's the documentation of
wasi:httpfor a similar construct: https://github.com/WebAssembly/wasi-http/blob/ad500ba30bdfd57e04bdabdcd0480111681bf017/wit-0.3.0-draft/types.wit#L420-L423For reference, here's the in-progress diff, if it helps:
diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index 66314d5372..a2bf1ff0c1 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -266,11 +266,12 @@ impl HostTcpSocketWithStore for WasiSockets { let (result_tx, result_rx) = instance .future(&mut view, || unreachable!()) .context("failed to create future")?; - view.spawn(ReceiveTask { + let task = view.spawn(ReceiveTask { stream, data_tx, result_tx, }); + socket.set_p3_receive_task(task); Ok((data_rx, result_rx)) } None => { diff --git a/crates/wasi/src/sockets/tcp.rs b/crates/wasi/src/sockets/tcp.rs index 09dd2ba8de..ff907b356e 100644 --- a/crates/wasi/src/sockets/tcp.rs +++ b/crates/wasi/src/sockets/tcp.rs @@ -94,7 +94,10 @@ enum TcpState { /// /// A socket will not transition out of this state. #[cfg(feature = "p3")] - Receiving(Arc<tokio::net::TcpStream>), + Receiving( + Arc<tokio::net::TcpStream>, + Option<wasmtime::component::AbortHandle>, + ), /// This is a WASIp2-bound socket which stores some extra state for /// read/write streams to handle TCP shutdown. @@ -239,7 +242,7 @@ impl TcpSocket { | TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()), TcpState::Connected(stream) => Ok(stream.as_socketlike_view()), #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()), + TcpState::Receiving(stream, _) => Ok(stream.as_socketlike_view()), TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()), TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()), TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => { @@ -459,7 +462,7 @@ impl TcpSocket { pub(crate) fn start_receive(&mut self) -> Option<&Arc<tokio::net::TcpStream>> { match mem::replace(&mut self.tcp_state, TcpState::Closed) { TcpState::Connected(stream) => { - self.tcp_state = TcpState::Receiving(stream); + self.tcp_state = TcpState::Receiving(stream, None); Some(self.tcp_stream_arc().unwrap()) } prev => { @@ -469,12 +472,26 @@ impl TcpSocket { } } + #[cfg(feature = "p3")] + pub(crate) fn set_p3_receive_task( + &mut self, + task: wasmtime::component::AbortHandle, + ) -> Result<(), ErrorCode> { + match &mut self.tcp_state { + TcpState::Receiving(_, slot @ None) => { + *slot = Some(task); + Ok(()) + } + _ => Err(ErrorCode::InvalidState), + } + } + pub(crate) fn local_address(&self) -> Result<SocketAddr, ErrorCode> { match &self.tcp_state { TcpState::Bound(socket) => Ok(socket.local_addr()?), TcpState::Connected(stream) => Ok(stream.local_addr()?), #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.local_addr()?), + TcpState::Receiving(stream, _) => Ok(stream.local_addr()?), TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?), TcpState::Listening { listener, .. } => Ok(listener.local_addr()?), #[cfg(feature = "p3")] @@ -645,7 +662,7 @@ impl TcpSocket { match &self.tcp_state { TcpState::Connected(socket) => Ok(socket), #[cfg(feature = "p3")] - TcpState::Receiving(socket) => Ok(socket), + TcpState::Receiving(socket, _) => Ok(socket), TcpState::P2Streaming(state) => Ok(&state.stream), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), @@ -695,7 +712,7 @@ impl TcpSocket { | TcpState::P2Streaming(_) => {} #[cfg(feature = "p3")] - TcpState::Receiving(_) | TcpState::Error(_) => {} + TcpState::Receiving(_, _) | TcpState::Error(_) => {} TcpState::Connecting(Some(future)) => { self.tcp_state = TcpState::ConnectReady(future.as_mut().await);
alexcrichton commented on issue #11342:
Related issue after our discussion today as well: https://github.com/WebAssembly/component-model/issues/552
alexcrichton assigned dicej to issue #11342.
alexcrichton closed issue #11342:
Currently there is liberal use of
Arc<...>in the WASIp3wasi:socketsimplementation which is used for sharing sockets between tasks which are used to implement streams. One example is that a stream of clients can be extracted from a TCP listener, meaning that the underlying socket is now shared between the original TCP listener resource and the stream that is reading clients. The consequence of this implementation is that when the resource is closed it does not mean that the actual socket is itself closed. For example the OS-level port remains reserved until the task for accepted clients also exits. While the tasks will "promptly exit" there is currently no reliable mechanism to await this happening.This leads to tests for REUSEADDR, for example, having a loop around re-binding a port and asserting it's successful. This test is being added in https://github.com/bytecodealliance/wasmtime/pull/11291.
alexcrichton commented on issue #11342:
Fixed in https://github.com/bytecodealliance/wasmtime/pull/11515
Last updated: Dec 06 2025 at 07:03 UTC