From cadd51600c9248882137ba1493013b0e82650d1e Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 4 Mar 2026 13:59:45 -0800 Subject: [PATCH 01/12] bind manual puback --- crt/aws-c-mqtt | 2 +- .../amazon/awssdk/crt/mqtt5/Mqtt5Client.java | 15 +++ .../crt/mqtt5/Mqtt5PubackControlHandle.java | 33 +++++ .../awssdk/crt/mqtt5/PublishReturn.java | 58 +++++++- src/native/java_class_ids.c | 9 +- src/native/java_class_ids.h | 2 +- src/native/mqtt5_client.c | 127 +++++++++++++++++- 7 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 1d512d927..37741c07a 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 1d512d92709f60b74e2cafa018e69a2e647f28e9 +Subproject commit 37741c07a23d35a700100a8fa6628127673ed012 diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java index 00a9c85a8..7611458c2 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java @@ -202,6 +202,20 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() { return mqtt5ClientInternalGetOperationStatistics(getNativeHandle()); } + /** + * Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. + * + *

To use manual PUBACK control, call {@link PublishReturn#acquirePubackControl()} within + * the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to + * obtain a {@link Mqtt5PubackControlHandle}. Then call this method to send the PUBACK.

+ * + * @param pubackControlHandle An opaque handle obtained from {@link PublishReturn#acquirePubackControl()}. + * @throws CrtRuntimeException If the native client returns an error when invoking the PUBACK. + */ + public void invokePuback(Mqtt5PubackControlHandle pubackControlHandle) throws CrtRuntimeException { + mqtt5ClientInternalInvokePuback(getNativeHandle(), pubackControlHandle.getControlId()); + } + /** * Returns the connectivity state for the Mqtt5Client. * @return True if the client is connected, false otherwise @@ -277,4 +291,5 @@ private static native long mqtt5ClientNew( private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback); private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException; private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client); + private static native void mqtt5ClientInternalInvokePuback(long client, long controlId); } diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java new file mode 100644 index 000000000..04a77ae8b --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java @@ -0,0 +1,33 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.mqtt5; + +/** + * An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet. + * + *

This class cannot be instantiated directly. Instances are only created by the CRT library.

+ */ +public class Mqtt5PubackControlHandle { + + private final long controlId; + + /** + * Creates a new Mqtt5PubackControlHandle. Only called from native/JNI code. + * + * @param controlId The native puback control ID returned by aws_mqtt5_client_acquire_puback. + */ + Mqtt5PubackControlHandle(long controlId) { + this.controlId = controlId; + } + + /** + * Returns the native puback control ID. Used internally by JNI. + * + * @return The native puback control ID. + */ + long getControlId() { + return controlId; + } +} \ No newline at end of file diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java index 4baf2aa4d..fe0c72b6e 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java @@ -14,6 +14,15 @@ public class PublishReturn { private PublishPacket publishPacket; + /** + * Single-element long array holding the native manual PUBACK control context pointer. + * Element [0] is the pointer value, valid only during the + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. + * Native code sets [0] to 0 after the callback returns (via SetLongArrayRegion, + * requiring no extra JNI method ID). + */ + private final long[] nativeContextPtrHolder; + /** * Returns the PublishPacket returned from the server or Null if none was returned. * @return The PublishPacket returned from the server. @@ -22,12 +31,53 @@ public PublishPacket getPublishPacket() { return publishPacket; } + /** + * Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the + * client from automatically sending a PUBACK. The returned handle can be passed to + * {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the + * PUBACK to the broker. + * + *

Important: This method must be called within the + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the + * callback returns will throw an {@link IllegalStateException}.

+ * + *

This method may only be called once per received PUBLISH. Subsequent calls will throw + * an {@link IllegalStateException}.

+ * + *

If this method is not called, the client will automatically send a PUBACK for QoS 1 + * messages when the callback returns.

+ * + * @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK. + * @throws IllegalStateException if called outside the onMessageReceived callback or called more than once. + */ + public synchronized Mqtt5PubackControlHandle acquirePubackControl() { + if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) { + throw new IllegalStateException( + "acquirePubackControl() must be called within the onMessageReceived callback and may only be called once."); + } + long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]); + /* We set the array element to 0 so it can't be double-called */ + nativeContextPtrHolder[0] = 0; + return new Mqtt5PubackControlHandle(controlId); + } + /** * This is only called in JNI to make a new PublishReturn with a PUBLISH packet. - * @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1. - * @return A newly created PublishResult + * The nativeContextPtrHolder is a single-element long array; native code sets [0] to 0 + * after the onMessageReceived callback returns to prevent use-after-free. + * + * @param newPublishPacket The PublishPacket data received from the server. + * @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer. */ - private PublishReturn(PublishPacket newPublishPacket) { + private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) { this.publishPacket = newPublishPacket; + this.nativeContextPtrHolder = nativeContextPtrHolder; } -} + + /** + * Calls the native aws_mqtt5_client_acquire_puback function. + * @param nativeContextPtr Pointer to the native manual PUBACK control context. + * @return The native puback control ID. + */ + private static native long mqtt5AcquirePubackControl(long nativeContextPtr); +} \ No newline at end of file diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 97bc76282..69c66d8ab 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -2248,12 +2248,17 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) { AWS_FATAL_ASSERT(cls); mqtt5_publish_return_properties.return_class = (*env)->NewGlobalRef(env, cls); AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_class); - // Functions + /* + * Constructor: PublishReturn(PublishPacket, long[]) + * The long[] is a single-element array holding the native context pointer. + * Native code sets [0] to 0 via SetLongArrayRegion after the callback returns, + * requiring no extra JNI method ID. + */ mqtt5_publish_return_properties.return_constructor_id = (*env)->GetMethodID( env, mqtt5_publish_return_properties.return_class, "", - "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V"); + "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;[J)V"); AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id); } diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 3d852b860..4d0358f56 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -926,7 +926,7 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop /* mqtt5.PublishReturn */ struct java_aws_mqtt5_publish_return_properties { jclass return_class; - jmethodID return_constructor_id; + jmethodID return_constructor_id; /* (PublishPacket, long[]) - long[0] holds native context ptr */ }; extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties; diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c index 2d59c8b26..1729e385f 100644 --- a/src/native/mqtt5_client.c +++ b/src/native/mqtt5_client.c @@ -38,6 +38,12 @@ struct aws_mqtt5_client_publish_return_data { jobject jni_publish_future; }; +/* Context for manual PUBACK control. Valid only during the publish received callback. */ +struct manual_puback_control_context { + struct aws_mqtt5_client *client; + const struct aws_mqtt5_packet_publish_view *publish_packet; +}; + struct aws_mqtt5_client_subscribe_return_data { struct aws_mqtt5_client_java_jni *java_client; jobject jni_subscribe_future; @@ -538,6 +544,9 @@ static void s_aws_mqtt5_client_java_publish_received( /* One reference is needed for the PublishReturn */ references_needed += 1; + /* One reference is needed for the long[] context pointer holder array */ + references_needed += 1; + /* A Publish packet will need 5 references at minimum */ references_needed += 5; /* Optionals */ @@ -577,12 +586,38 @@ static void s_aws_mqtt5_client_java_publish_received( goto clean_up; } - /* Make the PublishReturn struct that will hold all of the data that is passed to Java */ + /* Create manual PUBACK control context (valid only during this callback) + * We clean this before clean_up since we don't want to create this early and we can be sure to + * hit the aws_mem_release AFTER we return from the publish events callback. + */ + struct manual_puback_control_context *control_context = + aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context)); + + /* + * Create a single-element long[] array to hold the native context pointer. + * This allows native code to invalidate the pointer after the callback returns + * using SetLongArrayRegion(array, 0, 1, &zero) without using an extra JNI method ID. + */ + jlongArray context_ptr_holder = NULL; + control_context->client = java_client->client; + control_context->publish_packet = publish; + + context_ptr_holder = (*env)->NewLongArray(env, 1); + /* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */ + if (context_ptr_holder != NULL) { + jlong context_ptr_value = (jlong)(uintptr_t)control_context; + /* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */ + (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value); + } + + /* Make the PublishReturn struct that will hold all of the data that is passed to Java. + * The constructor takes (PublishPacket, long[]) where long[0] is the native context pointer. */ publish_packet_return_data = (*env)->NewObject( env, mqtt5_publish_return_properties.return_class, mqtt5_publish_return_properties.return_constructor_id, - publish_packet_data); + publish_packet_data, + context_ptr_holder); aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ if (java_client->jni_publish_events) { @@ -594,6 +629,22 @@ static void s_aws_mqtt5_client_java_publish_received( publish_packet_return_data); aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ } + + /* + * Invalidate the context pointer in the long[] array now that the callback has returned. + * This prevents use-after-free if acquirePubackControl() is called after the callback. + * SetLongArrayRegion requires no extra JNI method ID. + */ + if (context_ptr_holder != NULL) { + jlong zero = 0; + (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &zero); + } + + /* Free the context. The publish_packet pointer is no longer valid after this callback returns */ + if (control_context != NULL) { + aws_mem_release(aws_jni_get_allocator(), control_context); + } + goto clean_up; clean_up: @@ -2169,6 +2220,78 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl } } +/******************************************************************************* + * Manual PUBACK Control Functions + ******************************************************************************/ + +/** + * Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr). + * Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK + * for the received PUBLISH packet. Returns the puback_control_id as a jlong. + * + * This must be called within the onMessageReceived callback while the + * native context pointer is still valid. + */ +JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl( + JNIEnv *env, + jclass jni_class, + jlong native_context_ptr) { + (void)jni_class; + aws_cache_jni_ids(env); + + if (native_context_ptr == 0) { + aws_jni_throw_runtime_exception( + env, + "PublishReturn.acquirePubackControl: context is no longer valid. " + "acquirePubackControl() must be called within the onMessageReceived callback."); + return 0; + } + + struct manual_puback_control_context *context = + (struct manual_puback_control_context *)(uintptr_t)native_context_ptr; + + if (!context->client || !context->publish_packet) { + aws_jni_throw_runtime_exception( + env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context"); + return 0; + } + + uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + return (jlong)control_id; +} + +/** + * Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId). + * Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired + * manual PUBACK control handle. + */ +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback( + JNIEnv *env, + jclass jni_class, + jlong jni_client, + jlong control_id) { + (void)jni_class; + aws_cache_jni_ids(env); + + struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client; + if (!java_client) { + s_aws_mqtt5_client_log_and_throw_exception( + env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT); + return; + } + if (!java_client->client) { + s_aws_mqtt5_client_log_and_throw_exception( + env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT); + return; + } + + int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL); + if (result != AWS_OP_SUCCESS) { + s_aws_mqtt5_client_log_and_throw_exception( + env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error()); + } +} + #if UINTPTR_MAX == 0xffffffff # if defined(_MSC_VER) # pragma warning(pop) From cc1060082a0294c5dbc29ddbf3e25b2648441928 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 4 Mar 2026 14:18:04 -0800 Subject: [PATCH 02/12] GraalVM uses ahead of time compilation requiring the jni config to be used. We changed our internal PublishReturn constructor which required changes --- .../software.amazon.awssdk/crt/aws-crt/jni-config.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 02134b607..7acb15f6f 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -1549,7 +1549,8 @@ { "name": "", "parameterTypes": [ - "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket" + "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket", + "long[]" ] } ] From 45286e29cdfb18d1a098395ffbbc386f1c35c975 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 5 Mar 2026 09:53:23 -0800 Subject: [PATCH 03/12] merge with main --- crt/aws-c-auth | 2 +- crt/aws-c-event-stream | 2 +- crt/aws-c-http | 2 +- crt/aws-lc | 2 +- crt/s2n | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crt/aws-c-auth b/crt/aws-c-auth index 5aefd277c..a4409b95d 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit 5aefd277cfbf049933df13b1ee8439edccbf0964 +Subproject commit a4409b95dad40a45b81e5fc9ff96f41386845e4f diff --git a/crt/aws-c-event-stream b/crt/aws-c-event-stream index c741f95e9..f43a3d24a 160000 --- a/crt/aws-c-event-stream +++ b/crt/aws-c-event-stream @@ -1 +1 @@ -Subproject commit c741f95e9050a1a4bed4b3aa7543bd3e024f6e56 +Subproject commit f43a3d24a7c1f8b50f709ccb4fdf4c7fd2827fff diff --git a/crt/aws-c-http b/crt/aws-c-http index 0d8e1a933..a9745ea99 160000 --- a/crt/aws-c-http +++ b/crt/aws-c-http @@ -1 +1 @@ -Subproject commit 0d8e1a933f46b8af984dfc8168ebcdf32748c184 +Subproject commit a9745ea9998f679cd7456e7d23cc8820e38c97d4 diff --git a/crt/aws-lc b/crt/aws-lc index e50a5f29e..ae0cd62f4 160000 --- a/crt/aws-lc +++ b/crt/aws-lc @@ -1 +1 @@ -Subproject commit e50a5f29ee416a7c99be4e72957e8f96aa51dbb9 +Subproject commit ae0cd62f49db4c32b77cb8ed080013bfb44ed90d diff --git a/crt/s2n b/crt/s2n index f5e5e8303..3276a0876 160000 --- a/crt/s2n +++ b/crt/s2n @@ -1 +1 @@ -Subproject commit f5e5e83031be60691f22442373fb8371274fcd56 +Subproject commit 3276a0876054e9efbeab4a42f34ef60b0bf58c91 From 9709faaab2795323f1c7db149afaebe5c13208b8 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 10:47:28 -0700 Subject: [PATCH 04/12] point to new version of aws-c-mqtt --- crt/aws-c-mqtt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 37741c07a..dc2fe7be8 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 37741c07a23d35a700100a8fa6628127673ed012 +Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b From 23a857369c6271553fbda0c9fea45075c4e673be Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 10:50:20 -0700 Subject: [PATCH 05/12] aws-c-auth v0.10.0 --- crt/aws-c-auth | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-auth b/crt/aws-c-auth index a4409b95d..5aefd277c 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit a4409b95dad40a45b81e5fc9ff96f41386845e4f +Subproject commit 5aefd277cfbf049933df13b1ee8439edccbf0964 From a1646815b99273ac52cb44cc9387b4fe9bf702d4 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 10:53:08 -0700 Subject: [PATCH 06/12] aws-c-event-stream v0.6.0 --- crt/aws-c-event-stream | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-event-stream b/crt/aws-c-event-stream index f43a3d24a..c741f95e9 160000 --- a/crt/aws-c-event-stream +++ b/crt/aws-c-event-stream @@ -1 +1 @@ -Subproject commit f43a3d24a7c1f8b50f709ccb4fdf4c7fd2827fff +Subproject commit c741f95e9050a1a4bed4b3aa7543bd3e024f6e56 From 13d973e08cbfb5e55c7a0865425c2c0dab9c0e4e Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 11:14:01 -0700 Subject: [PATCH 07/12] aws-c-http v0.10.11 --- crt/aws-c-http | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-http b/crt/aws-c-http index a9745ea99..0d8e1a933 160000 --- a/crt/aws-c-http +++ b/crt/aws-c-http @@ -1 +1 @@ -Subproject commit a9745ea9998f679cd7456e7d23cc8820e38c97d4 +Subproject commit 0d8e1a933f46b8af984dfc8168ebcdf32748c184 From bcb8ecccc30c0c5ba8581c2acba34596fb306924 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 11:29:20 -0700 Subject: [PATCH 08/12] aws-lc e50a5f29ee416a7c99be4e72957e8f96aa51dbb9 --- crt/aws-lc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-lc b/crt/aws-lc index ae0cd62f4..e50a5f29e 160000 --- a/crt/aws-lc +++ b/crt/aws-lc @@ -1 +1 @@ -Subproject commit ae0cd62f49db4c32b77cb8ed080013bfb44ed90d +Subproject commit e50a5f29ee416a7c99be4e72957e8f96aa51dbb9 From 42dfa69832e5bf88e3b6af5078d8073df090a945 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 12:57:47 -0700 Subject: [PATCH 09/12] s2n v1.7.0 --- crt/s2n | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/s2n b/crt/s2n index 3276a0876..f5e5e8303 160000 --- a/crt/s2n +++ b/crt/s2n @@ -1 +1 @@ -Subproject commit 3276a0876054e9efbeab4a42f34ef60b0bf58c91 +Subproject commit f5e5e83031be60691f22442373fb8371274fcd56 From f66b9417ea1d10a8db59e125d40446c15133107c Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 13 Mar 2026 11:17:21 -0700 Subject: [PATCH 10/12] add manual puback test suite --- .../awssdk/crt/test/Mqtt5ClientTest.java | 385 ++++++++++++++++++ 1 file changed, 385 insertions(+) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index a847da0fd..8ef939f8c 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -2546,6 +2546,391 @@ public void QoS1_UC1() throws Exception { CrtResource.waitForNoResources(); } + /** + * ============================================================ + * Manual PUBACK Tests + * ============================================================ + */ + + private void doManualPuback_HoldTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + final long PUBACK_HOLD_TIMEOUT_SEC = 60L; + + CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); + CompletableFuture redeliveryFuture = new CompletableFuture<>(); + Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID); + builder.withConnectOptions(connectOptions.build()); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); + if (!firstDeliveryFuture.isDone()) { + // First delivery: acquire manual PUBACK control to hold the PUBACK + pubackHandleHolder[0] = publishReturn.acquirePubackControl(); + firstDeliveryFuture.complete(receivedPayload); + } else if (!redeliveryFuture.isDone()) { + // Second delivery: broker re-sent because no PUBACK was received + redeliveryFuture.complete(receivedPayload); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe to the topic with QoS 1 + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish a QoS 1 message with a unique UUID payload + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the first delivery and confirm PUBACK was held + byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); + assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]); + + // Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent) + byte[] redeliveredPayload = redeliveryFuture.get(PUBACK_HOLD_TIMEOUT_SEC, TimeUnit.SECONDS); + assertTrue("Re-delivered payload should match the original UUID payload", + java.util.Arrays.equals(redeliveredPayload, payload)); + + // Release the held PUBACK now that we've confirmed re-delivery + client.invokePuback(pubackHandleHolder[0]); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual PUBACK hold test: hold PUBACK and verify broker re-delivers the message */ + @Test + public void ManualPuback_Hold() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPuback_InvokeTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + final long NO_REDELIVERY_WAIT_SEC = 60L; + + CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); + CompletableFuture unexpectedRedeliveryFuture = new CompletableFuture<>(); + Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID); + builder.withConnectOptions(connectOptions.build()); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); + if (!firstDeliveryFuture.isDone()) { + // First delivery: acquire manual PUBACK control, then immediately invoke it + pubackHandleHolder[0] = publishReturn.acquirePubackControl(); + firstDeliveryFuture.complete(receivedPayload); + } else if (java.util.Arrays.equals(receivedPayload, payload) && !unexpectedRedeliveryFuture.isDone()) { + // A second delivery of the same payload means the broker re-sent. This should NOT happen + unexpectedRedeliveryFuture.complete(receivedPayload); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe to the topic with QoS 1 + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish a QoS 1 message with a unique UUID payload + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the first delivery and confirm PUBACK handle was acquired + byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); + assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]); + + // Immediately invoke the PUBACK using the acquired handle + client.invokePuback(pubackHandleHolder[0]); + + // Wait 60 seconds and confirm the broker does NOT re-deliver the message + boolean redelivered = false; + try { + unexpectedRedeliveryFuture.get(NO_REDELIVERY_WAIT_SEC, TimeUnit.SECONDS); + redelivered = true; + } catch (java.util.concurrent.TimeoutException ex) { + redelivered = false; + } + assertTrue("Broker should NOT re-deliver the message after invokePuback() was called", + !redelivered); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual PUBACK invoke test: acquire and immediately invoke PUBACK, verify no re-delivery */ + @Test + public void ManualPuback_Invoke() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPuback_AcquireDoubleCallRaisesTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture resultFuture = new CompletableFuture<>(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + try { + // First call should succeed + Mqtt5PubackControlHandle handle = publishReturn.acquirePubackControl(); + // Second call on the same message should throw IllegalStateException + try { + publishReturn.acquirePubackControl(); + resultFuture.complete("no_error"); // Should not reach here + } catch (IllegalStateException ex) { + resultFuture.complete("double_call_raised"); + } catch (Exception ex) { + resultFuture.complete("unexpected_error: " + ex.getMessage()); + } + // handle is valid but we don't invoke it here; let it be GC'd + } catch (Exception ex) { + resultFuture.complete("first_call_failed: " + ex.getMessage()); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertEquals("Expected IllegalStateException on double-call, got: " + result, + "double_call_raised", result); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual PUBACK double-call test: calling acquirePubackControl() twice raises IllegalStateException */ + @Test + public void ManualPuback_AcquireDoubleCallRaises() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_AcquireDoubleCallRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPuback_AcquirePostCallbackRaisesTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture callbackDoneFuture = new CompletableFuture<>(); + PublishReturn[] savedPublishReturnHolder = new PublishReturn[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + // Save the PublishReturn but do NOT call acquirePubackControl() within the callback + savedPublishReturnHolder[0] = publishReturn; + callbackDoneFuture.complete(null); + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the callback to complete + callbackDoneFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Call acquirePubackControl() after the callback has returned, this should throw IllegalStateException + assertNotNull("savedPublishReturn should have been set", savedPublishReturnHolder[0]); + boolean exceptionThrown = false; + try { + savedPublishReturnHolder[0].acquirePubackControl(); + } catch (IllegalStateException ex) { + exceptionThrown = true; + } + assertTrue("acquirePubackControl() should throw IllegalStateException after callback returns", + exceptionThrown); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual PUBACK post-callback test: calling acquirePubackControl() after callback returns raises IllegalStateException */ + @Test + public void ManualPuback_AcquirePostCallbackRaises() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_AcquirePostCallbackRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPuback_Qos0AcquireThrowsTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture resultFuture = new CompletableFuture<>(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + // For QoS 0, acquirePubackControl() should throw IllegalStateException + // because there is no PUBACK context (nativeContextPtrHolder is null or 0) + try { + publishReturn.acquirePubackControl(); + resultFuture.complete("no_error"); // Should not reach here + } catch (IllegalStateException ex) { + resultFuture.complete("threw_illegal_state"); + } catch (Exception ex) { + resultFuture.complete("unexpected_error: " + ex.getMessage()); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0) + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish at QoS 0, no PUBACK involved + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_MOST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertEquals("acquirePubackControl() should throw IllegalStateException for QoS 0 messages, got: " + result, + "threw_illegal_state", result); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual PUBACK QoS 0 test: acquirePubackControl() throws IllegalStateException for QoS 0 messages */ + @Test + public void ManualPuback_Qos0AcquireThrows() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_Qos0AcquireThrowsTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + /** * ============================================================ * Retain Tests From f3484fe160e095ae989431340fe8741a32be5169 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 13 Mar 2026 11:47:32 -0700 Subject: [PATCH 11/12] forgot to set the acquire puback to 0 for qos 0 publishes --- .../software/amazon/awssdk/crt/mqtt5/PublishReturn.java | 1 + src/native/mqtt5_client.c | 6 +++++- .../software/amazon/awssdk/crt/test/Mqtt5ClientTest.java | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java index fe0c72b6e..e7ef07f1f 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java @@ -18,6 +18,7 @@ public class PublishReturn { * Single-element long array holding the native manual PUBACK control context pointer. * Element [0] is the pointer value, valid only during the * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. + * QoS 0 results in this being set to 0. * Native code sets [0] to 0 after the callback returns (via SetLongArrayRegion, * requiring no extra JNI method ID). */ diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c index 1729e385f..6ac9cd6c8 100644 --- a/src/native/mqtt5_client.c +++ b/src/native/mqtt5_client.c @@ -605,7 +605,11 @@ static void s_aws_mqtt5_client_java_publish_received( context_ptr_holder = (*env)->NewLongArray(env, 1); /* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */ if (context_ptr_holder != NULL) { - jlong context_ptr_value = (jlong)(uintptr_t)control_context; + /* + * Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no PUBACK + * to send, so acquirePubackControl() should throw IllegalStateException. We pass 0 (null). + */ + jlong context_ptr_value = (publish->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) ? 0 : (jlong)(uintptr_t)control_context; /* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */ (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value); } diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index 8ef939f8c..1ed596bb5 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -2884,7 +2884,7 @@ private void doManualPuback_Qos0AcquireThrowsTest() { @Override public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { // For QoS 0, acquirePubackControl() should throw IllegalStateException - // because there is no PUBACK context (nativeContextPtrHolder is null or 0) + // because the native layer passes a null context pointer for QoS 0 publishes. try { publishReturn.acquirePubackControl(); resultFuture.complete("no_error"); // Should not reach here @@ -2904,7 +2904,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Publish at QoS 0, no PUBACK involved + // Publish at QoS 0, there is no PUBACK involved PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_MOST_ONCE, payload); client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); From 48752500bee5842a58ae4b679bc6c3c2057f42df Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 18 Mar 2026 16:47:13 -0700 Subject: [PATCH 12/12] puback to publish acknowledgement. aws-c-mqtt -> v0.15.2 --- crt/aws-c-mqtt | 2 +- .../amazon/awssdk/crt/mqtt5/Mqtt5Client.java | 21 ++--- .../crt/mqtt5/Mqtt5PubackControlHandle.java | 33 -------- ...t5PublishAcknowledgementControlHandle.java | 35 +++++++++ .../awssdk/crt/mqtt5/PublishReturn.java | 36 +++++---- src/native/mqtt5_client.c | 72 +++++++++-------- .../awssdk/crt/test/Mqtt5ClientTest.java | 78 +++++++++---------- 7 files changed, 145 insertions(+), 132 deletions(-) delete mode 100644 src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java create mode 100644 src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index dc2fe7be8..3c2ceee52 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b +Subproject commit 3c2ceee52b66db42228053a4fb55210c8f8433a0 diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java index 7611458c2..49b5adc6d 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java @@ -203,17 +203,20 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() { } /** - * Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. + * Sends a publish acknowledgement for a QoS 1 PUBLISH that was previously acquired + * for manual control. * - *

To use manual PUBACK control, call {@link PublishReturn#acquirePubackControl()} within - * the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to - * obtain a {@link Mqtt5PubackControlHandle}. Then call this method to send the PUBACK.

+ *

To use manual publish acknowledgement control, call + * {@link PublishReturn#acquirePublishAcknowledgementControl()} within the + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a + * {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the PUBACK.

* - * @param pubackControlHandle An opaque handle obtained from {@link PublishReturn#acquirePubackControl()}. - * @throws CrtRuntimeException If the native client returns an error when invoking the PUBACK. + * @param publishAcknowledgementControlHandle An opaque handle obtained from + * {@link PublishReturn#acquirePublishAcknowledgementControl()}. + * @throws CrtRuntimeException If the native client returns an error when invoking the publish acknowledgement. */ - public void invokePuback(Mqtt5PubackControlHandle pubackControlHandle) throws CrtRuntimeException { - mqtt5ClientInternalInvokePuback(getNativeHandle(), pubackControlHandle.getControlId()); + public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) throws CrtRuntimeException { + mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId()); } /** @@ -291,5 +294,5 @@ private static native long mqtt5ClientNew( private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback); private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException; private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client); - private static native void mqtt5ClientInternalInvokePuback(long client, long controlId); + private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId); } diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java deleted file mode 100644 index 04a77ae8b..000000000 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -package software.amazon.awssdk.crt.mqtt5; - -/** - * An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet. - * - *

This class cannot be instantiated directly. Instances are only created by the CRT library.

- */ -public class Mqtt5PubackControlHandle { - - private final long controlId; - - /** - * Creates a new Mqtt5PubackControlHandle. Only called from native/JNI code. - * - * @param controlId The native puback control ID returned by aws_mqtt5_client_acquire_puback. - */ - Mqtt5PubackControlHandle(long controlId) { - this.controlId = controlId; - } - - /** - * Returns the native puback control ID. Used internally by JNI. - * - * @return The native puback control ID. - */ - long getControlId() { - return controlId; - } -} \ No newline at end of file diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java new file mode 100644 index 000000000..7b3333c9b --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java @@ -0,0 +1,35 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.mqtt5; + +/** + * An opaque handle representing manual control over a publish acknowledgement for a received + * PUBLISH packet. + * + *

This class cannot be instantiated directly. Instances are only created by the CRT library.

+ */ +public class Mqtt5PublishAcknowledgementControlHandle { + + private final long controlId; + + /** + * Creates a new Mqtt5PublishAcknowledgementControlHandle. Only called from native/JNI code. + * + * @param controlId The native publish acknowledgement control ID returned by + * aws_mqtt5_client_acquire_publish_acknowledgement. + */ + Mqtt5PublishAcknowledgementControlHandle(long controlId) { + this.controlId = controlId; + } + + /** + * Returns the native publish acknowledgement control ID. Used internally by JNI. + * + * @return The native publish acknowledgement control ID. + */ + long getControlId() { + return controlId; + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java index e7ef07f1f..5d7036f15 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java @@ -15,7 +15,7 @@ public class PublishReturn { private PublishPacket publishPacket; /** - * Single-element long array holding the native manual PUBACK control context pointer. + * Single-element long array holding the native manual publish acknowledgement control context pointer. * Element [0] is the pointer value, valid only during the * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. * QoS 0 results in this being set to 0. @@ -33,10 +33,10 @@ public PublishPacket getPublishPacket() { } /** - * Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the - * client from automatically sending a PUBACK. The returned handle can be passed to - * {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the - * PUBACK to the broker. + * Acquires manual control over the publish acknowledgement (PUBACK) for this PUBLISH message, + * preventing the client from automatically sending a acknowledgement. The returned handle can be passed to + * {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} at a later + * time to send the PUBACK to the broker. * *

Important: This method must be called within the * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the @@ -48,18 +48,19 @@ public PublishPacket getPublishPacket() { *

If this method is not called, the client will automatically send a PUBACK for QoS 1 * messages when the callback returns.

* - * @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK. - * @throws IllegalStateException if called outside the onMessageReceived callback or called more than once. + * @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send the acknowledgement. + * @throws IllegalStateException if called outside the onMessageReceived callback, called more than once, + * or called on a QoS 0 message. */ - public synchronized Mqtt5PubackControlHandle acquirePubackControl() { + public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() { if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) { throw new IllegalStateException( - "acquirePubackControl() must be called within the onMessageReceived callback and may only be called once."); + "acquirePublishAcknowledgementControl() must be called within the onMessageReceived callback and may only be called once."); } - long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]); + long controlId = mqtt5AcquirePublishAcknowledgementControl(nativeContextPtrHolder[0]); /* We set the array element to 0 so it can't be double-called */ nativeContextPtrHolder[0] = 0; - return new Mqtt5PubackControlHandle(controlId); + return new Mqtt5PublishAcknowledgementControlHandle(controlId); } /** @@ -68,7 +69,8 @@ public synchronized Mqtt5PubackControlHandle acquirePubackControl() { * after the onMessageReceived callback returns to prevent use-after-free. * * @param newPublishPacket The PublishPacket data received from the server. - * @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer. + * @param nativeContextPtrHolder Single-element long[] holding the native publish acknowledgement control + * context pointer. */ private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) { this.publishPacket = newPublishPacket; @@ -76,9 +78,9 @@ private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHol } /** - * Calls the native aws_mqtt5_client_acquire_puback function. - * @param nativeContextPtr Pointer to the native manual PUBACK control context. - * @return The native puback control ID. + * Calls the native aws_mqtt5_client_acquire_publish_acknowledgement function. + * @param nativeContextPtr Pointer to the native manual publish acknowledgement control context. + * @return The native publish acknowledgement control ID. */ - private static native long mqtt5AcquirePubackControl(long nativeContextPtr); -} \ No newline at end of file + private static native long mqtt5AcquirePublishAcknowledgementControl(long nativeContextPtr); +} diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c index 6ac9cd6c8..856dba493 100644 --- a/src/native/mqtt5_client.c +++ b/src/native/mqtt5_client.c @@ -38,8 +38,8 @@ struct aws_mqtt5_client_publish_return_data { jobject jni_publish_future; }; -/* Context for manual PUBACK control. Valid only during the publish received callback. */ -struct manual_puback_control_context { +/* Context for manual publish acknowledgement control. Valid only during the publish received callback. */ +struct manual_publish_acknowledgement_control_context { struct aws_mqtt5_client *client; const struct aws_mqtt5_packet_publish_view *publish_packet; }; @@ -586,12 +586,12 @@ static void s_aws_mqtt5_client_java_publish_received( goto clean_up; } - /* Create manual PUBACK control context (valid only during this callback) + /* Create manual publish acknowledgement control context (valid only during this callback). * We clean this before clean_up since we don't want to create this early and we can be sure to * hit the aws_mem_release AFTER we return from the publish events callback. */ - struct manual_puback_control_context *control_context = - aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context)); + struct manual_publish_acknowledgement_control_context *control_context = + aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_publish_acknowledgement_control_context)); /* * Create a single-element long[] array to hold the native context pointer. @@ -603,14 +603,15 @@ static void s_aws_mqtt5_client_java_publish_received( control_context->publish_packet = publish; context_ptr_holder = (*env)->NewLongArray(env, 1); - /* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */ + /* If allocation failed, Java will throw IllegalStateException on acquirePublishAcknowledgementControl(). */ if (context_ptr_holder != NULL) { /* - * Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no PUBACK - * to send, so acquirePubackControl() should throw IllegalStateException. We pass 0 (null). + * Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no publish + * acknowledgement to send, so acquirePublishAcknowledgementControl() should throw + * IllegalStateException. We pass 0 (null). */ jlong context_ptr_value = (publish->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) ? 0 : (jlong)(uintptr_t)control_context; - /* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */ + /* Assigning the pointer to the allocated manual_publish_acknowledgement_control_context to the array */ (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value); } @@ -636,7 +637,7 @@ static void s_aws_mqtt5_client_java_publish_received( /* * Invalidate the context pointer in the long[] array now that the callback has returned. - * This prevents use-after-free if acquirePubackControl() is called after the callback. + * This prevents use-after-free if acquirePublishAcknowledgementControl() is called after the callback. * SetLongArrayRegion requires no extra JNI method ID. */ if (context_ptr_holder != NULL) { @@ -2225,18 +2226,18 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl } /******************************************************************************* - * Manual PUBACK Control Functions + * Manual Publish Acknowledgement Control Functions ******************************************************************************/ /** - * Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr). - * Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK - * for the received PUBLISH packet. Returns the puback_control_id as a jlong. + * Called from PublishReturn.mqtt5AcquirePublishAcknowledgementControl(long nativeContextPtr). + * Calls aws_mqtt5_client_acquire_publish_acknowledgement to take manual control of the publish + * acknowledgement (PUBACK) for the received PUBLISH packet. Returns the pub_ack_control_id as a jlong. * * This must be called within the onMessageReceived callback while the * native context pointer is still valid. */ -JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl( +JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePublishAcknowledgementControl( JNIEnv *env, jclass jni_class, jlong native_context_ptr) { @@ -2246,53 +2247,58 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt if (native_context_ptr == 0) { aws_jni_throw_runtime_exception( env, - "PublishReturn.acquirePubackControl: context is no longer valid. " - "acquirePubackControl() must be called within the onMessageReceived callback."); + "PublishReturn.acquirePublishAcknowledgementControl: context is no longer valid. " + "acquirePublishAcknowledgementControl() must be called within the onMessageReceived callback."); return 0; } - struct manual_puback_control_context *context = - (struct manual_puback_control_context *)(uintptr_t)native_context_ptr; + struct manual_publish_acknowledgement_control_context *context = + (struct manual_publish_acknowledgement_control_context *)(uintptr_t)native_context_ptr; if (!context->client || !context->publish_packet) { aws_jni_throw_runtime_exception( - env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context"); + env, + "PublishReturn.acquirePublishAcknowledgementControl: invalid native publish acknowledgement control " + "context"); return 0; } - uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + uint64_t control_id = aws_mqtt5_client_acquire_publish_acknowledgement(context->client, context->publish_packet); return (jlong)control_id; } /** - * Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId). - * Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired - * manual PUBACK control handle. + * Called from Mqtt5Client.mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId). + * Calls aws_mqtt5_client_invoke_publish_acknowledgement to send the publish acknowledgement (PUBACK) + * for a previously acquired manual publish acknowledgement control handle. */ -JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback( - JNIEnv *env, - jclass jni_class, - jlong jni_client, - jlong control_id) { +JNIEXPORT void JNICALL + Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePublishAcknowledgement( + JNIEnv *env, + jclass jni_class, + jlong jni_client, + jlong control_id) { (void)jni_class; aws_cache_jni_ids(env); struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client; if (!java_client) { s_aws_mqtt5_client_log_and_throw_exception( - env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT); + env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT); return; } if (!java_client->client) { s_aws_mqtt5_client_log_and_throw_exception( - env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT); + env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT); return; } - int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL); + int result = aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, (uint64_t)control_id, NULL); if (result != AWS_OP_SUCCESS) { s_aws_mqtt5_client_log_and_throw_exception( - env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error()); + env, + "Mqtt5Client.invokePublishAcknowledgement: aws_mqtt5_client_invoke_publish_acknowledgement failed!", + aws_last_error()); } } diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index 1ed596bb5..4b9807467 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -2548,11 +2548,11 @@ public void QoS1_UC1() throws Exception { /** * ============================================================ - * Manual PUBACK Tests + * Manual Publish Acknowledgement Tests * ============================================================ */ - private void doManualPuback_HoldTest() { + private void doManualPublishAcknowledgement_HoldTest() { try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); TlsContext tlsContext = new TlsContext(tlsOptions)) { @@ -2565,7 +2565,7 @@ private void doManualPuback_HoldTest() { CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); CompletableFuture redeliveryFuture = new CompletableFuture<>(); - Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1]; + Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1]; Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); LifecycleEvents_Futured events = new LifecycleEvents_Futured(); @@ -2573,7 +2573,7 @@ private void doManualPuback_HoldTest() { builder.withTlsContext(tlsContext); ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); - connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID); + connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID); builder.withConnectOptions(connectOptions.build()); builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { @@ -2581,8 +2581,8 @@ private void doManualPuback_HoldTest() { public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); if (!firstDeliveryFuture.isDone()) { - // First delivery: acquire manual PUBACK control to hold the PUBACK - pubackHandleHolder[0] = publishReturn.acquirePubackControl(); + // First delivery: acquire manual publish acknowledgement control to hold the PUBACK + publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl(); firstDeliveryFuture.complete(receivedPayload); } else if (!redeliveryFuture.isDone()) { // Second delivery: broker re-sent because no PUBACK was received @@ -2606,7 +2606,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { // Wait for the first delivery and confirm PUBACK was held byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); - assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]); + assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]); // Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent) byte[] redeliveredPayload = redeliveryFuture.get(PUBACK_HOLD_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -2614,7 +2614,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { java.util.Arrays.equals(redeliveredPayload, payload)); // Release the held PUBACK now that we've confirmed re-delivery - client.invokePuback(pubackHandleHolder[0]); + client.invokePublishAcknowledgement(publishAckHandleHolder[0]); client.stop(); events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); @@ -2624,31 +2624,31 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } } - /* Manual PUBACK hold test: hold PUBACK and verify broker re-delivers the message */ + /* Manual publish acknowledgement hold test: hold PUBACK and verify broker re-delivers the message */ @Test public void ManualPuback_Hold() throws Exception { skipIfNetworkUnavailable(); Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); - TestUtils.doRetryableTest(this::doManualPuback_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); CrtResource.waitForNoResources(); } - private void doManualPuback_InvokeTest() { + private void doManualPublishAcknowledgement_InvokeTest() { try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); TlsContext tlsContext = new TlsContext(tlsOptions)) { String testUUID = UUID.randomUUID().toString(); - String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID; + String testTopic = "test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID; byte[] payload = testUUID.getBytes(); final long NO_REDELIVERY_WAIT_SEC = 60L; CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); CompletableFuture unexpectedRedeliveryFuture = new CompletableFuture<>(); - Mqtt5PubackControlHandle[] pubackHandleHolder = new Mqtt5PubackControlHandle[1]; + Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1]; Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); LifecycleEvents_Futured events = new LifecycleEvents_Futured(); @@ -2656,7 +2656,7 @@ private void doManualPuback_InvokeTest() { builder.withTlsContext(tlsContext); ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); - connectOptions.withClientId("test/MQTT5_ManualPuback_Java_" + testUUID); + connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID); builder.withConnectOptions(connectOptions.build()); builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { @@ -2664,8 +2664,8 @@ private void doManualPuback_InvokeTest() { public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); if (!firstDeliveryFuture.isDone()) { - // First delivery: acquire manual PUBACK control, then immediately invoke it - pubackHandleHolder[0] = publishReturn.acquirePubackControl(); + // First delivery: acquire manual publish acknowledgement control, then immediately invoke it + publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl(); firstDeliveryFuture.complete(receivedPayload); } else if (java.util.Arrays.equals(receivedPayload, payload) && !unexpectedRedeliveryFuture.isDone()) { // A second delivery of the same payload means the broker re-sent. This should NOT happen @@ -2686,13 +2686,13 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Wait for the first delivery and confirm PUBACK handle was acquired + // Wait for the first delivery and confirm publish acknowledgement handle was acquired byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); - assertNotNull("acquirePubackControl() should have returned a handle", pubackHandleHolder[0]); + assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]); - // Immediately invoke the PUBACK using the acquired handle - client.invokePuback(pubackHandleHolder[0]); + // Immediately invoke the publish acknowledgement using the acquired handle + client.invokePublishAcknowledgement(publishAckHandleHolder[0]); // Wait 60 seconds and confirm the broker does NOT re-deliver the message boolean redelivered = false; @@ -2702,7 +2702,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } catch (java.util.concurrent.TimeoutException ex) { redelivered = false; } - assertTrue("Broker should NOT re-deliver the message after invokePuback() was called", + assertTrue("Broker should NOT re-deliver the message after invokePublishAcknowledgement() was called", !redelivered); client.stop(); @@ -2713,18 +2713,18 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } } - /* Manual PUBACK invoke test: acquire and immediately invoke PUBACK, verify no re-delivery */ + /* Manual publish acknowledgement invoke test: acquire and immediately invoke, verify no re-delivery */ @Test public void ManualPuback_Invoke() throws Exception { skipIfNetworkUnavailable(); Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); - TestUtils.doRetryableTest(this::doManualPuback_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); CrtResource.waitForNoResources(); } - private void doManualPuback_AcquireDoubleCallRaisesTest() { + private void doManualPublishAcknowledgement_AcquireDoubleCallRaisesTest() { try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); TlsContext tlsContext = new TlsContext(tlsOptions)) { @@ -2745,10 +2745,10 @@ private void doManualPuback_AcquireDoubleCallRaisesTest() { public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { try { // First call should succeed - Mqtt5PubackControlHandle handle = publishReturn.acquirePubackControl(); + Mqtt5PublishAcknowledgementControlHandle handle = publishReturn.acquirePublishAcknowledgementControl(); // Second call on the same message should throw IllegalStateException try { - publishReturn.acquirePubackControl(); + publishReturn.acquirePublishAcknowledgementControl(); resultFuture.complete("no_error"); // Should not reach here } catch (IllegalStateException ex) { resultFuture.complete("double_call_raised"); @@ -2784,18 +2784,18 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } } - /* Manual PUBACK double-call test: calling acquirePubackControl() twice raises IllegalStateException */ + /* Manual publish acknowledgement double-call test: calling acquirePublishAcknowledgementControl() twice raises IllegalStateException */ @Test public void ManualPuback_AcquireDoubleCallRaises() throws Exception { skipIfNetworkUnavailable(); Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); - TestUtils.doRetryableTest(this::doManualPuback_AcquireDoubleCallRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquireDoubleCallRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); CrtResource.waitForNoResources(); } - private void doManualPuback_AcquirePostCallbackRaisesTest() { + private void doManualPublishAcknowledgement_AcquirePostCallbackRaisesTest() { try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); TlsContext tlsContext = new TlsContext(tlsOptions)) { @@ -2815,7 +2815,7 @@ private void doManualPuback_AcquirePostCallbackRaisesTest() { builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { @Override public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { - // Save the PublishReturn but do NOT call acquirePubackControl() within the callback + // Save the PublishReturn but do NOT call acquirePublishAcknowledgementControl() within the callback savedPublishReturnHolder[0] = publishReturn; callbackDoneFuture.complete(null); } @@ -2834,15 +2834,15 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { // Wait for the callback to complete callbackDoneFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Call acquirePubackControl() after the callback has returned, this should throw IllegalStateException + // Call acquirePublishAcknowledgementControl() after the callback has returned, this should throw IllegalStateException assertNotNull("savedPublishReturn should have been set", savedPublishReturnHolder[0]); boolean exceptionThrown = false; try { - savedPublishReturnHolder[0].acquirePubackControl(); + savedPublishReturnHolder[0].acquirePublishAcknowledgementControl(); } catch (IllegalStateException ex) { exceptionThrown = true; } - assertTrue("acquirePubackControl() should throw IllegalStateException after callback returns", + assertTrue("acquirePublishAcknowledgementControl() should throw IllegalStateException after callback returns", exceptionThrown); client.stop(); @@ -2853,13 +2853,13 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } } - /* Manual PUBACK post-callback test: calling acquirePubackControl() after callback returns raises IllegalStateException */ + /* Manual publish acknowledgement post-callback test: calling acquirePublishAcknowledgementControl() after callback returns raises IllegalStateException */ @Test public void ManualPuback_AcquirePostCallbackRaises() throws Exception { skipIfNetworkUnavailable(); Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); - TestUtils.doRetryableTest(this::doManualPuback_AcquirePostCallbackRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquirePostCallbackRaisesTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); CrtResource.waitForNoResources(); } @@ -2883,10 +2883,10 @@ private void doManualPuback_Qos0AcquireThrowsTest() { builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { @Override public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { - // For QoS 0, acquirePubackControl() should throw IllegalStateException + // For QoS 0, acquirePublishAcknowledgementControl() should throw IllegalStateException // because the native layer passes a null context pointer for QoS 0 publishes. try { - publishReturn.acquirePubackControl(); + publishReturn.acquirePublishAcknowledgementControl(); resultFuture.complete("no_error"); // Should not reach here } catch (IllegalStateException ex) { resultFuture.complete("threw_illegal_state"); @@ -2909,7 +2909,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - assertEquals("acquirePubackControl() should throw IllegalStateException for QoS 0 messages, got: " + result, + assertEquals("acquirePublishAcknowledgementControl() should throw IllegalStateException for QoS 0 messages, got: " + result, "threw_illegal_state", result); client.stop(); @@ -2920,7 +2920,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { } } - /* Manual PUBACK QoS 0 test: acquirePubackControl() throws IllegalStateException for QoS 0 messages */ + /* Manual publish acknowledgement QoS 0 test: acquirePublishAcknowledgementControl() throws IllegalStateException for QoS 0 messages */ @Test public void ManualPuback_Qos0AcquireThrows() throws Exception { skipIfNetworkUnavailable();