Skip to content

Commit 59460d9

Browse files
committed
addressed copilot pr comments
1 parent ab50230 commit 59460d9

5 files changed

Lines changed: 108 additions & 32 deletions

File tree

.github/workflows/build-validation.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,15 @@ jobs:
122122
continue-on-error: true
123123

124124
- name: Kill Durable Task Emulator
125+
if: always()
125126
run: docker kill durabletask-emulator
126127

127128
- name: Kill Azurite
129+
if: always()
128130
run: docker kill azurite
129131

130132
- name: Upload Durable Task Emulator Logs
133+
if: always()
131134
uses: actions/upload-artifact@v4
132135
with:
133136
name: Durable Task Emulator Logs

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import java.io.InputStream;
2121
import java.nio.charset.StandardCharsets;
2222
import java.util.UUID;
23+
import java.util.concurrent.CountDownLatch;
2324
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicReference;
2426
import java.util.regex.Pattern;
2527
import java.util.zip.GZIPInputStream;
2628
import java.util.zip.GZIPOutputStream;
@@ -45,7 +47,13 @@ public final class BlobPayloadStore extends PayloadStore {
4547

4648
private final BlobContainerClient containerClient;
4749
private final LargePayloadStorageOptions options;
48-
private final AtomicBoolean containerVerified = new AtomicBoolean(false);
50+
51+
// Container-creation guard. The first thread to call ensureContainerExists() creates the
52+
// latch and performs the RPC. Concurrent callers await the latch so they don't race ahead
53+
// and upload to a container that hasn't been created yet. On failure the reference is
54+
// reset to null so a subsequent call can retry.
55+
private final AtomicReference<CountDownLatch> containerLatch = new AtomicReference<>();
56+
private volatile boolean containerVerified;
4957

5058
/**
5159
* Creates a new {@code BlobPayloadStore} from the given options.
@@ -115,25 +123,9 @@ public String upload(String payload) {
115123

116124
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
117125

118-
// Ensure container exists (idempotent) — skip after first successful check.
119-
// compareAndSet lets only one concurrent caller perform the RPC; others skip.
120-
// On failure we reset the flag so a later call can retry.
121-
if (this.containerVerified.compareAndSet(false, true)) {
122-
try {
123-
this.containerClient.createIfNotExists();
124-
} catch (BlobStorageException e) {
125-
// 409 Conflict means it already exists — safe to ignore, leave flag set.
126-
if (e.getStatusCode() != 409) {
127-
this.containerVerified.set(false); // allow a future upload to retry
128-
throw new PayloadStorageException(
129-
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
130-
}
131-
} catch (RuntimeException e) {
132-
// Any other transport/SDK failure: also allow retry on next upload.
133-
this.containerVerified.set(false);
134-
throw e;
135-
}
136-
}
126+
// Ensure container exists before uploading. Thread-safe: the first caller creates
127+
// the container while concurrent callers wait for it to complete.
128+
ensureContainerExists();
137129

138130
try {
139131
// Defense-in-depth: require the blob to not already exist (If-None-Match: *).
@@ -193,6 +185,54 @@ public String upload(String payload) {
193185
return encodeToken(this.containerClient.getBlobContainerName(), blobName);
194186
}
195187

188+
/**
189+
* Ensures the blob container exists, creating it if necessary. Thread-safe: the first
190+
* caller performs the RPC while concurrent callers wait for it to complete. On success
191+
* the check is skipped on all future calls. On failure the guard is reset so a later
192+
* call can retry.
193+
*/
194+
private void ensureContainerExists() {
195+
if (this.containerVerified) {
196+
return;
197+
}
198+
199+
CountDownLatch latch = new CountDownLatch(1);
200+
CountDownLatch existing = this.containerLatch.compareAndExchange(null, latch);
201+
if (existing != null) {
202+
// Another thread is already creating the container — wait for it.
203+
try {
204+
existing.await();
205+
} catch (InterruptedException e) {
206+
Thread.currentThread().interrupt();
207+
throw new PayloadStorageException("Interrupted while waiting for container creation.", e);
208+
}
209+
// If the creating thread failed, containerVerified is still false; the next
210+
// upload attempt will retry. For now, return and let the upload proceed
211+
// (it will fail fast with a clear error if the container doesn't exist).
212+
return;
213+
}
214+
215+
// This thread is responsible for creating the container.
216+
try {
217+
this.containerClient.createIfNotExists();
218+
this.containerVerified = true;
219+
} catch (BlobStorageException e) {
220+
if (e.getStatusCode() == 409) {
221+
// 409 Conflict means it already exists — safe to ignore.
222+
this.containerVerified = true;
223+
} else {
224+
this.containerLatch.set(null); // allow a future call to retry
225+
throw new PayloadStorageException(
226+
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
227+
}
228+
} catch (RuntimeException e) {
229+
this.containerLatch.set(null); // allow a future call to retry
230+
throw e;
231+
} finally {
232+
latch.countDown(); // unblock waiting threads
233+
}
234+
}
235+
196236
@Override
197237
public String download(String token) {
198238
String[] decoded = decodeToken(token);

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadWorkerExtensions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public static DurableTaskGrpcWorkerBuilder useExternalizedPayloads(
3636
PayloadStore store = new BlobPayloadStore(options);
3737
builder.addInterceptor(new LargePayloadInterceptor(store, options));
3838
builder.setSupportsLargePayloads(true);
39+
builder.setLargePayloadThresholdBytes(options.getThresholdBytes());
3940
return builder;
4041
}
4142

@@ -59,6 +60,7 @@ public static DurableTaskGrpcWorkerBuilder useExternalizedPayloads(
5960
Objects.requireNonNull(options, "options must not be null");
6061
builder.addInterceptor(new LargePayloadInterceptor(store, options));
6162
builder.setSupportsLargePayloads(true);
63+
builder.setLargePayloadThresholdBytes(options.getThresholdBytes());
6264
return builder;
6365
}
6466
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -613,10 +613,14 @@ private static TaskFailureDetails validateActionsSize(
613613
if (effectiveSize > maxChunkBytes) {
614614
double maxMB = maxChunkBytes / 1024.0 / 1024.0;
615615
double actionMB = effectiveSize / 1024.0 / 1024.0;
616+
String advice = largePayloadsEnabled
617+
? "Large-payload externalization is enabled but the action still exceeds the limit after " +
618+
"estimated externalization of string payload fields. Reduce non-string field sizes or " +
619+
"increase the maximum chunk size."
620+
: "Enable large-payload externalization to Azure Blob Storage to support oversized actions.";
616621
String errorMessage = String.format(
617-
"A single orchestrator action of type %s with id %d exceeds the %.2fMB limit: %.2fMB. " +
618-
"Enable large-payload externalization to Azure Blob Storage to support oversized actions.",
619-
action.getOrchestratorActionTypeCase(), action.getId(), maxMB, actionMB);
622+
"A single orchestrator action of type %s with id %d exceeds the %.2fMB limit: %.2fMB. %s",
623+
action.getOrchestratorActionTypeCase(), action.getId(), maxMB, actionMB, advice);
620624
return TaskFailureDetails.newBuilder()
621625
.setErrorType("java.lang.IllegalStateException")
622626
.setErrorMessage(errorMessage)
@@ -794,22 +798,46 @@ private static int failureDetailsSavings(
794798
}
795799

796800
/**
797-
* Returns the bytes saved if a StringValue with {@code value.length() >= thresholdBytes}
798-
* is replaced by a fixed-size blob-reference token. Zero if the value is null, empty,
799-
* or below the threshold.
801+
* Returns the bytes saved if a StringValue whose UTF-8 byte length meets or exceeds
802+
* {@code thresholdBytes} is replaced by a fixed-size blob-reference token.
803+
* Zero if the value is null, empty, or below the threshold.
804+
* <p>
805+
* Uses the same UTF-8 byte-counting logic as {@code LargePayloadInterceptor.utf8ByteLength}
806+
* to ensure the estimation threshold and the interceptor's externalization threshold
807+
* are compared in consistent units.
800808
*/
801809
private static int stringValueSavings(StringValue sv, int thresholdBytes) {
802810
if (sv == null) return 0;
803811
String v = sv.getValue();
804812
if (v == null || v.isEmpty()) return 0;
805-
// Use String.length() rather than UTF-8 byte length — this is a lower-bound
806-
// approximation that is slightly conservative (under-estimates savings), which
807-
// means we err toward false positives not false negatives.
808-
int n = v.length();
813+
int n = utf8ByteLength(v);
809814
if (n < thresholdBytes) return 0;
810815
return Math.max(0, n - APPROX_TOKEN_WIRE_SIZE);
811816
}
812817

818+
/**
819+
* Computes the UTF-8 encoded byte length of a string without allocating a byte array.
820+
* Mirrors {@code LargePayloadInterceptor.utf8ByteLength} to keep estimation consistent
821+
* with actual externalization decisions.
822+
*/
823+
private static int utf8ByteLength(String s) {
824+
int count = 0;
825+
for (int i = 0; i < s.length(); i++) {
826+
char c = s.charAt(i);
827+
if (c <= 0x7F) {
828+
count++;
829+
} else if (c <= 0x7FF) {
830+
count += 2;
831+
} else if (Character.isHighSurrogate(c)) {
832+
count += 4;
833+
i++; // skip the low surrogate
834+
} else {
835+
count += 3;
836+
}
837+
}
838+
return count;
839+
}
840+
813841
/**
814842
* Completes an orchestrator task with automatic chunking if the response exceeds the maximum chunk size.
815843
* Matches the .NET SDK's CompleteOrchestratorTaskWithChunkingAsync behavior.

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,11 @@ public DurableTaskGrpcWorkerBuilder addInterceptor(ClientInterceptor interceptor
390390
* Indicates that this worker supports large payload externalization.
391391
* <p>
392392
* When enabled, the worker announces the {@code WORKER_CAPABILITY_LARGE_PAYLOADS} capability
393-
* to the sidecar and skips the pre-send action size validation (since the gRPC interceptor
394-
* will externalize oversized payloads before they hit the wire).
393+
* to the sidecar. Pre-send action size validation still runs but uses an estimated
394+
* post-externalization size that accounts for payload fields the gRPC interceptor will
395+
* replace with small blob-reference tokens. Use
396+
* {@link #setLargePayloadThresholdBytes(int)} to align the estimation threshold with
397+
* the interceptor's externalization threshold.
395398
*
396399
* @param enabled whether large payload support is enabled
397400
* @return this builder object

0 commit comments

Comments
 (0)