Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docker/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ name = "app"
path = "main.rs"

[dependencies]
async-nats = {version = "0.33", features = ["service"]}
tokio = { version = "1.20.0", features = ["full"] }
futures = "0.3.21"
serde_json = "1.0.82"
serde = "1.0.139"
async-nats = {version = "0.35.1", features = ["service"]}
tokio = { version = "1.39.1", features = ["full"] }
futures = "0.3.28"
serde_json = "1.0.104"
serde = { version = "1.0.184", features = ["derive"] }
rand = "0.8"
2 changes: 1 addition & 1 deletion docker/rust/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.67-slim AS build
FROM rust:1.80-slim-bullseye AS build

WORKDIR /opt/app

Expand Down
60 changes: 60 additions & 0 deletions examples/jetstream/ack-ack/rust/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::{env};
use async_nats::jetstream;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());

let client = async_nats::connect(nats_url).await?;
let jetstream = jetstream::new(client);

let stream = jetstream
.create_stream(jetstream::stream::Config {
name: "EVENTS".to_string(),
subjects: vec!["event.foo".to_string()],
..Default::default()
})
.await?;

let _ = jetstream.publish("event.foo", "1".into()).await;
let _ = jetstream.publish("event.foo", "2".into()).await;

let mut consumer = stream
.create_consumer(async_nats::jetstream::consumer::pull::Config { ..Default::default()})
.await?;

let ci = consumer.info().await?;
println!("Consumer 1");
println!(" Start\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

let message = consumer.fetch().max_messages(1).messages().await?.next().await.unwrap()?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not unwrap the optional without comment here.
Let's explain to the user what is happening here: that messages() returns an iterator that is closed when None is returned.

maybe this is giving the user a better idea what is happening?

let fetch = consumer.fetch().max_messages(1).messages().await?;
let message = match fetch.next().await.expect("should get one message");

let ci = consumer.info().await?;
println!(" After received but before ack\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

message.ack().await?;

let ci = consumer.info().await?;
println!(" After ack\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

// Consumer 2 will use double_ack()
let stream = jetstream.get_stream("EVENTS".to_string()).await?;
let mut consumer = stream
.create_consumer(async_nats::jetstream::consumer::pull::Config { ..Default::default()})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put an explicit name for the consumer.

.await?;

let ci = consumer.info().await?;
println!("Consumer 2");
println!(" Start\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

let message = consumer.fetch().max_messages(1).messages().await?.next().await.unwrap()?;
let ci = consumer.info().await?;
println!(" After received but before ack\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

message.double_ack().await?;

let ci = consumer.info().await?;
println!(" After ack\n # pending messages: {}\n # messages with ack pending: {}", ci.num_pending, ci.num_ack_pending);

Ok(())
}
2 changes: 1 addition & 1 deletion examples/jetstream/limits-stream/rust/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), async_nats::Error> {
.await?
.await?;
jetstream
.publish("events.mouse_clicked")
.publish("events.mouse_clicked", "".into())
.await?
.await?;
jetstream
Expand Down
13 changes: 0 additions & 13 deletions examples/jetstream/limits-stream/rust/output.cast

This file was deleted.

Loading