pub struct Stream<T = Info> { /* private fields */ }Expand description
Handle to operations that can be performed on a Stream.
It’s generic over the type of info field to allow Stream with or without
info contents.
Implementations§
Source§impl Stream<Info>
impl Stream<Info>
Sourcepub async fn info(&mut self) -> Result<&Info, InfoError>
pub async fn info(&mut self) -> Result<&Info, InfoError>
Retrieves info about Stream from the server, updates the cached info inside
Stream and returns it.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let info = stream.info().await?;Sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.cached_info();Source§impl<I> Stream<I>
impl<I> Stream<I>
Sourcepub async fn get_info(&self) -> Result<Info, InfoError>
pub async fn get_info(&self) -> Result<Info, InfoError>
Retrieves info about Stream from the server. Does not update the cache.
Can be used on Stream retrieved by Context::get_stream_no_info
Sourcepub async fn info_with_subjects<F: AsRef<str>>(
&self,
subjects_filter: F,
) -> Result<InfoWithSubjects, InfoError>
pub async fn info_with_subjects<F: AsRef<str>>( &self, subjects_filter: F, ) -> Result<InfoWithSubjects, InfoError>
Retrieves [Info] from the server and returns a [futures_util::Stream] that allows iterating over all subjects in the stream fetched via paged API.
§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let mut info = stream.info_with_subjects("events.>").await?;
while let Some((subject, count)) = info.try_next().await? {
println!("Subject: {} count: {}", subject, count);
}Sourcepub fn info_builder(&self) -> StreamInfoBuilder
pub fn info_builder(&self) -> StreamInfoBuilder
Creates a builder that allows customizing Stream::Info.
§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let mut info = stream
.info_builder()
.with_deleted(true)
.subjects("events.>")
.fetch()
.await?;
while let Some((subject, count)) = info.try_next().await? {
println!("Subject: {} count: {}", subject, count);
}Sourcepub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders>
pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders>
Creates a builder for direct get operations.
Allows for more control over direct get requests.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
// Get message without headers
let message = stream.direct_get_builder().sequence(100).send().await?;Sourcepub async fn direct_get_next_for_subject<T: Into<String>>(
&self,
subject: T,
sequence: Option<u64>,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get_next_for_subject<T: Into<String>>( &self, subject: T, sequence: Option<u64>, ) -> Result<StreamMessage, DirectGetError>
Gets next message for a given subject.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage
from any replica member. This means read-after-write consistency is not guaranteed,
as the given replica might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream.publish("events.data", "data".into()).await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream
.direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
.await?;
Sourcepub async fn direct_get_first_for_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get_first_for_subject<T: Into<String>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>
Gets first message for a given subject.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage
from any replica member. This means read-after-write consistency is not guaranteed,
as the given replica might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get_first_for_subject("events.data").await?;
Sourcepub async fn direct_get(
&self,
sequence: u64,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get( &self, sequence: u64, ) -> Result<StreamMessage, DirectGetError>
Gets message from Stream with given sequence id.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage
from any replica member. This means read-after-write consistency is not guaranteed,
as the given replica might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get(pub_ack.await?.sequence).await?;
Sourcepub async fn direct_get_last_for_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get_last_for_subject<T: Into<String>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>
Gets last message for a given subject.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch super::message::StreamMessage
from any replica member. This means read-after-write consistency is not guaranteed,
as the given replica might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get_last_for_subject("events.data").await?;
Sourcepub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders>
pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders>
Creates a builder for retrieving messages from the stream with flexible options. Raw message methods retrieve messages directly from the stream leader instead of a replica. This should be used with care, as it can put additional load on the stream leader.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream("events").await?;
// Get message without headers
let value = stream.raw_message_builder().sequence(100).send().await?;Sourcepub async fn get_raw_message(
&self,
sequence: u64,
) -> Result<StreamMessage, RawMessageError>
pub async fn get_raw_message( &self, sequence: u64, ) -> Result<StreamMessage, RawMessageError>
Get a raw message from the stream for a given stream sequence. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get.
§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);Sourcepub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: u64,
) -> Result<StreamMessage, RawMessageError>
pub async fn get_first_raw_message_by_subject<T: AsRef<str>>( &self, subject: T, sequence: u64, ) -> Result<StreamMessage, RawMessageError>
Get the first message from the stream for a given subject, starting from the provided sequence. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_first_for_subject.
§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;
let raw_message = stream
.get_first_raw_message_by_subject("events.created", 10)
.await?;
println!("Retrieved raw message {:?}", raw_message);Sourcepub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<StreamMessage, RawMessageError>
pub async fn get_next_raw_message_by_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<StreamMessage, RawMessageError>
Get the next message from the stream for a given subject. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_next_for_subject.
§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;
let raw_message = stream
.get_next_raw_message_by_subject("events.created")
.await?;
println!("Retrieved raw message {:?}", raw_message);Sourcepub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str,
) -> Result<StreamMessage, LastRawMessageError>
pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str, ) -> Result<StreamMessage, LastRawMessageError>
Get the last message from the stream for a given subject. This low-level API always reaches the stream leader. Its use is discouraged in favor of Stream::direct_get_last_for_subject.
§Examples
#[tokio::main]
use futures_util::StreamExt;
use futures_util::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_stream_no_info("events").await?;
let raw_message = stream
.get_last_raw_message_by_subject("events.created")
.await?;
println!("Retrieved raw message {:?}", raw_message);Sourcepub async fn delete_message(
&self,
sequence: u64,
) -> Result<bool, DeleteMessageError>
pub async fn delete_message( &self, sequence: u64, ) -> Result<bool, DeleteMessageError>
Delete a message from the stream.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events", "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;Sourcepub fn purge(&self) -> Purge<No, No>
pub fn purge(&self) -> Purge<No, No>
Purge Stream messages.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge().await?;Sourcepub async fn purge_subject<T>(
&self,
subject: T,
) -> Result<PurgeResponse, PurgeError>
👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.
pub async fn purge_subject<T>( &self, subject: T, ) -> Result<PurgeResponse, PurgeError>
Purge Stream messages for a matching subject.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;Sourcepub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerError>
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerError>
Creates or updates a Durable or Ephemeral Consumer (Ephemeral if durable_name was not provided) and
returns the info from the server about the created Consumer.
If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await?;Sourcepub async fn consumer_info<T: AsRef<str>>(
&self,
name: T,
) -> Result<Info, ConsumerInfoError>
pub async fn consumer_info<T: AsRef<str>>( &self, name: T, ) -> Result<Info, ConsumerInfoError>
Retrieve Info about Consumer from the server.
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;Sourcepub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
) -> Result<Consumer<T>, Error>
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, ) -> Result<Consumer<T>, Error>
Get Consumer from the server. Consumer iterators can be used to retrieve Messages for a given Consumer.
§Examples
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;Sourcepub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T,
) -> Result<Consumer<T>, ConsumerError>
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, config: T, ) -> Result<Consumer<T>, ConsumerError>
Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.
Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.
§Examples
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer = stream
.get_or_create_consumer(
"pull",
consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
},
)
.await?;Sourcepub async fn delete_consumer(
&self,
name: &str,
) -> Result<DeleteStatus, ConsumerError>
pub async fn delete_consumer( &self, name: &str, ) -> Result<DeleteStatus, ConsumerError>
Sourcepub fn consumer_names(&self) -> ConsumerNames
pub fn consumer_names(&self) -> ConsumerNames
Lists names of all consumers for current stream.
§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut names = stream.consumer_names();
while let Some(consumer) = names.try_next().await? {
println!("consumer: {consumer:?}");
}Sourcepub fn consumers(&self) -> Consumers
pub fn consumers(&self) -> Consumers
Lists all consumers info for current stream.
§Examples
use futures_util::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut consumers = stream.consumers();
while let Some(consumer) = consumers.try_next().await? {
println!("consumer: {consumer:?}");
}