Stream: git-wasmtime

Topic: wasmtime / issue #9653 wasi-http: can we call the `blocki...


Wasmtime GitHub notifications bot (Nov 22 2024 at 08:28):

iawia002 opened issue #9653:

My scenario is simple: I want to send a request with a large body, but the blocking_write_and_flush method can only write 4096 bytes at a time. Naturally, I decided to call this method multiple times to write the entire body:

let req = OutgoingRequest::new(...);

let outgoing_body = req.body().unwrap();
let request_body = outgoing_body.write().unwrap();

// Write the body in pieces no larger than blocking_write_and_flush accepts.
let chunks = buf.chunks(4096);
for chunk in chunks {
    request_body.blocking_write_and_flush(chunk).expect("writing request body");
}

OutgoingBody::finish(outgoing_body, None).unwrap();

However, I found that the program gets stuck forever on the second call to blocking_write_and_flush. After debugging, I found that it actually gets stuck during the ready check in the second call:

https://github.com/bytecodealliance/wasmtime/blob/5af89308dc0229ca404cd7000eec694201022e2d/crates/wasi-http/src/body.rs#L662-L669

https://github.com/bytecodealliance/wasmtime/blob/5af89308dc0229ca404cd7000eec694201022e2d/crates/wasi-http/src/body.rs#L472

At this point, the writer has no capacity left, but my writing process hasn't finished, so the reader hasn't started consuming the data (?).

For my issue, I could stop using this method and instead combine check-write, subscribe, write, and flush manually to solve it. However, I'm curious whether calling blocking_write_and_flush repeatedly is meant to be allowed, because the current behavior is strange: the program doesn't report an error, it just hangs indefinitely.
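
For readers unfamiliar with the manual alternative mentioned above, here is a minimal sketch of a check-write / write / flush loop. It does not use the real `wasi` bindings: `ChunkSink`, `MockSink`, `wait_ready`, and `write_all` are hypothetical names invented for this illustration, and `wait_ready` only stands in for subscribing to the stream and blocking on the pollable.

```rust
/// Hypothetical stand-in for the output-stream operations named in the post
/// (check-write, write, flush). This is NOT the real `wasi` crate API.
trait ChunkSink {
    /// Number of bytes that may be written right now.
    fn check_write(&mut self) -> Result<usize, String>;
    /// Write no more bytes than the last `check_write` permitted.
    fn write(&mut self, bytes: &[u8]) -> Result<(), String>;
    /// Ask for buffered bytes to be pushed onward.
    fn flush(&mut self) -> Result<(), String>;
    /// Wait until the sink is ready again (models subscribe + block).
    fn wait_ready(&mut self);
}

/// Mock sink with a fixed write budget that is restored whenever the
/// pretend consumer drains it in `wait_ready`.
struct MockSink {
    limit: usize,
    budget: usize,
    pending: Vec<u8>,
    received: Vec<u8>,
}

impl MockSink {
    fn new(limit: usize) -> Self {
        MockSink { limit, budget: limit, pending: Vec::new(), received: Vec::new() }
    }
}

impl ChunkSink for MockSink {
    fn check_write(&mut self) -> Result<usize, String> {
        Ok(self.budget)
    }

    fn write(&mut self, bytes: &[u8]) -> Result<(), String> {
        if bytes.len() > self.budget {
            return Err("write exceeds permitted size".into());
        }
        self.budget -= bytes.len();
        self.pending.extend_from_slice(bytes);
        Ok(())
    }

    fn flush(&mut self) -> Result<(), String> {
        Ok(())
    }

    fn wait_ready(&mut self) {
        // The pretend consumer takes everything and frees the budget.
        self.received.append(&mut self.pending);
        self.budget = self.limit;
    }
}

/// Write all of `data`, never exceeding what `check_write` permits.
fn write_all<S: ChunkSink>(sink: &mut S, mut data: &[u8]) -> Result<(), String> {
    while !data.is_empty() {
        let permit = sink.check_write()?;
        if permit == 0 {
            // No capacity: wait for the consumer before trying again.
            sink.wait_ready();
            continue;
        }
        let n = permit.min(data.len());
        sink.write(&data[..n])?;
        data = &data[n..];
    }
    sink.flush()?;
    sink.wait_ready();
    Ok(())
}
```

Calling `write_all(&mut MockSink::new(4096), &[7u8; 5000])` pushes the body through in two writes. Note that in this mock the wait always completes; against a real host, the wait can only finish if something is actually consuming the body.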

Wasmtime GitHub notifications bot (Nov 22 2024 at 16:36):

pchickey commented on issue #9653:

Your program getting hung forever on the second blocking_write_and_flush is a bug - you should be able to call that as many times as you want, though it may be less efficient than using check-write/write.

Can you provide a .wasm that reproduces this with the wasmtime CLI, so we can work on getting it fixed?

Wasmtime GitHub notifications bot (Nov 25 2024 at 02:06):

iawia002 commented on issue #9653:

> you should be able to call that as many times as you want, though it may be less efficient than using check-write/write.

Agree. Here is a program that reproduces this issue:

http.wasm.zip

Source code:

use wasi::{
    http::{
        outgoing_handler,
        types::{Fields, Method, OutgoingBody, RequestOptions, Scheme},
    },
    io::streams,
};

wasi::cli::command::export!(Example);

struct Example;

impl wasi::exports::cli::run::Guest for Example {
    fn run() -> Result<(), ()> {
        let fields = Fields::new();
        let outgoing_request = outgoing_handler::OutgoingRequest::new(fields);
        outgoing_request.set_method(&Method::Post).unwrap();
        outgoing_request.set_scheme(Some(&Scheme::Https)).unwrap();
        outgoing_request.set_authority(Some("httpbin.org")).unwrap();
        outgoing_request.set_path_with_query(Some("/post")).unwrap();

        let outgoing_body = outgoing_request.body().unwrap();
        let body = [0; 5000];
        let request_body = outgoing_body.write().unwrap();

        let chunks = body.chunks(4096);
        for chunk in chunks {
            request_body
                .blocking_write_and_flush(chunk)
                .expect("writing response");
            println!("writing response");
        }
        drop(request_body);
        println!("finished");
        OutgoingBody::finish(outgoing_body, None).unwrap();

        let options = RequestOptions::new();
        let future_response = outgoing_handler::handle(outgoing_request, Some(options)).unwrap();

        let incoming_response = match future_response.get() {
            Some(result) => result.unwrap(),
            None => {
                let pollable = future_response.subscribe();
                pollable.block();

                future_response
                    .get()
                    .expect("incoming response available")
                    .unwrap()
            }
        }
        .unwrap();

        drop(future_response);

        let incoming_body = incoming_response.consume().unwrap();

        drop(incoming_response);

        let input_stream = incoming_body.stream().unwrap();
        let input_stream_pollable = input_stream.subscribe();

        let mut body = Vec::new();
        loop {
            input_stream_pollable.block();

            let mut body_chunk = match input_stream.read(1024 * 1024) {
                Ok(c) => c,
                Err(streams::StreamError::Closed) => break,
                Err(e) => panic!("input_stream read failed: {e:?}"),
            };

            if !body_chunk.is_empty() {
                body.append(&mut body_chunk);
            }
        }

        println!("body: {}", String::from_utf8(body).unwrap());
        Ok(())
    }
}

$ wasmtime -S http target/wasm32-wasip2/debug/http.wasm

writing response  // <- hung forever here

> At this point, the writer has no capacity left, but my writing process hasn't finished, so the reader hasn't started consuming the data (?).

I'm not sure if my assumption is correct. This issue could be easily resolved by using an unbounded_channel or increasing the channel's capacity. If that's the case, I can submit a patch to fix it. However, I'm not certain if this is the most efficient solution.
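
The situation described here (a bounded buffer that fills up while nothing is reading) can be modeled with the standard library alone. The sketch below is only an analogy: it uses `std::sync::mpsc::sync_channel` rather than whatever channel the wasi-http host uses, and `chunks_accepted_without_reader` is a hypothetical helper name. It uses `try_send` so that the full buffer is reported instead of blocking forever.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Count how many chunks a bounded channel of the given depth accepts
/// when no reader ever takes anything out of it.
fn chunks_accepted_without_reader(body: &[u8], chunk_size: usize, depth: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected, but it never reads.
    let (tx, _rx) = sync_channel::<Vec<u8>>(depth);
    let mut accepted = 0;
    for chunk in body.chunks(chunk_size) {
        match tx.try_send(chunk.to_vec()) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait here indefinitely.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a 5000-byte body, 4096-byte chunks, and a depth of 1, only the first chunk is accepted; the second finds the channel full, which is where a blocking write would hang. Raising the depth moves the point of failure rather than removing it.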

Wasmtime GitHub notifications bot (Nov 27 2024 at 20:11):

pchickey commented on issue #9653:

Thanks for providing the reproduction. I just spent time looking into this.

The purpose of the bounds on the OutgoingBody OutputStream writer's buffer size and channel depth is to allow the host to maintain backpressure from the HTTP connection to the wasi stream. Backpressure only works when buffering is finite, so an unbounded_channel would break that. I agree we should go back and make the amount of buffering configurable as you saw from the TODOs removed in #9670.

However, since buffering will always be finite, raising the bound doesn't resolve the issue with your guest code here; it only raises the threshold at which the hang occurs. Currently, your guest makes the incorrect assumption that the wasi-http implementation will buffer the entire outgoing body prior to sending the request. The wasmtime wasi-http implementation will presently buffer up to 1 chunk of 1MB, but those limits are allowed to vary between implementations (and will vary more easily once we land the configurability in #9670). Your guest must tolerate buffering as little as 1 chunk of 4k (the minimum guaranteed by a call to blocking_write_and_flush) before encountering backpressure, at which point blocking_write_and_flush's implementation waits until the stream is ready for more writes.

However, backpressure cannot be relieved until the request is sent. You should restructure your guest to first send the outgoing-request (which will initiate the HTTP connection and send the method, path and query, and headers) and then start writing to the body stream:

use wasi::{
    http::{
        outgoing_handler,
        types::{Fields, Method, OutgoingBody, RequestOptions, Scheme},
    },
    io::streams,
};

wasi::cli::command::export!(Example);

struct Example;

impl wasi::exports::cli::run::Guest for Example {
    fn run() -> Result<(), ()> {
        let fields = Fields::new();
        let outgoing_request = outgoing_handler::OutgoingRequest::new(fields);
        outgoing_request.set_method(&Method::Post).unwrap();
        outgoing_request.set_scheme(Some(&Scheme::Https)).unwrap();
        outgoing_request.set_authority(Some("httpbin.org")).unwrap();
        outgoing_request.set_path_with_query(Some("/post")).unwrap();

        let outgoing_body = outgoing_request.body().unwrap();

        println!("sending request");
        let options = RequestOptions::new();
        let future_response = outgoing_handler::handle(outgoing_request, Some(options)).unwrap();

        let body = [0; 5000];
        let request_body = outgoing_body.write().unwrap();

        let chunks = body.chunks(4096);
        for chunk in chunks {
            request_body
                .blocking_write_and_flush(chunk)
                .expect("writing request body");
            println!("wrote {} of request body", chunk.len());
        }
        drop(request_body);
        OutgoingBody::finish(outgoing_body, None).unwrap();
        println!("finished with request body");

        let incoming_response = match future_response.get() {
            Some(result) => result.unwrap(),
            None => {
                let pollable = future_response.subscribe();
                pollable.block();

                future_response
                    .get()
                    .expect("incoming response available")
                    .unwrap()
            }
        }
        .unwrap();

        drop(future_response);

        let incoming_body = incoming_response.consume().unwrap();

        drop(incoming_response);

        let input_stream = incoming_body.stream().unwrap();
        let input_stream_pollable = input_stream.subscribe();

        let mut body = Vec::new();
        loop {
            input_stream_pollable.block();

            let mut body_chunk = match input_stream.read(1024 * 1024) {
                Ok(c) => c,
                Err(streams::StreamError::Closed) => break,
                Err(e) => panic!("input_stream read failed: {e:?}"),
            };

            if !body_chunk.is_empty() {
                body.append(&mut body_chunk);
            }
        }

        println!("body: {}", String::from_utf8(body).unwrap());
        Ok(())
    }
}

fn main() {}

% wasmtime run -Shttp target/wasm32-wasip2/debug/issue9653.wasm
sending request
wrote 4096 of request body
wrote 904 of request body
finished with request body
body: {
  "args": {},
  "data": "\u0000\u0000\u0000\u0000\u0000\...snipped...",
  "files": {},
  "form": {},
  "headers": {
    "Host": "httpbin.org",
    "Transfer-Encoding": "chunked",
    "X-Amzn-Trace-Id": "Root=1-67477cb9-73a49f517d15aafc1afb91e7"
  },
  "json": null,
  "origin": "98.232.174.29",
  "url": "https://httpbin.org/post"
}
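
The reason the reordering works can be illustrated with a small standard-library model, offered as an analogy rather than a description of the host's internals. Here a reader thread plays the role of the HTTP connection started by `outgoing_handler::handle`, a depth-1 `sync_channel` plays the role of the bounded body buffer, and `send_with_reader_running` is a hypothetical helper name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Send `body` in chunks through a depth-1 bounded channel whose reader is
/// started BEFORE any chunk is written. Returns the chunk sizes the reader saw.
fn send_with_reader_running(body: &[u8], chunk_size: usize) -> Vec<usize> {
    let (tx, rx) = sync_channel::<Vec<u8>>(1);

    // Start the consumer first, analogous to sending the request before
    // writing its body.
    let reader = thread::spawn(move || rx.iter().map(|c| c.len()).collect::<Vec<usize>>());

    for chunk in body.chunks(chunk_size) {
        // Blocking send: waits for capacity, which the reader keeps freeing.
        tx.send(chunk.to_vec()).expect("reader hung up");
    }

    // Dropping the sender signals end of body, analogous to finishing it.
    drop(tx);
    reader.join().expect("reader panicked")
}
```

For a 5000-byte body and 4096-byte chunks the reader observes chunks of 4096 and 904 bytes, the same split as in the output above.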

Wasmtime GitHub notifications bot (Nov 27 2024 at 20:22):

pchickey edited a comment on issue #9653:

You can't be blamed for having made this mistake, since none of the docs really cover it (afaik); our own test_programs::http::request contains the exact same bug and just doesn't exercise large enough bodies to hit it.

Wasmtime GitHub notifications bot (Dec 04 2024 at 21:53):

alexcrichton closed issue #9653.

Last updated: Jan 24 2025 at 00:11 UTC