Stream

Struct Stream 

Source
pub struct Stream<T = Info> { /* private fields */ }
Expand description

Handle to operations that can be performed on a Stream. It is generic over the type of its info field, which allows a Stream to exist with or without info contents.

Implementations§

Source§

impl Stream<Info>

Source

pub async fn info(&mut self) -> Result<&Info, InfoError>

Retrieves info about Stream from the server, updates the cached info inside Stream and returns it.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let info = stream.info().await?;
Source

pub fn cached_info(&self) -> &Info

Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;

let info = stream.cached_info();
Source§

impl<I> Stream<I>

Source

pub async fn get_info(&self) -> Result<Info, InfoError>

Retrieves info about Stream from the server. Does not update the cache. Can be used on a Stream retrieved by Context::get_stream_no_info.

Source

pub async fn info_with_subjects<F: AsRef<str>>( &self, subjects_filter: F, ) -> Result<InfoWithSubjects, InfoError>

Retrieves Info from the server and returns a futures_util::Stream that allows iterating over all subjects in the stream, fetched via the paged API.

§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let mut info = stream.info_with_subjects("events.>").await?;

while let Some((subject, count)) = info.try_next().await? {
    println!("Subject: {} count: {}", subject, count);
}
Source

pub fn info_builder(&self) -> StreamInfoBuilder

Creates a builder that allows customizing the Stream info request.

§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let mut info = stream
    .info_builder()
    .with_deleted(true)
    .subjects("events.>")
    .fetch()
    .await?;

while let Some((subject, count)) = info.try_next().await? {
    println!("Subject: {} count: {}", subject, count);
}
Source

pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders>

Creates a builder for direct get operations.

Allows for more control over direct get requests.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;

// Get message without headers
let message = stream.direct_get_builder().sequence(100).send().await?;
Source

pub async fn direct_get_next_for_subject<T: Into<String>>( &self, subject: T, sequence: Option<u64>, ) -> Result<StreamMessage, DirectGetError>

Gets the next message for a given subject from the Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage from any replica member. This means read-after-write consistency is not guaranteed, as the given replica might not have caught up with the leader yet.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

jetstream.publish("events.data", "data".into()).await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream
    .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
    .await?;
Source

pub async fn direct_get_first_for_subject<T: Into<String>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>

Gets the first message for a given subject from the Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage from any replica member. This means read-after-write consistency is not guaranteed, as the given replica might not have caught up with the leader yet.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get_first_for_subject("events.data").await?;
Source

pub async fn direct_get( &self, sequence: u64, ) -> Result<StreamMessage, DirectGetError>

Gets message from Stream with given sequence id.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage from any replica member. This means read-after-write consistency is not guaranteed, as the given replica might not have caught up with the leader yet.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get(pub_ack.await?.sequence).await?;
Source

pub async fn direct_get_last_for_subject<T: Into<String>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>

Gets last message for a given subject.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage from any replica member. This means read-after-write consistency is not guaranteed, as the given replica might not have caught up with the leader yet.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get_last_for_subject("events.data").await?;
Source

pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders>

Creates a builder for retrieving messages from the stream with flexible options. Raw message methods retrieve messages directly from the stream leader instead of a replica. This should be used with care, as it can put additional load on the stream leader.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream("events").await?;

// Get message without headers
let value = stream.raw_message_builder().sequence(100).send().await?;
Source

pub async fn get_raw_message( &self, sequence: u64, ) -> Result<StreamMessage, RawMessageError>

Gets a raw message from the stream for a given stream sequence. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get.

§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn get_first_raw_message_by_subject<T: AsRef<str>>( &self, subject: T, sequence: u64, ) -> Result<StreamMessage, RawMessageError>

Gets the first message from the stream for a given subject, starting from the provided sequence. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_first_for_subject.

§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;

let raw_message = stream
    .get_first_raw_message_by_subject("events.created", 10)
    .await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn get_next_raw_message_by_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<StreamMessage, RawMessageError>

Gets the next message from the stream for a given subject. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_next_for_subject.

§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;

let raw_message = stream
    .get_next_raw_message_by_subject("events.created")
    .await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str, ) -> Result<StreamMessage, LastRawMessageError>

Gets the last message from the stream for a given subject. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_last_for_subject.

§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;

let raw_message = stream
    .get_last_raw_message_by_subject("events.created")
    .await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn delete_message( &self, sequence: u64, ) -> Result<bool, DeleteMessageError>

Delete a message from the stream.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

let publish_ack = context.publish("events", "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;
Source

pub fn purge(&self) -> Purge<No, No>

Purge Stream messages.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge().await?;
Source

pub async fn purge_subject<T>( &self, subject: T, ) -> Result<PurgeResponse, PurgeError>
where T: Into<String>,

👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.

Purge Stream messages for a matching subject.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;
Source

pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerError>

Creates or updates a Durable or Ephemeral Consumer (Ephemeral if durable_name was not provided) and returns the info from the server about the created Consumer. If you want a strict update or create, use Stream::create_consumer_strict or Stream::update_consumer.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream
    .create_consumer(consumer::pull::Config {
        durable_name: Some("pull".to_string()),
        ..Default::default()
    })
    .await?;
Source

pub async fn consumer_info<T: AsRef<str>>( &self, name: T, ) -> Result<Info, ConsumerInfoError>

Retrieve Info about Consumer from the server.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;
Source

pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, ) -> Result<Consumer<T>, Error>

Get Consumer from the server. Consumer iterators can be used to retrieve Messages for a given Consumer.

§Examples
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
Source

pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, config: T, ) -> Result<Consumer<T>, ConsumerError>

Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.

Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.

§Examples
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer = stream
    .get_or_create_consumer(
        "pull",
        consumer::pull::Config {
            durable_name: Some("pull".to_string()),
            ..Default::default()
        },
    )
    .await?;
Source

pub async fn delete_consumer( &self, name: &str, ) -> Result<DeleteStatus, ConsumerError>

Delete a Consumer from the server.

§Examples
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

jetstream
    .get_stream("events")
    .await?
    .delete_consumer("pull")
    .await?;
Source

pub fn consumer_names(&self) -> ConsumerNames

Lists names of all consumers for current stream.

§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut names = stream.consumer_names();
while let Some(consumer) = names.try_next().await? {
    println!("consumer: {consumer:?}");
}
Source

pub fn consumers(&self) -> Consumers

Lists all consumers info for current stream.

§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut consumers = stream.consumers();
while let Some(consumer) = consumers.try_next().await? {
    println!("consumer: {consumer:?}");
}

Trait Implementations§

Source§

impl<T: Clone> Clone for Stream<T>

Source§

fn clone(&self) -> Stream<T>

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug> Debug for Stream<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<T> Freeze for Stream<T>
where T: Freeze,

§

impl<T = Info> !RefUnwindSafe for Stream<T>

§

impl<T> Send for Stream<T>
where T: Send,

§

impl<T> Sync for Stream<T>
where T: Sync,

§

impl<T> Unpin for Stream<T>
where T: Unpin,

§

impl<T = Info> !UnwindSafe for Stream<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more