gfx opened issue #13514:
When a host registers a stream consumer via `StreamReader::pipe` (or hands the
guest a host-driven future/stream via `FutureReader::new` / a host-written
stream) and the guest then drops its end, the `TransmitState` and both
`TransmitHandle`s are never reclaimed from the instance's concurrent-state
table. The host-side end is left in `HostReady` and is never finalized, so the
slots leak for the lifetime of the instance. A guest that performs many such
operations in a loop fills the table and eventually traps with
`resource table has no free keys`.

This is in the core component runtime
(`crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs`),
independent of `wasmtime-wasi`.

Test Case

The leak is in host-side transmit bookkeeping, so there is no single standalone
`.wasm` that triggers it: reproducing it requires a host driving the public
`StreamReader::pipe` / `FutureReader::new` APIs and a guest that drops its end
while the host end is still `HostReady`. The minimal reproduction is therefore a
pair of in-tree `crates/misc/component-async-tests` cases (full source in *Steps
to Reproduce*), using only public host APIs and no `wasi:*` data flow.

Steps to Reproduce

Case 1: host consumer (`host_drop_writer` / `ReadState::HostReady`)

The guest hands the host the readable end of a fresh stream, **keeps the
writable end**, writes one byte once the host attaches a consumer, then drops
the writer. The writer-drop reaches `host_drop_writer` with the read side still
`HostReady`.

Add an interface + world to
`crates/misc/component-async-tests/wit/test.wit`:

```wit
interface host-consumer-drop {
    // Returns the readable end of a fresh stream while the guest keeps the
    // writable end; writes one byte once a consumer attaches, then drops it.
    get: async func() -> stream<u8>;
}

world host-consumer-drop-guest {
    export host-consumer-drop;
}
```

Add a guest test program at
`crates/test-programs/src/bin/async_host_consumer_drop.rs`:

```rust
mod bindings {
    wit_bindgen::generate!({
        path: "../misc/component-async-tests/wit",
        world: "host-consumer-drop-guest",
        async: true,
    });

    use super::Component;
    export!(Component);
}

use {bindings::exports::local::local::host_consumer_drop::Guest, wit_bindgen::StreamReader};

struct Component;

impl Guest for Component {
    async fn get() -> StreamReader<u8> {
        let (mut tx, rx) = bindings::wit_stream::new();
        // The host attaches a consumer (read side -> `HostReady`); the write
        // below blocks until that consumer reads, after which we drop the
        // writer. Dropping it while the consumer is still `HostReady` leaks.
        wit_bindgen::spawn(async move {
            assert!(tx.write_one(42).await.is_none());
            drop(tx);
        });
        rx
    }
}

fn main() {}
```

Add a host test (e.g. in `tests/scenario/streams.rs`):

```rust
mod host_consumer_drop {
    wasmtime::component::bindgen!({
        path: "wit",
        world: "host-consumer-drop-guest",
        exports: { default: store | async },
    });
}

#[tokio::test]
pub async fn async_host_consumer_drop() -> Result<()> {
    let engine = Engine::new(&config())?;

    let component = make_component(
        &engine,
        &[test_programs_artifacts::ASYNC_HOST_CONSUMER_DROP_COMPONENT],
    )
    .await?;

    let mut linker = Linker::new(&engine);
    wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

    let mut store = Store::new(
        &engine,
        Ctx {
            wasi: WasiCtxBuilder::new().inherit_stdio().build(),
            table: ResourceTable::default(),
            continue_: false,
        },
    );

    let instance = linker.instantiate_async(&mut store, &component).await?;
    let guest = host_consumer_drop::HostConsumerDropGuest::new(&mut store, &instance)?;

    store
        .run_concurrent(async move |accessor| {
            let stream = guest
                .local_local_host_consumer_drop()
                .call_get(accessor)
                .await?;
            let (tx, mut rx) = mpsc::channel(1);
            accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)))?;
            assert_eq!(rx.next().await, Some(42));
            assert!(rx.next().await.is_none());
            wasmtime::error::Ok(())
        })
        .await??;

    store.assert_concurrent_state_empty();
    Ok(())
}
```

Case 2: host producer (`host_drop_reader` / `WriteState::HostReady`)

The host hands the guest two host-produced futures via `FutureReader::new`; the
guest reads one and drops the other. This reuses the existing `closed-streams`
guest (`local::local::closed::read-future`, whose `_rx_ignored` argument is
dropped), so it is a drop-in host test with no new guest:

```rust
#[tokio::test]
pub async fn async_host_producer_drop() -> Result<()> {
    let engine = Engine::new(&config())?;

    let component = make_component(
        &engine,
        &[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
    )
    .await?;

    let mut linker = Linker::new(&engine);
    wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

    let mut store = Store::new(
        &engine,
        Ctx {
            wasi: WasiCtxBuilder::new().inherit_stdio().build(),
            table: ResourceTable::default(),
            continue_: false,
        },
    );

    let instance = linker.instantiate_async(&mut store, &component).await?;

    let value = 42_u8;
    let (tx, rx) = oneshot::channel();
    let rx = FutureReader::new(&mut store, OneshotProducer::new(rx))?;
    let (_, rx_ignored) = oneshot::channel();
    let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored))?;

    let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;

    store
        .run_concurrent(async move |accessor| {
            _ = tx.send(value);
            closed_streams
                .local_local_closed()
                .call_read_future(accessor, rx, value, rx_ignored)
                .await
        })
        .await??;

    store.assert_concurrent_state_empty();
    Ok(())
}
```

- Run: `cargo test -p component-async-tests --test test_all async_host_`

Expected Results

Both tests pass: when the guest drops its end of a stream/future, the host
consumer/producer is finalized and the transmit (state + both handles) is
reclaimed, so `assert_concurrent_state_empty()` succeeds after a clean run.

Actual Results

Both tests fail with leftover entries in the concurrent-state table:

```
non-empty table: [3, 4, 5] // Case 1: 1 stream transmit (state + 2 handles)
non-empty table: [0, 1, 2, 3, 4, 5] // Case 2: 2 future transmits
```

(In Case 2 both futures leak: reading a host-produced future and then
dropping the reader still strands the producer, alongside the never-read
`rx_ignored`.) The transmits are never reclaimed, so a long-running guest that
loops over such operations eventually traps with
`resource table has no free keys`.

Root cause, in
`crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs`: two
match arms are no-ops where they must finalize the stranded host end. Both
functions are `StoreOpaque` methods reached from `Instance::guest_drop_readable`
/ `Instance::guest_drop_writable`:

- `host_drop_reader`: guest drops the readable end while the host producer
  is `WriteState::HostReady`:

  ```rust
  WriteState::HostReady { .. } => {}
  ```

- `host_drop_writer`: guest drops the writable end while the host consumer
  is `ReadState::HostReady`:

  ```rust
  ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
  ```

A `TransmitState` is only removed by `delete_transmit`, which is reached only
once the other end is already `Dropped`. With these no-ops, one end stays
`HostReady` forever and the transmit is never deleted. (The host producer set up
by `new_transmit` is purely reactive: it has no self-cleanup future that
observes the drop, so nothing else reclaims it either.)

Versions and Environment

Wasmtime version or commit: `main`, 46.0.0-dev (commit `9c49989a2e`)

Operating system: macOS 26.5

Architecture: aarch64 (Apple Silicon)

The leak is deterministic host-side bookkeeping and is not OS/architecture
specific.

Extra Info

Proposed fix: finalize the stranded host end on guest drop. When the guest's
end is now `Dropped` and the host end is `HostReady`, set the host end to
`Dropped` and call `delete_transmit` (reclaiming the state + both handles and
dropping the host consumer/producer). A change covering exactly these two arms
takes both reproductions above from leaking to empty, and the full
`component-async-tests` suite stays green. I'm happy to open a PR with the fix
and these two tests.

On test coverage: `round_trip*` / `post_return` call
`assert_concurrent_state_empty`, but exercise read-based guest↔guest flows. The
existing `.pipe()`-based host-consumer scenarios (`tests/scenario/streams.rs`,
`tests/scenario/transmit.rs`) do not assert an empty concurrent state, so this
guest-drop path was untested.

Related: #12091 proposes a `close()` method on the
`{Future,Stream}{Producer,Consumer}` traits, motivated by the fact that today a
host can only detect a guest-side drop via `Drop`. This bug compounds that: on
the `HostReady` guest-drop path the host producer/consumer is never finalized at
all, so even the current `Drop`-based detection never fires. Finalizing the
stranded host end (the fix above) is also the natural place from which a future
`close()` hook would be invoked.
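
For readers who want to see the leak and the proposed fix in isolation, here is a small standalone model. It is only a sketch and does not use Wasmtime's types or APIs: `End`, `Transmit`, `Table`, `SLOTS_PER_TRANSMIT`, the capacity of 30, and the `finalize_host_end` flag are all hypothetical stand-ins (the real `ReadState`/`WriteState` enums carry more variants and payloads). It assumes each transmit occupies three table slots (state + two handles), as in the reproductions above.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified state of one end of a transmit.
#[derive(Clone, Copy, Debug, PartialEq)]
enum End {
    Open,
    HostReady,
    Dropped,
}

/// Hypothetical stand-in for a transmit: a read end and a write end.
struct Transmit {
    read: End,
    write: End,
}

/// Assumption: one state entry plus two handles per transmit.
const SLOTS_PER_TRANSMIT: usize = 3;

/// Toy fixed-capacity table standing in for the concurrent-state table.
struct Table {
    transmits: HashMap<usize, Transmit>,
    next_id: usize,
    capacity: usize,
}

impl Table {
    fn used_slots(&self) -> usize {
        self.transmits.len() * SLOTS_PER_TRANSMIT
    }

    fn new_transmit(&mut self, read: End, write: End) -> Result<usize, String> {
        if self.used_slots() + SLOTS_PER_TRANSMIT > self.capacity {
            return Err("resource table has no free keys".to_string());
        }
        let id = self.next_id;
        self.next_id += 1;
        self.transmits.insert(id, Transmit { read, write });
        Ok(id)
    }

    /// Guest drops its writable end. With `finalize_host_end == false` the
    /// `HostReady` case is a no-op (the reported behavior); with `true` the
    /// host end is marked `Dropped` so the transmit can be reclaimed.
    fn guest_drop_writer(&mut self, id: usize, finalize_host_end: bool) {
        let t = self.transmits.get_mut(&id).expect("unknown transmit");
        t.write = End::Dropped;
        if t.read == End::HostReady && finalize_host_end {
            t.read = End::Dropped;
        }
        // A transmit is only deleted once both ends are `Dropped`.
        if t.read == End::Dropped && t.write == End::Dropped {
            self.transmits.remove(&id);
        }
    }
}

/// Runs `iterations` create-then-drop cycles and returns the slots still in
/// use, or the error from the first failed allocation.
fn run(iterations: usize, finalize_host_end: bool) -> Result<usize, String> {
    let mut table = Table { transmits: HashMap::new(), next_id: 0, capacity: 30 };
    for _ in 0..iterations {
        let id = table.new_transmit(End::HostReady, End::Open)?;
        table.guest_drop_writer(id, finalize_host_end);
    }
    Ok(table.used_slots())
}

fn main() {
    println!("no-op arm: {:?}", run(100, false));
    println!("proposed fix: {:?}", run(100, true));
}
```

In this model the no-op arm exhausts the 30-slot table on the eleventh transmit, while the finalizing variant finishes all 100 iterations with zero slots in use. The model covers only the writer-drop direction; the reader-drop case in `host_drop_reader` would be symmetric.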
gfx added the bug label to Issue #13514.
gfx edited issue #13514.
Last updated: Jun 01 2026 at 09:49 UTC