Skip to content

Commit dd6fef1

Browse files
committed
addressed pr feedback
1 parent 29e3d82 commit dd6fef1

5 files changed

Lines changed: 207 additions & 119 deletions

File tree

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.InputStream;
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.UUID;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.zip.GZIPInputStream;
2324
import java.util.zip.GZIPOutputStream;
2425

@@ -35,6 +36,7 @@ public final class BlobPayloadStore extends PayloadStore {
3536

3637
private final BlobContainerClient containerClient;
3738
private final LargePayloadStorageOptions options;
39+
private final AtomicBoolean containerVerified = new AtomicBoolean(false);
3840

3941
/**
4042
* Creates a new {@code BlobPayloadStore} from the given options.
@@ -104,14 +106,18 @@ public String upload(String payload) {
104106

105107
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
106108

107-
// Ensure container exists (idempotent)
108-
try {
109-
this.containerClient.createIfNotExists();
110-
} catch (BlobStorageException e) {
111-
// 409 Conflict means it already exists — safe to ignore
112-
if (e.getStatusCode() != 409) {
113-
throw new PayloadStorageException(
114-
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
109+
// Ensure container exists (idempotent) — skip after first successful check
110+
if (!this.containerVerified.get()) {
111+
try {
112+
this.containerClient.createIfNotExists();
113+
this.containerVerified.set(true);
114+
} catch (BlobStorageException e) {
115+
// 409 Conflict means it already exists — safe to ignore
116+
if (e.getStatusCode() != 409) {
117+
throw new PayloadStorageException(
118+
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
119+
}
120+
this.containerVerified.set(true);
115121
}
116122
}
117123

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadInterceptor.java

Lines changed: 146 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import io.grpc.*;
1010

11-
import java.nio.charset.StandardCharsets;
1211
import java.util.logging.Level;
1312
import java.util.logging.Logger;
1413

@@ -40,6 +39,11 @@ public LargePayloadInterceptor(PayloadStore payloadStore, LargePayloadStorageOpt
4039
if (options == null) {
4140
throw new IllegalArgumentException("options must not be null.");
4241
}
42+
if (options.getMaxPayloadBytes() < options.getThresholdBytes()) {
43+
throw new IllegalArgumentException(
44+
"maxPayloadBytes (" + options.getMaxPayloadBytes() +
45+
") must be >= thresholdBytes (" + options.getThresholdBytes() + ").");
46+
}
4347
this.payloadStore = payloadStore;
4448
this.options = options;
4549
}
@@ -60,8 +64,19 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
6064
@Override
6165
@SuppressWarnings("unchecked")
6266
public void onMessage(RespT message) {
63-
RespT resolved = (RespT) resolveResponsePayloads(message);
64-
super.onMessage(resolved);
67+
try {
68+
RespT resolved = (RespT) resolveResponsePayloads(message);
69+
super.onMessage(resolved);
70+
} catch (Exception ex) {
71+
logger.log(Level.SEVERE,
72+
"Failed to resolve externalized payload from blob storage.", ex);
73+
// Surface as a gRPC INTERNAL error so the worker loop can handle
74+
// reconnection via its StatusRuntimeException catch block.
75+
throw Status.INTERNAL
76+
.withDescription("Failed to resolve externalized payload: " + ex.getMessage())
77+
.withCause(ex)
78+
.asRuntimeException();
79+
}
6580
}
6681
};
6782
super.start(wrappedListener, headers);
@@ -140,19 +155,23 @@ private Object externalizeActivityResponse(ActivityResponse r) {
140155
return r.toBuilder().setResult(externalized).build();
141156
}
142157
} catch (Exception ex) {
143-
if (isPermanentStorageFailure(ex)) {
144-
// Convert to a failure response so the orchestration sees a failed activity
145-
return r.toBuilder()
146-
.clearResult()
147-
.setFailureDetails(TaskFailureDetails.newBuilder()
148-
.setErrorType(ex.getClass().getName())
149-
.setErrorMessage(ex.getMessage())
150-
.setStackTrace(StringValue.of(getStackTraceString(ex)))
151-
.setIsNonRetriable(true)
152-
.build())
153-
.build();
154-
}
155-
throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);
158+
boolean permanent = isPermanentStorageFailure(ex);
159+
String prefix = permanent
160+
? "Permanent payload storage failure"
161+
: "Transient payload storage failure";
162+
logger.log(Level.WARNING, prefix + " while externalizing activity response.", ex);
163+
// Convert to a failure response so the orchestration sees a failed activity.
164+
// Permanent failures are non-retriable; transient failures are retriable so
165+
// the sidecar can re-dispatch the work item.
166+
return r.toBuilder()
167+
.clearResult()
168+
.setFailureDetails(TaskFailureDetails.newBuilder()
169+
.setErrorType(ex.getClass().getName())
170+
.setErrorMessage(prefix + ": " + ex.getMessage())
171+
.setStackTrace(StringValue.of(getStackTraceString(ex)))
172+
.setIsNonRetriable(permanent)
173+
.build())
174+
.build();
156175
}
157176
return r;
158177
}
@@ -179,109 +198,117 @@ private Object externalizeOrchestratorResponse(OrchestratorResponse r) {
179198

180199
return changed ? builder.build() : r;
181200
} catch (Exception ex) {
182-
if (isPermanentStorageFailure(ex)) {
183-
// Replace with a single Failed completion
184-
return OrchestratorResponse.newBuilder()
185-
.setInstanceId(r.getInstanceId())
186-
.setCompletionToken(r.getCompletionToken())
187-
.setIsPartial(false)
188-
.addActions(OrchestratorAction.newBuilder()
189-
.setCompleteOrchestration(CompleteOrchestrationAction.newBuilder()
190-
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
191-
.setFailureDetails(TaskFailureDetails.newBuilder()
192-
.setErrorType(ex.getClass().getName())
193-
.setErrorMessage(ex.getMessage())
194-
.setStackTrace(StringValue.of(getStackTraceString(ex)))
195-
.setIsNonRetriable(true)
196-
.build())
201+
boolean permanent = isPermanentStorageFailure(ex);
202+
String prefix = permanent
203+
? "Permanent payload storage failure"
204+
: "Transient payload storage failure";
205+
logger.log(Level.WARNING, prefix + " while externalizing orchestrator response.", ex);
206+
// Replace with a single Failed completion.
207+
// Permanent failures are non-retriable; transient failures are retriable so
208+
// the sidecar can re-dispatch the orchestration.
209+
return OrchestratorResponse.newBuilder()
210+
.setInstanceId(r.getInstanceId())
211+
.setCompletionToken(r.getCompletionToken())
212+
.setIsPartial(false)
213+
.addActions(OrchestratorAction.newBuilder()
214+
.setCompleteOrchestration(CompleteOrchestrationAction.newBuilder()
215+
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
216+
.setFailureDetails(TaskFailureDetails.newBuilder()
217+
.setErrorType(ex.getClass().getName())
218+
.setErrorMessage(prefix + ": " + ex.getMessage())
219+
.setStackTrace(StringValue.of(getStackTraceString(ex)))
220+
.setIsNonRetriable(permanent)
197221
.build())
198222
.build())
199-
.build();
200-
}
201-
throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);
223+
.build())
224+
.build();
202225
}
203226
}
204227

205228
private OrchestratorAction externalizeOrchestratorAction(OrchestratorAction action) {
206229
OrchestratorAction.Builder builder = null;
207230

208-
if (action.hasCompleteOrchestration()) {
209-
CompleteOrchestrationAction complete = action.getCompleteOrchestration();
210-
CompleteOrchestrationAction.Builder cb = null;
231+
switch (action.getOrchestratorActionTypeCase()) {
232+
case COMPLETEORCHESTRATION: {
233+
CompleteOrchestrationAction complete = action.getCompleteOrchestration();
234+
CompleteOrchestrationAction.Builder cb = null;
211235

212-
StringValue result = maybeExternalize(complete.getResult());
213-
if (result != complete.getResult()) {
214-
cb = complete.toBuilder().setResult(result);
215-
}
216-
StringValue details = maybeExternalize(complete.getDetails());
217-
if (details != complete.getDetails()) {
218-
cb = (cb != null ? cb : complete.toBuilder()).setDetails(details);
219-
}
220-
if (cb != null) {
221-
builder = action.toBuilder().setCompleteOrchestration(cb.build());
236+
StringValue result = maybeExternalize(complete.getResult());
237+
if (result != complete.getResult()) {
238+
cb = complete.toBuilder().setResult(result);
239+
}
240+
StringValue details = maybeExternalize(complete.getDetails());
241+
if (details != complete.getDetails()) {
242+
cb = (cb != null ? cb : complete.toBuilder()).setDetails(details);
243+
}
244+
if (cb != null) {
245+
builder = action.toBuilder().setCompleteOrchestration(cb.build());
246+
}
247+
break;
222248
}
223-
}
224-
225-
if (action.hasTerminateOrchestration()) {
226-
TerminateOrchestrationAction term = action.getTerminateOrchestration();
227-
StringValue reason = maybeExternalize(term.getReason());
228-
if (reason != term.getReason()) {
229-
builder = (builder != null ? builder : action.toBuilder())
230-
.setTerminateOrchestration(term.toBuilder().setReason(reason).build());
249+
case TERMINATEORCHESTRATION: {
250+
TerminateOrchestrationAction term = action.getTerminateOrchestration();
251+
StringValue reason = maybeExternalize(term.getReason());
252+
if (reason != term.getReason()) {
253+
builder = action.toBuilder()
254+
.setTerminateOrchestration(term.toBuilder().setReason(reason).build());
255+
}
256+
break;
231257
}
232-
}
233-
234-
if (action.hasScheduleTask()) {
235-
ScheduleTaskAction schedule = action.getScheduleTask();
236-
StringValue input = maybeExternalize(schedule.getInput());
237-
if (input != schedule.getInput()) {
238-
builder = (builder != null ? builder : action.toBuilder())
239-
.setScheduleTask(schedule.toBuilder().setInput(input).build());
258+
case SCHEDULETASK: {
259+
ScheduleTaskAction schedule = action.getScheduleTask();
260+
StringValue input = maybeExternalize(schedule.getInput());
261+
if (input != schedule.getInput()) {
262+
builder = action.toBuilder()
263+
.setScheduleTask(schedule.toBuilder().setInput(input).build());
264+
}
265+
break;
240266
}
241-
}
242-
243-
if (action.hasCreateSubOrchestration()) {
244-
CreateSubOrchestrationAction sub = action.getCreateSubOrchestration();
245-
StringValue input = maybeExternalize(sub.getInput());
246-
if (input != sub.getInput()) {
247-
builder = (builder != null ? builder : action.toBuilder())
248-
.setCreateSubOrchestration(sub.toBuilder().setInput(input).build());
267+
case CREATESUBORCHESTRATION: {
268+
CreateSubOrchestrationAction sub = action.getCreateSubOrchestration();
269+
StringValue input = maybeExternalize(sub.getInput());
270+
if (input != sub.getInput()) {
271+
builder = action.toBuilder()
272+
.setCreateSubOrchestration(sub.toBuilder().setInput(input).build());
273+
}
274+
break;
249275
}
250-
}
251-
252-
if (action.hasSendEvent()) {
253-
SendEventAction sendEvt = action.getSendEvent();
254-
StringValue data = maybeExternalize(sendEvt.getData());
255-
if (data != sendEvt.getData()) {
256-
builder = (builder != null ? builder : action.toBuilder())
257-
.setSendEvent(sendEvt.toBuilder().setData(data).build());
276+
case SENDEVENT: {
277+
SendEventAction sendEvt = action.getSendEvent();
278+
StringValue data = maybeExternalize(sendEvt.getData());
279+
if (data != sendEvt.getData()) {
280+
builder = action.toBuilder()
281+
.setSendEvent(sendEvt.toBuilder().setData(data).build());
282+
}
283+
break;
258284
}
259-
}
260-
261-
if (action.hasSendEntityMessage()) {
262-
SendEntityMessageAction entityMsg = action.getSendEntityMessage();
263-
SendEntityMessageAction.Builder emBuilder = null;
285+
case SENDENTITYMESSAGE: {
286+
SendEntityMessageAction entityMsg = action.getSendEntityMessage();
287+
SendEntityMessageAction.Builder emBuilder = null;
264288

265-
if (entityMsg.hasEntityOperationSignaled()) {
266-
EntityOperationSignaledEvent sig = entityMsg.getEntityOperationSignaled();
267-
StringValue input = maybeExternalize(sig.getInput());
268-
if (input != sig.getInput()) {
269-
emBuilder = entityMsg.toBuilder()
270-
.setEntityOperationSignaled(sig.toBuilder().setInput(input).build());
289+
if (entityMsg.hasEntityOperationSignaled()) {
290+
EntityOperationSignaledEvent sig = entityMsg.getEntityOperationSignaled();
291+
StringValue input = maybeExternalize(sig.getInput());
292+
if (input != sig.getInput()) {
293+
emBuilder = entityMsg.toBuilder()
294+
.setEntityOperationSignaled(sig.toBuilder().setInput(input).build());
295+
}
296+
} else if (entityMsg.hasEntityOperationCalled()) {
297+
EntityOperationCalledEvent called = entityMsg.getEntityOperationCalled();
298+
StringValue input = maybeExternalize(called.getInput());
299+
if (input != called.getInput()) {
300+
emBuilder = entityMsg.toBuilder()
301+
.setEntityOperationCalled(called.toBuilder().setInput(input).build());
302+
}
271303
}
272-
}
273-
if (entityMsg.hasEntityOperationCalled()) {
274-
EntityOperationCalledEvent called = entityMsg.getEntityOperationCalled();
275-
StringValue input = maybeExternalize(called.getInput());
276-
if (input != called.getInput()) {
277-
emBuilder = (emBuilder != null ? emBuilder : entityMsg.toBuilder())
278-
.setEntityOperationCalled(called.toBuilder().setInput(input).build());
304+
if (emBuilder != null) {
305+
builder = action.toBuilder()
306+
.setSendEntityMessage(emBuilder.build());
279307
}
308+
break;
280309
}
281-
if (emBuilder != null) {
282-
builder = (builder != null ? builder : action.toBuilder())
283-
.setSendEntityMessage(emBuilder.build());
284-
}
310+
default:
311+
break;
285312
}
286313

287314
return builder != null ? builder.build() : action;
@@ -768,12 +795,26 @@ private static String getStackTraceString(Throwable t) {
768795
}
769796

770797
/**
771-
* Computes the UTF-8 encoded byte length of a string.
798+
* Computes the UTF-8 encoded byte length of a string without allocating a byte array.
772799
* <p>
773-
* Uses Java's canonical UTF-8 encoding behavior so malformed surrogate
774-
* sequences are measured exactly the same way as payload serialization.
800+
* Iterates the char sequence and counts bytes per the UTF-8 encoding rules,
801+
* correctly handling surrogate pairs as 4-byte sequences.
775802
*/
776803
private static int utf8ByteLength(String s) {
777-
return s.getBytes(StandardCharsets.UTF_8).length;
804+
int count = 0;
805+
for (int i = 0; i < s.length(); i++) {
806+
char c = s.charAt(i);
807+
if (c <= 0x7F) {
808+
count++;
809+
} else if (c <= 0x7FF) {
810+
count += 2;
811+
} else if (Character.isHighSurrogate(c)) {
812+
count += 4;
813+
i++; // skip the low surrogate
814+
} else {
815+
count += 3;
816+
}
817+
}
818+
return count;
778819
}
779820
}

0 commit comments

Comments
 (0)