Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,38 @@ client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPaylo
.blockLast();
```

### Subscription with a Dead-Letter Topic

For workloads that need to forward unprocessable messages to a dead-letter topic,
use the listener-based `subscribeToEvents` overload that accepts a
`deadLetterTopic` argument. When the listener returns `Status.DROP`, the Dapr
runtime publishes the message to the configured dead-letter topic:

```java
var listener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
if (shouldRejectMessage(event)) {
return Mono.just(Status.DROP); // forwarded to deadLetterTopicName
}
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
};

try (var subscription = client.subscribeToEvents(
PUBSUB_NAME, topicName, deadLetterTopicName, listener, TypeRef.STRING)) {
subscription.awaitTermination();
}
```

See [SubscriberWithDeadLetter.java](SubscriberWithDeadLetter.java) for a complete example
that consumes the dead-letter topic in the same process.

### Subscription Lifecycle

The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
Expand Down Expand Up @@ -138,6 +170,13 @@ Or run the CloudEvent Subscriber example:
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent
```

Or run the dead-letter Subscriber example, which routes messages whose payload
contains "fail" to a dead-letter topic and consumes both topics:

```bash
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberWithDeadLetter
```

Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:

<!-- STEP
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.stream;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;

/**
* Subscriber using bi-directional gRPC streaming with a dead-letter topic.
*
* <p>This example demonstrates how to forward failed messages to a dead-letter topic.
* The main listener returns {@code Status.DROP} for messages whose payload contains
* "fail", which causes the Dapr runtime to publish them to the configured
* dead-letter topic. A second subscription consumes the dead-letter topic so the
* forwarded messages are visible in the same process.
*
* <p>Usage:
* <ol>
* <li>Build and install jars: {@code mvn clean install}
* <li>cd [repo root]/examples
* <li>Run the subscriber:
* {@code dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar \
* io.dapr.examples.pubsub.stream.SubscriberWithDeadLetter}
* <li>Publish messages from another terminal, e.g. containing the word "fail" to
* see them routed to the dead-letter topic.
* </ol>
*/
public class SubscriberWithDeadLetter {

private static final String DEFAULT_TOPIC_NAME = "testingtopic";
private static final String DEFAULT_DEAD_LETTER_TOPIC_NAME = "testingtopic-deadletter";
private static final String PUBSUB_NAME = "messagebus";

/**
* Main entry point for the dead-letter subscriber example.
*
* @param args Optional positional args: [topicName] [deadLetterTopicName].
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
String topicName = args.length >= 1 ? args[0] : DEFAULT_TOPIC_NAME;
String deadLetterTopicName = args.length >= 2 ? args[1] : DEFAULT_DEAD_LETTER_TOPIC_NAME;

try (var client = new DaprClientBuilder().buildPreviewClient()) {
System.out.println("Subscribing to dead-letter topic: " + deadLetterTopicName);

var deadLetterListener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Dead-letter subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Dead-letter subscriber got exception: " + exception.getMessage());
}
};

System.out.println("Subscribing to topic: " + topicName
+ " (dead-letter topic: " + deadLetterTopicName + ")");

var mainListener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
String data = event.getData();
if (data != null && data.toLowerCase().contains("fail")) {
System.out.println("Subscriber dropping message to dead-letter: " + data);
return Mono.just(Status.DROP);
}
System.out.println("Subscriber got: " + data);
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
};

try (var deadLetterSubscription = client.subscribeToEvents(
PUBSUB_NAME, deadLetterTopicName, deadLetterListener, TypeRef.STRING);
var mainSubscription = client.subscribeToEvents(
PUBSUB_NAME, topicName, deadLetterTopicName, mainListener, TypeRef.STRING)) {
mainSubscription.awaitTermination();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class PubSubStreamIT extends BaseIT {
private static final String TOPIC_NAME_FLUX = "stream-topic-flux";
private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent";
private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload";
private static final String TOPIC_NAME_DLQ = "stream-topic-dlq";
private static final String TOPIC_NAME_DLQ_DEADLETTER = "stream-topic-dlq-deadletter";
private static final String PUBSUB_NAME = "messagebus";

private final List<DaprRun> runs = new ArrayList<>();
Expand Down Expand Up @@ -261,4 +263,77 @@ public void testPubSubRawPayload() throws Exception {
disposable.dispose();
}
}

@Test
public void testPubSubDeadLetterTopic() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName() + "-dlq",
60000));

var runId = UUID.randomUUID().toString();
try (DaprClient client = daprRun.newDaprClient();
DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {

// Subscribe to the dead-letter topic first so we don't miss any messages.
Set<String> deadLetterMessageIds = Collections.synchronizedSet(new HashSet<>());
var deadLetterListener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
if (event.getData() != null && event.getData().contains(runId)) {
deadLetterMessageIds.add(event.getId());
System.out.println("Received dead-letter message ID: " + event.getId());
}
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.err.println("Dead-letter subscription error: " + exception.getMessage());
}
};

// Subscribe to the main topic with a listener that always DROPs, which should
// forward the messages to the dead-letter topic.
var mainListener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
if (event.getData() != null && event.getData().contains(runId)) {
System.out.println("Dropping message ID: " + event.getId());
return Mono.just(Status.DROP);
}
return Mono.just(Status.DROP);
}

@Override
public void onError(RuntimeException exception) {
System.err.println("Main subscription error: " + exception.getMessage());
}
};

try (var deadLetterSubscription = previewClient.subscribeToEvents(
PUBSUB_NAME, TOPIC_NAME_DLQ_DEADLETTER, deadLetterListener, TypeRef.STRING);
var mainSubscription = previewClient.subscribeToEvents(
PUBSUB_NAME, TOPIC_NAME_DLQ, TOPIC_NAME_DLQ_DEADLETTER, mainListener, TypeRef.STRING)) {

// Publish messages to the main topic.
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("DLQ message #%d for run %s", i, runId);
client.publishEvent(PUBSUB_NAME, TOPIC_NAME_DLQ, message).block();
}

callWithRetry(() -> {
var count = deadLetterMessageIds.size();
System.out.println(
String.format("Got %d dead-letter messages out of %d for topic %s.",
count, NUM_MESSAGES, TOPIC_NAME_DLQ_DEADLETTER));
assertEquals(NUM_MESSAGES, deadLetterMessageIds.size());
}, 120000);

mainSubscription.close();
mainSubscription.awaitTermination();
deadLetterSubscription.close();
deadLetterSubscription.awaitTermination();
}
}
}
}
50 changes: 45 additions & 5 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,29 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
@Override
public <T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type) {
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
return subscribeToEvents(pubsubName, topic, null, listener, type);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Subscription subscribeToEvents(
String pubsubName,
String topic,
String deadLetterTopic,
SubscriptionListener<T> listener,
TypeRef<T> type) {
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.build();
.setPubsubName(pubsubName);
if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
}
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequest)
.setInitialRequest(initialRequestBuilder.build())
.build();
return buildSubscription(listener, type, request);
}
Expand Down Expand Up @@ -525,14 +540,35 @@ public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T>
*/
@Override
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type) {
return subscribeToTopic(pubsubName, topic, type, null);
return subscribeToTopic(pubsubName, topic, null, type, null);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata) {
return subscribeToTopic(pubsubName, topic, null, type, metadata);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type) {
return subscribeToTopic(pubsubName, topic, deadLetterTopic, type, null);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<T> subscribeToTopic(
String pubsubName,
String topic,
String deadLetterTopic,
TypeRef<T> type,
Map<String, String> metadata) {
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
Expand All @@ -542,6 +578,10 @@ public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T>
initialRequestBuilder.putAllMetadata(metadata);
}

if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
}

DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequestBuilder.build())
Expand Down
49 changes: 49 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,25 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);

/**
* Subscribe to pubsub via streaming with a dead-letter topic.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
* @param listener Callback methods to process events.
* @param type Type for object deserialization.
* @param <T> Type of object deserialization.
* @return An active subscription.
* @deprecated Use {@link #subscribeToTopic(String, String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
<T> Subscription subscribeToEvents(
String pubsubName,
String topic,
String deadLetterTopic,
SubscriptionListener<T> listener,
TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
*
Expand Down Expand Up @@ -352,6 +371,36 @@ <T> Subscription subscribeToEvents(
*/
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux with a dead-letter topic.
*
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
* @param type Type for object deserialization.
* @return A Flux of deserialized event payloads.
* @param <T> Type of the event payload.
*/
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux with metadata and dead-letter topic.
*
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
* @param type Type for object deserialization.
* @param metadata Subscription metadata (e.g., {"rawPayload": "true"}).
* @return A Flux of deserialized event payloads.
* @param <T> Type of the event payload.
*/
<T> Flux<T> subscribeToTopic(
String pubsubName,
String topic,
String deadLetterTopic,
TypeRef<T> type,
Map<String, String> metadata);

/*
* Converse with an LLM.
*
Expand Down
Loading
Loading