diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt
index 41b6a7d6d..dc2fe7be8 160000
--- a/crt/aws-c-mqtt
+++ b/crt/aws-c-mqtt
@@ -1 +1 @@
-Subproject commit 41b6a7d6d566a56eff69743df66c077d56a80c9d
+Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
index 00a9c85a8..7611458c2 100644
--- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
@@ -202,6 +202,20 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
}
+ /**
+ * Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control.
+ *
+ *
To use manual PUBACK control, call {@link PublishReturn#acquirePubackControl()} within
+ * the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to
+ * obtain a {@link Mqtt5PubackControlHandle}. Then call this method to send the PUBACK.
+ *
+ * @param pubackControlHandle An opaque handle obtained from {@link PublishReturn#acquirePubackControl()}.
+ * @throws CrtRuntimeException If the native client returns an error when invoking the PUBACK.
+ */
+ public void invokePuback(Mqtt5PubackControlHandle pubackControlHandle) throws CrtRuntimeException {
+ mqtt5ClientInternalInvokePuback(getNativeHandle(), pubackControlHandle.getControlId());
+ }
+
/**
* Returns the connectivity state for the Mqtt5Client.
* @return True if the client is connected, false otherwise
@@ -277,4 +291,5 @@ private static native long mqtt5ClientNew(
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback);
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
+ private static native void mqtt5ClientInternalInvokePuback(long client, long controlId);
}
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java
new file mode 100644
index 000000000..04a77ae8b
--- /dev/null
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+package software.amazon.awssdk.crt.mqtt5;
+
+/**
+ * An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet.
+ *
+ * This class cannot be instantiated directly. Instances are only created by the CRT library.
+ */
+public class Mqtt5PubackControlHandle {
+
+ private final long controlId;
+
+ /**
+ * Creates a new Mqtt5PubackControlHandle. Only called from native/JNI code.
+ *
+ * @param controlId The native puback control ID returned by aws_mqtt5_client_acquire_puback.
+ */
+ Mqtt5PubackControlHandle(long controlId) {
+ this.controlId = controlId;
+ }
+
+ /**
+ * Returns the native puback control ID. Used internally by JNI.
+ *
+ * @return The native puback control ID.
+ */
+ long getControlId() {
+ return controlId;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
index 4baf2aa4d..e7ef07f1f 100644
--- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
@@ -14,6 +14,16 @@
public class PublishReturn {
private PublishPacket publishPacket;
+ /**
+ * Single-element long array holding the native manual PUBACK control context pointer.
+ * Element [0] is the pointer value, valid only during the
+ * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
+ * QoS 0 results in this being set to 0.
+ * Native code sets [0] to 0 after the callback returns (via SetLongArrayRegion,
+ * requiring no extra JNI method ID).
+ */
+ private final long[] nativeContextPtrHolder;
+
/**
* Returns the PublishPacket returned from the server or Null if none was returned.
* @return The PublishPacket returned from the server.
@@ -22,12 +32,53 @@ public PublishPacket getPublishPacket() {
return publishPacket;
}
+ /**
+ * Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the
+ * client from automatically sending a PUBACK. The returned handle can be passed to
+ * {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the
+ * PUBACK to the broker.
+ *
+ * Important: This method must be called within the
+ * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the
+ * callback returns will throw an {@link IllegalStateException}.
+ *
+ * This method may only be called once per received PUBLISH. Subsequent calls will throw
+ * an {@link IllegalStateException}.
+ *
+ * If this method is not called, the client will automatically send a PUBACK for QoS 1
+ * messages when the callback returns.
+ *
+ * @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK.
+ * @throws IllegalStateException if called outside the onMessageReceived callback or called more than once.
+ */
+ public synchronized Mqtt5PubackControlHandle acquirePubackControl() {
+ if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) {
+ throw new IllegalStateException(
+ "acquirePubackControl() must be called within the onMessageReceived callback and may only be called once.");
+ }
+ long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]);
+ /* We set the array element to 0 so it can't be double-called */
+ nativeContextPtrHolder[0] = 0;
+ return new Mqtt5PubackControlHandle(controlId);
+ }
+
/**
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
- * @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1.
- * @return A newly created PublishResult
+ * The nativeContextPtrHolder is a single-element long array; native code sets [0] to 0
+ * after the onMessageReceived callback returns to prevent use-after-free.
+ *
+ * @param newPublishPacket The PublishPacket data received from the server.
+ * @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer.
*/
- private PublishReturn(PublishPacket newPublishPacket) {
+ private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) {
this.publishPacket = newPublishPacket;
+ this.nativeContextPtrHolder = nativeContextPtrHolder;
}
-}
+
+ /**
+ * Calls the native aws_mqtt5_client_acquire_puback function.
+ * @param nativeContextPtr Pointer to the native manual PUBACK control context.
+ * @return The native puback control ID.
+ */
+ private static native long mqtt5AcquirePubackControl(long nativeContextPtr);
+}
\ No newline at end of file
diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
index 02134b607..7acb15f6f 100644
--- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
+++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
@@ -1549,7 +1549,8 @@
{
"name": "",
"parameterTypes": [
- "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket"
+ "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket",
+ "long[]"
]
}
]
diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c
index 97bc76282..69c66d8ab 100644
--- a/src/native/java_class_ids.c
+++ b/src/native/java_class_ids.c
@@ -2248,12 +2248,17 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) {
AWS_FATAL_ASSERT(cls);
mqtt5_publish_return_properties.return_class = (*env)->NewGlobalRef(env, cls);
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_class);
- // Functions
+ /*
+ * Constructor: PublishReturn(PublishPacket, long[])
+ * The long[] is a single-element array holding the native context pointer.
+ * Native code sets [0] to 0 via SetLongArrayRegion after the callback returns,
+ * requiring no extra JNI method ID.
+ */
mqtt5_publish_return_properties.return_constructor_id = (*env)->GetMethodID(
env,
mqtt5_publish_return_properties.return_class,
"",
- "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V");
+ "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;[J)V");
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id);
}
diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h
index 3d852b860..4d0358f56 100644
--- a/src/native/java_class_ids.h
+++ b/src/native/java_class_ids.h
@@ -926,7 +926,7 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop
/* mqtt5.PublishReturn */
struct java_aws_mqtt5_publish_return_properties {
jclass return_class;
- jmethodID return_constructor_id;
+ jmethodID return_constructor_id; /* (PublishPacket, long[]) - long[0] holds native context ptr */
};
extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties;
diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c
index 2d59c8b26..6ac9cd6c8 100644
--- a/src/native/mqtt5_client.c
+++ b/src/native/mqtt5_client.c
@@ -38,6 +38,12 @@ struct aws_mqtt5_client_publish_return_data {
jobject jni_publish_future;
};
+/* Context for manual PUBACK control. Valid only during the publish received callback. */
+struct manual_puback_control_context {
+ struct aws_mqtt5_client *client;
+ const struct aws_mqtt5_packet_publish_view *publish_packet;
+};
+
struct aws_mqtt5_client_subscribe_return_data {
struct aws_mqtt5_client_java_jni *java_client;
jobject jni_subscribe_future;
@@ -538,6 +544,9 @@ static void s_aws_mqtt5_client_java_publish_received(
/* One reference is needed for the PublishReturn */
references_needed += 1;
+ /* One reference is needed for the long[] context pointer holder array */
+ references_needed += 1;
+
/* A Publish packet will need 5 references at minimum */
references_needed += 5;
/* Optionals */
@@ -577,12 +586,42 @@ static void s_aws_mqtt5_client_java_publish_received(
goto clean_up;
}
- /* Make the PublishReturn struct that will hold all of the data that is passed to Java */
+ /* Create manual PUBACK control context (valid only during this callback)
+ * We clean this before clean_up since we don't want to create this early and we can be sure to
+ * hit the aws_mem_release AFTER we return from the publish events callback.
+ */
+ struct manual_puback_control_context *control_context =
+ aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context));
+
+ /*
+ * Create a single-element long[] array to hold the native context pointer.
+ * This allows native code to invalidate the pointer after the callback returns
+ * using SetLongArrayRegion(array, 0, 1, &zero) without using an extra JNI method ID.
+ */
+ jlongArray context_ptr_holder = NULL;
+ control_context->client = java_client->client;
+ control_context->publish_packet = publish;
+
+ context_ptr_holder = (*env)->NewLongArray(env, 1);
+ /* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */
+ if (context_ptr_holder != NULL) {
+ /*
+ * Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no PUBACK
+ * to send, so acquirePubackControl() should throw IllegalStateException. We pass 0 (null).
+ */
+ jlong context_ptr_value = (publish->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) ? 0 : (jlong)(uintptr_t)control_context;
+ /* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */
+ (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value);
+ }
+
+ /* Make the PublishReturn struct that will hold all of the data that is passed to Java.
+ * The constructor takes (PublishPacket, long[]) where long[0] is the native context pointer. */
publish_packet_return_data = (*env)->NewObject(
env,
mqtt5_publish_return_properties.return_class,
mqtt5_publish_return_properties.return_constructor_id,
- publish_packet_data);
+ publish_packet_data,
+ context_ptr_holder);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
if (java_client->jni_publish_events) {
@@ -594,6 +633,22 @@ static void s_aws_mqtt5_client_java_publish_received(
publish_packet_return_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
}
+
+ /*
+ * Invalidate the context pointer in the long[] array now that the callback has returned.
+ * This prevents use-after-free if acquirePubackControl() is called after the callback.
+ * SetLongArrayRegion requires no extra JNI method ID.
+ */
+ if (context_ptr_holder != NULL) {
+ jlong zero = 0;
+ (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &zero);
+ }
+
+ /* Free the context. The publish_packet pointer is no longer valid after this callback returns */
+ if (control_context != NULL) {
+ aws_mem_release(aws_jni_get_allocator(), control_context);
+ }
+
goto clean_up;
clean_up:
@@ -2169,6 +2224,78 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl
}
}
+/*******************************************************************************
+ * Manual PUBACK Control Functions
+ ******************************************************************************/
+
+/**
+ * Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr).
+ * Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK
+ * for the received PUBLISH packet. Returns the puback_control_id as a jlong.
+ *
+ * This must be called within the onMessageReceived callback while the
+ * native context pointer is still valid.
+ */
+JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl(
+ JNIEnv *env,
+ jclass jni_class,
+ jlong native_context_ptr) {
+ (void)jni_class;
+ aws_cache_jni_ids(env);
+
+ if (native_context_ptr == 0) {
+ aws_jni_throw_runtime_exception(
+ env,
+ "PublishReturn.acquirePubackControl: context is no longer valid. "
+ "acquirePubackControl() must be called within the onMessageReceived callback.");
+ return 0;
+ }
+
+ struct manual_puback_control_context *context =
+ (struct manual_puback_control_context *)(uintptr_t)native_context_ptr;
+
+ if (!context->client || !context->publish_packet) {
+ aws_jni_throw_runtime_exception(
+ env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context");
+ return 0;
+ }
+
+ uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet);
+ return (jlong)control_id;
+}
+
+/**
+ * Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId).
+ * Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired
+ * manual PUBACK control handle.
+ */
+JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback(
+ JNIEnv *env,
+ jclass jni_class,
+ jlong jni_client,
+ jlong control_id) {
+ (void)jni_class;
+ aws_cache_jni_ids(env);
+
+ struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
+ if (!java_client) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
+ return;
+ }
+ if (!java_client->client) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
+ return;
+ }
+
+ int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL);
+ if (result != AWS_OP_SUCCESS) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error());
+ }
+}
+
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
index a847da0fd..1ed596bb5 100644
--- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
+++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
@@ -2546,6 +2546,391 @@ public void QoS1_UC1() throws Exception {
CrtResource.waitForNoResources();
}
+ /**
+ * ============================================================
+ * Manual PUBACK Tests
+ * ============================================================
+ */
+
+ private void doManualPuback_HoldTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ final long PUBACK_HOLD_TIMEOUT_SEC = 60L;
+
+ CompletableFuture firstDeliveryFuture = new CompletableFuture<>();
+ CompletableFuture redeliveryFuture = new CompletableFuture<>();
+ Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ ConnectPacketBuilder connectOptions = new ConnectPacketBuilder();
+ connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID);
+ builder.withConnectOptions(connectOptions.build());
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ byte[] receivedPayload = publishReturn.getPublishPacket().getPayload();
+ if (!firstDeliveryFuture.isDone()) {
+ // First delivery: acquire manual PUBACK control to hold the PUBACK
+ pubackHandleHolder[0] = publishReturn.acquirePubackControl();
+ firstDeliveryFuture.complete(receivedPayload);
+ } else if (!redeliveryFuture.isDone()) {
+ // Second delivery: broker re-sent because no PUBACK was received
+ redeliveryFuture.complete(receivedPayload);
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe to the topic with QoS 1
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish a QoS 1 message with a unique UUID payload
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the first delivery and confirm PUBACK was held
+ byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload));
+ assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]);
+
+ // Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent)
+ byte[] redeliveredPayload = redeliveryFuture.get(PUBACK_HOLD_TIMEOUT_SEC, TimeUnit.SECONDS);
+ assertTrue("Re-delivered payload should match the original UUID payload",
+ java.util.Arrays.equals(redeliveredPayload, payload));
+
+ // Release the held PUBACK now that we've confirmed re-delivery
+ client.invokePuback(pubackHandleHolder[0]);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual PUBACK hold test: hold PUBACK and verify broker re-delivers the message */
+ @Test
+ public void ManualPuback_Hold() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPuback_InvokeTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ final long NO_REDELIVERY_WAIT_SEC = 60L;
+
+ CompletableFuture firstDeliveryFuture = new CompletableFuture<>();
+ CompletableFuture unexpectedRedeliveryFuture = new CompletableFuture<>();
+ Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ ConnectPacketBuilder connectOptions = new ConnectPacketBuilder();
+ connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID);
+ builder.withConnectOptions(connectOptions.build());
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ byte[] receivedPayload = publishReturn.getPublishPacket().getPayload();
+ if (!firstDeliveryFuture.isDone()) {
+ // First delivery: acquire manual PUBACK control, then immediately invoke it
+ pubackHandleHolder[0] = publishReturn.acquirePubackControl();
+ firstDeliveryFuture.complete(receivedPayload);
+ } else if (java.util.Arrays.equals(receivedPayload, payload) && !unexpectedRedeliveryFuture.isDone()) {
+ // A second delivery of the same payload means the broker re-sent. This should NOT happen
+ unexpectedRedeliveryFuture.complete(receivedPayload);
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe to the topic with QoS 1
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish a QoS 1 message with a unique UUID payload
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the first delivery and confirm PUBACK handle was acquired
+ byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload));
+ assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]);
+
+ // Immediately invoke the PUBACK using the acquired handle
+ client.invokePuback(pubackHandleHolder[0]);
+
+ // Wait 60 seconds and confirm the broker does NOT re-deliver the message
+ boolean redelivered = false;
+ try {
+ unexpectedRedeliveryFuture.get(NO_REDELIVERY_WAIT_SEC, TimeUnit.SECONDS);
+ redelivered = true;
+ } catch (java.util.concurrent.TimeoutException ex) {
+ redelivered = false;
+ }
+ assertTrue("Broker should NOT re-deliver the message after invokePuback() was called",
+ !redelivered);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual PUBACK invoke test: acquire and immediately invoke PUBACK, verify no re-delivery */
+ @Test
+ public void ManualPuback_Invoke() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPuback_AcquireDoubleCallRaisesTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture resultFuture = new CompletableFuture<>();
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ try {
+ // First call should succeed
+ Mqtt5PubackControlHandle handle = publishReturn.acquirePubackControl();
+ // Second call on the same message should throw IllegalStateException
+ try {
+ publishReturn.acquirePubackControl();
+ resultFuture.complete("no_error"); // Should not reach here
+ } catch (IllegalStateException ex) {
+ resultFuture.complete("double_call_raised");
+ } catch (Exception ex) {
+ resultFuture.complete("unexpected_error: " + ex.getMessage());
+ }
+ // handle is valid but we don't invoke it here; let it be GC'd
+ } catch (Exception ex) {
+ resultFuture.complete("first_call_failed: " + ex.getMessage());
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertEquals("Expected IllegalStateException on double-call, got: " + result,
+ "double_call_raised", result);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual PUBACK double-call test: calling acquirePubackControl() twice raises IllegalStateException */
+ @Test
+ public void ManualPuback_AcquireDoubleCallRaises() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_AcquireDoubleCallRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPuback_AcquirePostCallbackRaisesTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture callbackDoneFuture = new CompletableFuture<>();
+ PublishReturn[] savedPublishReturnHolder = new PublishReturn[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ // Save the PublishReturn but do NOT call acquirePubackControl() within the callback
+ savedPublishReturnHolder[0] = publishReturn;
+ callbackDoneFuture.complete(null);
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the callback to complete
+ callbackDoneFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Call acquirePubackControl() after the callback has returned, this should throw IllegalStateException
+ assertNotNull("savedPublishReturn should have been set", savedPublishReturnHolder[0]);
+ boolean exceptionThrown = false;
+ try {
+ savedPublishReturnHolder[0].acquirePubackControl();
+ } catch (IllegalStateException ex) {
+ exceptionThrown = true;
+ }
+ assertTrue("acquirePubackControl() should throw IllegalStateException after callback returns",
+ exceptionThrown);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual PUBACK post-callback test: calling acquirePubackControl() after callback returns raises IllegalStateException */
+ @Test
+ public void ManualPuback_AcquirePostCallbackRaises() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_AcquirePostCallbackRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPuback_Qos0AcquireThrowsTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture resultFuture = new CompletableFuture<>();
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ // For QoS 0, acquirePubackControl() should throw IllegalStateException
+ // because the native layer passes a null context pointer for QoS 0 publishes.
+ try {
+ publishReturn.acquirePubackControl();
+ resultFuture.complete("no_error"); // Should not reach here
+ } catch (IllegalStateException ex) {
+ resultFuture.complete("threw_illegal_state");
+ } catch (Exception ex) {
+ resultFuture.complete("unexpected_error: " + ex.getMessage());
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0)
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish at QoS 0, there is no PUBACK involved
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_MOST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertEquals("acquirePubackControl() should throw IllegalStateException for QoS 0 messages, got: " + result,
+ "threw_illegal_state", result);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual PUBACK QoS 0 test: acquirePubackControl() throws IllegalStateException for QoS 0 messages */
+ @Test
+ public void ManualPuback_Qos0AcquireThrows() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_Qos0AcquireThrowsTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
/**
* ============================================================
* Retain Tests