JetStream
Core NATS delivers a message only to subscribers that are connected at the moment of publication: delivery is at most once, and messages are never replayed. JetStream adds a persistence layer on top that provides at-least-once delivery: stored messages survive restarts and can be replayed.
Core NATS already decouples publishers from subscribers: a publisher does not need to know who is subscribed. JetStream extends that decoupling to time, so the two no longer need to be online at the same moment.
How It Works
JetStream introduces three pieces working together:
- A stream is a server-side store of messages, bound to one or more subjects.
- A consumer is a server-side, stateful view of a stream - the server tracks how far a client has progressed, so applications don't have to.
- A client is an application that connects to a consumer to receive messages and acknowledge them. A consumer can be shared by multiple clients to divide the work; each acknowledgment advances the consumer's position in the stream.
Streams
A stream is bound to one or more subject patterns. When a publisher sends a message to a matching subject, the server appends it to the stream and assigns it a sequence number. Each stream is configured with a storage type (memory or file), a retention policy (limits, interest, or work queue, which determines when messages are removed), a replica count, and more.
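To make the capture step concrete, here is a minimal sketch in plain Python of how a subject pattern selects messages and how sequence numbers are assigned. `ToyStream` and `subject_matches` are hypothetical names invented for this illustration; they are not part of any NATS client, and the real server does far more (storage, limits, replication).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subject pattern matches a subject.

    `*` matches exactly one token; `>` matches one or more trailing tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # `>` must be the last pattern token and needs at least one token left
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


@dataclass
class ToyStream:
    """Hypothetical in-memory stand-in for a stream (not the NATS API)."""
    subjects: List[str]
    messages: List[Tuple[int, str, bytes]] = field(default_factory=list)

    def publish(self, subject: str, data: bytes) -> Optional[int]:
        """Append the message if any bound pattern matches; return its sequence."""
        if not any(subject_matches(p, subject) for p in self.subjects):
            return None  # not captured by this stream
        seq = len(self.messages) + 1  # this toy never deletes, so sequences are 1, 2, 3, ...
        self.messages.append((seq, subject, data))
        return seq


orders = ToyStream(subjects=["orders.>"])
print(orders.publish("orders.new", b"Order #1001"))              # 1
print(orders.publish("orders.shipped", b"Order #1001 shipped"))  # 2
print(orders.publish("billing.new", b"Invoice"))                 # None
```

The third publish returns `None` because `billing.new` does not match `orders.>`, so the toy stream never stores it.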
Consumers
A consumer is a server-side, stateful view of a stream that tracks how far a client has progressed. Multiple consumers can read the same stream independently, each with its own position. The server maintains that position so applications don't have to coordinate or remember it themselves.
The application that connects to a consumer, the client, receives messages and acknowledges each one. An acknowledgment advances the consumer's cursor. If a message isn't acknowledged in time, the server redelivers it, which is what gives you at-least-once delivery.
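The acknowledgment and redelivery behaviour can be sketched with a small toy model. This is an illustration only: `ToyConsumer`, its `ack_wait` field, and the explicit `now` argument (a simulated clock) are assumptions made for the example, not the server's implementation or a client API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ToyConsumer:
    """Hypothetical model of a consumer's ack tracking (not the NATS API)."""
    stream: List[bytes]      # the message at index i has sequence i + 1
    ack_wait: float = 30.0   # seconds before an unacked message is redelivered
    next_seq: int = 1        # next sequence that has never been delivered
    pending: Dict[int, float] = field(default_factory=dict)  # seq -> redelivery deadline

    def next(self, now: float) -> Optional[Tuple[int, bytes]]:
        """Deliver an expired pending message first, otherwise the next new one."""
        for seq, deadline in sorted(self.pending.items()):
            if now >= deadline:
                self.pending[seq] = now + self.ack_wait  # restart the ack timer
                return seq, self.stream[seq - 1]
        if self.next_seq <= len(self.stream):
            seq = self.next_seq
            self.next_seq += 1
            self.pending[seq] = now + self.ack_wait
            return seq, self.stream[seq - 1]
        return None  # nothing new and nothing due for redelivery

    def ack(self, seq: int) -> None:
        """Acknowledging removes the message from the pending set for good."""
        self.pending.pop(seq, None)


consumer = ToyConsumer(stream=[b"Order #1001", b"Order #1002"], ack_wait=30.0)

first = consumer.next(now=0.0)    # delivers sequence 1
second = consumer.next(now=1.0)   # delivers sequence 2
consumer.ack(second[0])           # only sequence 2 is acknowledged

again = consumer.next(now=31.0)   # sequence 1 timed out, so it comes back
print(first[0], second[0], again[0])  # 1 2 1
consumer.ack(again[0])
print(consumer.next(now=100.0))       # None
```

Because sequence 1 is delivered twice, the processing code on the client side has to tolerate duplicates; that is the practical meaning of at-least-once.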
A consumer can be configured to start reading from the beginning of the stream, from the latest message, from a specific sequence number, or from a specific time.
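As a rough sketch of how those four starting points differ, the function below picks the first sequence a new consumer would read from a small, made-up stream. The labels `"beginning"`, `"latest"`, `"sequence"`, and `"time"` are hypothetical names chosen for this example, and `"latest"` is assumed here to mean "start at the last stored message"; consult the consumer configuration reference for the real option names and exact semantics.

```python
from typing import List, Optional, Tuple

# Each stored message is (sequence, unix_timestamp); the values are made up.
Stored = Tuple[int, float]


def first_sequence(stream: List[Stored], start: str,
                   seq: Optional[int] = None,
                   at_time: Optional[float] = None) -> int:
    """Return the sequence at which a new consumer would begin reading.

    If no stored message qualifies, the result is one past the last
    stored sequence, meaning the consumer waits for new messages.
    """
    past_end = stream[-1][0] + 1 if stream else 1
    if start == "beginning":
        return stream[0][0] if stream else past_end
    if start == "latest":
        return stream[-1][0] if stream else past_end
    if start == "sequence":
        if seq is None:
            raise ValueError("seq is required for start='sequence'")
        return next((s for s, _ in stream if s >= seq), past_end)
    if start == "time":
        if at_time is None:
            raise ValueError("at_time is required for start='time'")
        return next((s for s, ts in stream if ts >= at_time), past_end)
    raise ValueError(f"unknown start position: {start}")


stream = [(1, 100.0), (2, 105.0), (3, 110.0), (4, 120.0)]
print(first_sequence(stream, "beginning"))            # 1
print(first_sequence(stream, "latest"))               # 4
print(first_sequence(stream, "sequence", seq=3))      # 3
print(first_sequence(stream, "time", at_time=106.0))  # 3
```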
Putting It Together
The same workflow is shown below for each client, in this order: CLI, JavaScript/TypeScript, Go, Python, Java, Rust, C#/.NET.
# Create a stream that captures any subject under `orders.`
nats stream add ORDERS --subjects "orders.>" --storage file --retention limits --defaults
# Publish a few orders
nats pub orders.new "Order #1001"
nats pub orders.new "Order #1002"
nats pub orders.shipped "Order #1001 shipped"
# Create a durable pull consumer that delivers from the beginning of the stream
nats consumer add ORDERS order-processor --pull --deliver all --ack explicit --defaults
# Fetch and acknowledge the next batch of messages
nats consumer next ORDERS order-processor --count 3 --ack
// Create a stream that captures any subject under `orders.`
const jsm = await jetstreamManager(nc);
await jsm.streams.add({
  name: "ORDERS",
  subjects: ["orders.>"],
  storage: StorageType.File,
});
// Publish a few orders
const js = jetstream(nc);
await js.publish("orders.new", "Order #1001");
await js.publish("orders.new", "Order #1002");
await js.publish("orders.shipped", "Order #1001 shipped");
// Create a durable pull consumer that delivers from the beginning
await jsm.consumers.add("ORDERS", {
  durable_name: "order-processor",
  ack_policy: AckPolicy.Explicit,
});
const consumer = await js.consumers.get("ORDERS", "order-processor");
// Fetch a batch and acknowledge each message
const messages = await consumer.fetch({ max_messages: 3, expires: 5000 });
for await (const msg of messages) {
  console.log(`Received on ${msg.subject}: ${msg.string()}`);
  msg.ack();
}
// Create a stream that captures any subject under `orders.`
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
	Name:     "ORDERS",
	Subjects: []string{"orders.>"},
	Storage:  jetstream.FileStorage,
})
if err != nil {
	log.Fatal(err)
}
// Publish a few orders
js.Publish(ctx, "orders.new", []byte("Order #1001"))
js.Publish(ctx, "orders.new", []byte("Order #1002"))
js.Publish(ctx, "orders.shipped", []byte("Order #1001 shipped"))
// Create a durable pull consumer that delivers from the beginning
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
	Durable:   "order-processor",
	AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
	log.Fatal(err)
}
// Fetch a batch and acknowledge each message
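msgs, err := consumer.Fetch(3)
if err != nil {
	log.Fatal(err)
}
for msg := range msgs.Messages() {
	fmt.Printf("Received on %s: %s\n", msg.Subject(), string(msg.Data()))
	msg.Ack()
}
// Errors that occur while the batch is being delivered surface here
if err := msgs.Error(); err != nil {
	log.Fatal(err)
}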
# JetStream context
js = jetstream.new(nc)
# Create a stream that captures any subject under `orders.`
stream = await js.create_stream(name="ORDERS", subjects=["orders.>"], storage="file")
# Publish a few orders
await js.publish("orders.new", b"Order #1001")
await js.publish("orders.new", b"Order #1002")
await js.publish("orders.shipped", b"Order #1001 shipped")
# Create a durable pull consumer that delivers from the beginning
consumer = await stream.create_or_update_consumer(
    name="order-processor",
    ack_policy="explicit",
)
# Fetch a batch and acknowledge each message
batch = await consumer.fetch(max_messages=3, max_wait=2.0)
async for msg in batch:
    print(f"Received on {msg.subject}: {msg.data.decode()}")
    await msg.ack()
// Create a stream that captures any subject under `orders.`
JetStreamManagement jsm = nc.jetStreamManagement();
StreamInfo streamInfo = jsm.addStream(StreamConfiguration.builder()
        .name("ORDERS")
        .subjects("orders.>")
        .storageType(StorageType.File)
        .build());
// Publish a few orders
JetStream js = nc.jetStream();
js.publish("orders.new", "Order #1001".getBytes(StandardCharsets.UTF_8));
js.publish("orders.new", "Order #1002".getBytes(StandardCharsets.UTF_8));
js.publish("orders.shipped", "Order #1001 shipped".getBytes(StandardCharsets.UTF_8));
// Create a durable pull consumer that delivers from the beginning
StreamContext stream = js.getStreamContext("ORDERS");
ConsumerContext consumer = stream.createOrUpdateConsumer(ConsumerConfiguration.builder()
        .durable("order-processor")
        .ackPolicy(AckPolicy.Explicit)
        .build());
// Fetch a batch and acknowledge each message
try (FetchConsumer fetchConsumer = consumer.fetchMessages(3)) {
    Message msg = fetchConsumer.nextMessage();
    while (msg != null) {
        System.out.printf("Received on %s: %s\n", msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
        msg.ack();
        msg = fetchConsumer.nextMessage();
    }
}
// Create a stream that captures any subject under `orders.`
let stream = js
    .create_stream(jetstream::stream::Config {
        name: "ORDERS".to_string(),
        subjects: vec!["orders.>".into()],
        storage: StorageType::File,
        ..Default::default()
    })
    .await?;
// Publish a few orders
js.publish("orders.new", "Order #1001".into()).await?;
js.publish("orders.new", "Order #1002".into()).await?;
js.publish("orders.shipped", "Order #1001 shipped".into())
    .await?;
// Create a durable pull consumer that delivers from the beginning
let consumer: PullConsumer = stream
    .create_consumer(jetstream::consumer::pull::Config {
        durable_name: Some("order-processor".to_string()),
        ack_policy: jetstream::consumer::AckPolicy::Explicit,
        ..Default::default()
    })
    .await?;
// Fetch a batch and acknowledge each message
let mut messages = consumer.fetch().max_messages(3).messages().await?;
while let Some(message) = messages.next().await {
    let message = message?;
    println!(
        "Received on {}: {}",
        message.subject,
        String::from_utf8_lossy(&message.payload)
    );
    message.ack().await?;
}
// Create a stream that captures any subject under `orders.`
var js = client.CreateJetStreamContext();
await js.CreateStreamAsync(new StreamConfig(name: "ORDERS", subjects: ["orders.>"])
{
    Storage = StreamConfigStorage.File,
});
// Publish a few orders
await js.PublishAsync<string>(subject: "orders.new", data: "Order #1001");
await js.PublishAsync<string>(subject: "orders.new", data: "Order #1002");
await js.PublishAsync<string>(subject: "orders.shipped", data: "Order #1001 shipped");
// Create a durable pull consumer that delivers from the beginning
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "ORDERS", config: new ConsumerConfig
{
    Name = "order-processor",
    DurableName = "order-processor",
    AckPolicy = ConsumerConfigAckPolicy.Explicit,
});
// Fetch a batch and acknowledge each message
await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 3 }))
{
    output.WriteLine($"Received on {msg.Subject}: {msg.Data}");
    await msg.AckAsync();
}
Beyond Streams and Consumers
JetStream also provides higher-level abstractions built on top of streams and consumers:
- Key Value Store: A simple key-value store with built-in replication and durability.
- Object Store: A scalable object storage system with support for versioning and metadata.
Related Concepts
- Publish-Subscribe - The fire-and-forget messaging model JetStream builds on
- Subjects - How streams capture messages by subject patterns
- Queue Groups - Load balancing across consumers, also available with JetStream consumers