Coverage for wasmtime/bindgen/generated/imports/streams.py: 82%

22 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-20 16:25 +0000

1from ..types import Result 

2from abc import abstractmethod 

3from enum import Enum 

4from typing import Protocol, Tuple 

5 

# Both stream identifier types are plain aliases of the built-in ``int``.
InputStream = int
OutputStream = int

class StreamStatus(Enum):
    """Enumeration of the two stream states used by this module.

    Members:
        OPEN: numeric value 0.
        ENDED: numeric value 1.
    """

    OPEN = 0
    ENDED = 1

11 

class HostStreams(Protocol):
    """Structural interface declaring four stream operations.

    Every method is marked ``@abstractmethod`` and its default body raises
    ``NotImplementedError``, so a class that explicitly inherits from this
    protocol must override all four methods before it can be instantiated.
    """

    @abstractmethod
    def drop_input_stream(self, this: InputStream) -> None:
        """Abstract operation taking an ``InputStream`` identifier.

        Args:
            this: The input stream identifier (an ``int`` alias).

        Raises:
            NotImplementedError: Always, in this default body.
        """
        raise NotImplementedError

    @abstractmethod
    def write(self, this: OutputStream, buf: bytes) -> Result[Tuple[int, StreamStatus], None]:
        """Abstract operation taking an ``OutputStream`` identifier and bytes.

        Args:
            this: The output stream identifier (an ``int`` alias).
            buf: The bytes passed to the operation.

        Returns:
            A ``Result`` parameterised with ``Tuple[int, StreamStatus]`` and
            ``None``. NOTE(review): presumably the success and error payloads
            respectively -- confirm against ``..types.Result``.

        Raises:
            NotImplementedError: Always, in this default body.
        """
        raise NotImplementedError

    @abstractmethod
    def blocking_write(self, this: OutputStream, buf: bytes) -> Result[Tuple[int, StreamStatus], None]:
        """Abstract operation with the same signature as :meth:`write`.

        Args:
            this: The output stream identifier (an ``int`` alias).
            buf: The bytes passed to the operation.

        Returns:
            A ``Result`` parameterised with ``Tuple[int, StreamStatus]`` and
            ``None``. NOTE(review): presumably the success and error payloads
            respectively -- confirm against ``..types.Result``.

        Raises:
            NotImplementedError: Always, in this default body.
        """
        raise NotImplementedError

    @abstractmethod
    def drop_output_stream(self, this: OutputStream) -> None:
        """Abstract operation taking an ``OutputStream`` identifier.

        Args:
            this: The output stream identifier (an ``int`` alias).

        Raises:
            NotImplementedError: Always, in this default body.
        """
        raise NotImplementedError

25