Skip to content

Commit 4e4dcc3

Browse files
author
Siri Varma Vegiraju
committed
feat: add dead-letter topic support to streaming subscriptions (#1608)
Adds optional deadLetterTopic parameter to the streaming subscribe APIs on DaprPreviewClient / DaprClientImpl, wiring it through to the SubscribeTopicEventsRequestInitialAlpha1.dead_letter_topic proto field. New overloads: - subscribeToEvents(pubsubName, topic, deadLetterTopic, listener, type) - subscribeToTopic(pubsubName, topic, deadLetterTopic, type) - subscribeToTopic(pubsubName, topic, deadLetterTopic, type, metadata) Existing methods delegate to the new ones with a null deadLetterTopic, and the field is only set on the request when a non-empty value is provided. Adds unit tests covering both presence and absence of the field on the gRPC initial frame. Fixes: #1608 Signed-off-by: Siri Varma Vegiraju <s_vegiraju@apple.com>
1 parent 89708ae commit 4e4dcc3

3 files changed

Lines changed: 342 additions & 5 deletions

File tree

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -490,14 +490,29 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
490490
@Override
491491
public <T> Subscription subscribeToEvents(
492492
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type) {
493-
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
493+
return subscribeToEvents(pubsubName, topic, null, listener, type);
494+
}
495+
496+
/**
497+
* {@inheritDoc}
498+
*/
499+
@Override
500+
public <T> Subscription subscribeToEvents(
501+
String pubsubName,
502+
String topic,
503+
String deadLetterTopic,
504+
SubscriptionListener<T> listener,
505+
TypeRef<T> type) {
506+
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
494507
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
495508
.setTopic(topic)
496-
.setPubsubName(pubsubName)
497-
.build();
509+
.setPubsubName(pubsubName);
510+
if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
511+
initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
512+
}
498513
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
499514
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
500-
.setInitialRequest(initialRequest)
515+
.setInitialRequest(initialRequestBuilder.build())
501516
.build();
502517
return buildSubscription(listener, type, request);
503518
}
@@ -525,14 +540,35 @@ public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T>
525540
*/
526541
@Override
527542
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type) {
528-
return subscribeToTopic(pubsubName, topic, type, null);
543+
return subscribeToTopic(pubsubName, topic, null, type, null);
529544
}
530545

531546
/**
532547
* {@inheritDoc}
533548
*/
534549
@Override
535550
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata) {
551+
return subscribeToTopic(pubsubName, topic, null, type, metadata);
552+
}
553+
554+
/**
555+
* {@inheritDoc}
556+
*/
557+
@Override
558+
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type) {
559+
return subscribeToTopic(pubsubName, topic, deadLetterTopic, type, null);
560+
}
561+
562+
/**
563+
* {@inheritDoc}
564+
*/
565+
@Override
566+
public <T> Flux<T> subscribeToTopic(
567+
String pubsubName,
568+
String topic,
569+
String deadLetterTopic,
570+
TypeRef<T> type,
571+
Map<String, String> metadata) {
536572
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
537573
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
538574
.setTopic(topic)
@@ -542,6 +578,10 @@ public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T>
542578
initialRequestBuilder.putAllMetadata(metadata);
543579
}
544580

581+
if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
582+
initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
583+
}
584+
545585
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
546586
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
547587
.setInitialRequest(initialRequestBuilder.build())

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,25 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
292292
<T> Subscription subscribeToEvents(
293293
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);
294294

295+
/**
296+
* Subscribe to pubsub via streaming with a dead-letter topic.
297+
* @param pubsubName Name of the pubsub component.
298+
* @param topic Name of the topic to subscribe to.
299+
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
300+
* @param listener Callback methods to process events.
301+
* @param type Type for object deserialization.
302+
* @param <T> Type of object deserialization.
303+
* @return An active subscription.
304+
* @deprecated Use {@link #subscribeToTopic(String, String, String, TypeRef)} instead for a more reactive approach.
305+
*/
306+
@Deprecated
307+
<T> Subscription subscribeToEvents(
308+
String pubsubName,
309+
String topic,
310+
String deadLetterTopic,
311+
SubscriptionListener<T> listener,
312+
TypeRef<T> type);
313+
295314
/**
296315
* Subscribe to pubsub events via streaming using Project Reactor Flux.
297316
*
@@ -352,6 +371,36 @@ <T> Subscription subscribeToEvents(
352371
*/
353372
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata);
354373

374+
/**
375+
* Subscribe to pubsub events via streaming using Project Reactor Flux with a dead-letter topic.
376+
*
377+
* @param pubsubName Name of the pubsub component.
378+
* @param topic Name of the topic to subscribe to.
379+
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
380+
* @param type Type for object deserialization.
381+
* @return A Flux of deserialized event payloads.
382+
* @param <T> Type of the event payload.
383+
*/
384+
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type);
385+
386+
/**
387+
* Subscribe to pubsub events via streaming using Project Reactor Flux with metadata and dead-letter topic.
388+
*
389+
* @param pubsubName Name of the pubsub component.
390+
* @param topic Name of the topic to subscribe to.
391+
* @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
392+
* @param type Type for object deserialization.
393+
* @param metadata Subscription metadata (e.g., {"rawPayload": "true"}).
394+
* @return A Flux of deserialized event payloads.
395+
* @param <T> Type of the event payload.
396+
*/
397+
<T> Flux<T> subscribeToTopic(
398+
String pubsubName,
399+
String topic,
400+
String deadLetterTopic,
401+
TypeRef<T> type,
402+
Map<String, String> metadata);
403+
355404
/*
356405
* Converse with an LLM.
357406
*

0 commit comments

Comments
 (0)