-
Notifications
You must be signed in to change notification settings - Fork 42
Manual PUBACK Control #970
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cadd516
cc10600
0452994
45286e2
f02d9d2
c02496e
9709faa
23a8573
a164681
13d973e
bcb8ecc
42dfa69
f66b941
f3484fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /** | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * SPDX-License-Identifier: Apache-2.0. | ||
| */ | ||
| package software.amazon.awssdk.crt.mqtt5; | ||
|
|
||
| /** | ||
| * An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet. | ||
| * | ||
| * <p>This class cannot be instantiated directly. Instances are only created by the CRT library.</p> | ||
| */ | ||
| public class Mqtt5PubackControlHandle { | ||
|
|
||
| private final long controlId; | ||
|
|
||
| /** | ||
| * Creates a new Mqtt5PubackControlHandle. Only called from native/JNI code. | ||
| * | ||
| * @param controlId The native puback control ID returned by aws_mqtt5_client_acquire_puback. | ||
| */ | ||
| Mqtt5PubackControlHandle(long controlId) { | ||
| this.controlId = controlId; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the native puback control ID. Used internally by JNI. | ||
| * | ||
| * @return The native puback control ID. | ||
| */ | ||
| long getControlId() { | ||
| return controlId; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,16 @@ | |
| public class PublishReturn { | ||
| private PublishPacket publishPacket; | ||
|
|
||
| /** | ||
| * Single-element long array holding the native manual PUBACK control context pointer. | ||
| * Element [0] is the pointer value, valid only during the | ||
| * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. | ||
| * QoS 0 results in this being set to 0. | ||
| * Native code sets [0] to 0 after the callback returns (via SetLongArrayRegion, | ||
| * requiring no extra JNI method ID). | ||
| */ | ||
| private final long[] nativeContextPtrHolder; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand why this complexity is necessary. The acquire method is synchronized. |
||
|
|
||
| /** | ||
| * Returns the PublishPacket returned from the server or Null if none was returned. | ||
| * @return The PublishPacket returned from the server. | ||
|
|
@@ -22,12 +32,53 @@ public PublishPacket getPublishPacket() { | |
| return publishPacket; | ||
| } | ||
|
|
||
| /** | ||
| * Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the | ||
| * client from automatically sending a PUBACK. The returned handle can be passed to | ||
| * {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the | ||
| * PUBACK to the broker. | ||
| * | ||
| * <p><b>Important:</b> This method must be called within the | ||
| * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the | ||
| * callback returns will throw an {@link IllegalStateException}.</p> | ||
| * | ||
| * <p>This method may only be called once per received PUBLISH. Subsequent calls will throw | ||
| * an {@link IllegalStateException}.</p> | ||
| * | ||
| * <p>If this method is not called, the client will automatically send a PUBACK for QoS 1 | ||
| * messages when the callback returns.</p> | ||
| * | ||
| * @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK. | ||
| * @throws IllegalStateException if called outside the onMessageReceived callback or called more than once. | ||
| */ | ||
| public synchronized Mqtt5PubackControlHandle acquirePubackControl() { | ||
| if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) { | ||
| throw new IllegalStateException( | ||
| "acquirePubackControl() must be called within the onMessageReceived callback and may only be called once."); | ||
| } | ||
| long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]); | ||
| /* We set the array element to 0 so it can't be double-called */ | ||
| nativeContextPtrHolder[0] = 0; | ||
| return new Mqtt5PubackControlHandle(controlId); | ||
| } | ||
|
|
||
| /** | ||
| * This is only called in JNI to make a new PublishReturn with a PUBLISH packet. | ||
| * @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1. | ||
| * @return A newly created PublishResult | ||
| * The nativeContextPtrHolder is a single-element long array; native code sets [0] to 0 | ||
| * after the onMessageReceived callback returns to prevent use-after-free. | ||
| * | ||
| * @param newPublishPacket The PublishPacket data received from the server. | ||
| * @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer. | ||
| */ | ||
| private PublishReturn(PublishPacket newPublishPacket) { | ||
| private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) { | ||
| this.publishPacket = newPublishPacket; | ||
| this.nativeContextPtrHolder = nativeContextPtrHolder; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Calls the native aws_mqtt5_client_acquire_puback function. | ||
| * @param nativeContextPtr Pointer to the native manual PUBACK control context. | ||
| * @return The native puback control ID. | ||
| */ | ||
| private static native long mqtt5AcquirePubackControl(long nativeContextPtr); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,12 @@ struct aws_mqtt5_client_publish_return_data { | |
| jobject jni_publish_future; | ||
| }; | ||
|
|
||
| /* Context for manual PUBACK control. Valid only during the publish received callback. */ | ||
| struct manual_puback_control_context { | ||
| struct aws_mqtt5_client *client; | ||
| const struct aws_mqtt5_packet_publish_view *publish_packet; | ||
| }; | ||
|
|
||
| struct aws_mqtt5_client_subscribe_return_data { | ||
| struct aws_mqtt5_client_java_jni *java_client; | ||
| jobject jni_subscribe_future; | ||
|
|
@@ -538,6 +544,9 @@ static void s_aws_mqtt5_client_java_publish_received( | |
| /* One reference is needed for the PublishReturn */ | ||
| references_needed += 1; | ||
|
|
||
| /* One reference is needed for the long[] context pointer holder array */ | ||
| references_needed += 1; | ||
|
|
||
| /* A Publish packet will need 5 references at minimum */ | ||
| references_needed += 5; | ||
| /* Optionals */ | ||
|
|
@@ -577,12 +586,42 @@ static void s_aws_mqtt5_client_java_publish_received( | |
| goto clean_up; | ||
| } | ||
|
|
||
| /* Make the PublishReturn struct that will hold all of the data that is passed to Java */ | ||
| /* Create manual PUBACK control context (valid only during this callback) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's discuss this tomorrow. I think there are a lot of ways to simplify this. |
||
| * We clean this before clean_up since we don't want to create this early and we can be sure to | ||
| * hit the aws_mem_release AFTER we return from the publish events callback. | ||
| */ | ||
| struct manual_puback_control_context *control_context = | ||
| aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context)); | ||
|
|
||
| /* | ||
| * Create a single-element long[] array to hold the native context pointer. | ||
| * This allows native code to invalidate the pointer after the callback returns | ||
| * using SetLongArrayRegion(array, 0, 1, &zero) without using an extra JNI method ID. | ||
| */ | ||
| jlongArray context_ptr_holder = NULL; | ||
| control_context->client = java_client->client; | ||
| control_context->publish_packet = publish; | ||
|
|
||
| context_ptr_holder = (*env)->NewLongArray(env, 1); | ||
| /* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */ | ||
| if (context_ptr_holder != NULL) { | ||
| /* | ||
| * Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no PUBACK | ||
| * to send, so acquirePubackControl() should throw IllegalStateException. We pass 0 (null). | ||
| */ | ||
| jlong context_ptr_value = (publish->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) ? 0 : (jlong)(uintptr_t)control_context; | ||
| /* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */ | ||
| (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value); | ||
| } | ||
|
|
||
| /* Make the PublishReturn struct that will hold all of the data that is passed to Java. | ||
| * The constructor takes (PublishPacket, long[]) where long[0] is the native context pointer. */ | ||
| publish_packet_return_data = (*env)->NewObject( | ||
| env, | ||
| mqtt5_publish_return_properties.return_class, | ||
| mqtt5_publish_return_properties.return_constructor_id, | ||
| publish_packet_data); | ||
| publish_packet_data, | ||
| context_ptr_holder); | ||
| aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ | ||
|
|
||
| if (java_client->jni_publish_events) { | ||
|
|
@@ -594,6 +633,22 @@ static void s_aws_mqtt5_client_java_publish_received( | |
| publish_packet_return_data); | ||
| aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ | ||
| } | ||
|
|
||
| /* | ||
| * Invalidate the context pointer in the long[] array now that the callback has returned. | ||
| * This prevents use-after-free if acquirePubackControl() is called after the callback. | ||
| * SetLongArrayRegion requires no extra JNI method ID. | ||
| */ | ||
| if (context_ptr_holder != NULL) { | ||
| jlong zero = 0; | ||
| (*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &zero); | ||
| } | ||
|
|
||
| /* Free the context. The publish_packet pointer is no longer valid after this callback returns */ | ||
| if (control_context != NULL) { | ||
| aws_mem_release(aws_jni_get_allocator(), control_context); | ||
| } | ||
|
|
||
| goto clean_up; | ||
|
|
||
| clean_up: | ||
|
|
@@ -2169,6 +2224,78 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl | |
| } | ||
| } | ||
|
|
||
| /******************************************************************************* | ||
| * Manual PUBACK Control Functions | ||
| ******************************************************************************/ | ||
|
|
||
| /** | ||
| * Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr). | ||
| * Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK | ||
| * for the received PUBLISH packet. Returns the puback_control_id as a jlong. | ||
| * | ||
| * This must be called within the onMessageReceived callback while the | ||
| * native context pointer is still valid. | ||
| */ | ||
| JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl( | ||
| JNIEnv *env, | ||
| jclass jni_class, | ||
| jlong native_context_ptr) { | ||
| (void)jni_class; | ||
| aws_cache_jni_ids(env); | ||
|
|
||
| if (native_context_ptr == 0) { | ||
| aws_jni_throw_runtime_exception( | ||
| env, | ||
| "PublishReturn.acquirePubackControl: context is no longer valid. " | ||
| "acquirePubackControl() must be called within the onMessageReceived callback."); | ||
| return 0; | ||
| } | ||
|
|
||
| struct manual_puback_control_context *context = | ||
| (struct manual_puback_control_context *)(uintptr_t)native_context_ptr; | ||
|
|
||
| if (!context->client || !context->publish_packet) { | ||
| aws_jni_throw_runtime_exception( | ||
| env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context"); | ||
| return 0; | ||
| } | ||
|
|
||
| uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); | ||
| return (jlong)control_id; | ||
| } | ||
|
|
||
| /** | ||
| * Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId). | ||
| * Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired | ||
| * manual PUBACK control handle. | ||
| */ | ||
| JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback( | ||
| JNIEnv *env, | ||
| jclass jni_class, | ||
| jlong jni_client, | ||
| jlong control_id) { | ||
| (void)jni_class; | ||
| aws_cache_jni_ids(env); | ||
|
|
||
| struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client; | ||
| if (!java_client) { | ||
| s_aws_mqtt5_client_log_and_throw_exception( | ||
| env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT); | ||
| return; | ||
| } | ||
| if (!java_client->client) { | ||
| s_aws_mqtt5_client_log_and_throw_exception( | ||
| env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT); | ||
| return; | ||
| } | ||
|
|
||
| int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL); | ||
| if (result != AWS_OP_SUCCESS) { | ||
| s_aws_mqtt5_client_log_and_throw_exception( | ||
| env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error()); | ||
| } | ||
| } | ||
|
|
||
| #if UINTPTR_MAX == 0xffffffff | ||
| # if defined(_MSC_VER) | ||
| # pragma warning(pop) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the exception specification?