Skip to content

Commit e02a960

Browse files
committed
fixed integration tests
1 parent 07f0d85 commit e02a960

3 files changed

Lines changed: 174 additions & 99 deletions

File tree

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public void startAndBlock() {
144144
this.dataConverter,
145145
this.maximumTimerInterval,
146146
logger,
147-
this.versioningOptions);
147+
this.versioningOptions,
148+
true);
148149
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
149150
this.activityFactories,
150151
this.dataConverter,

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 154 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,30 @@ final class TaskOrchestrationExecutor {
3737
private final Logger logger;
3838
private final Duration maximumTimerInterval;
3939
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
40+
private final boolean useNativeEntityActions;
4041

4142
public TaskOrchestrationExecutor(
4243
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
4344
DataConverter dataConverter,
4445
Duration maximumTimerInterval,
4546
Logger logger,
4647
DurableTaskGrpcWorkerVersioningOptions versioningOptions) {
48+
this(orchestrationFactories, dataConverter, maximumTimerInterval, logger, versioningOptions, false);
49+
}
50+
51+
public TaskOrchestrationExecutor(
52+
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
53+
DataConverter dataConverter,
54+
Duration maximumTimerInterval,
55+
Logger logger,
56+
DurableTaskGrpcWorkerVersioningOptions versioningOptions,
57+
boolean useNativeEntityActions) {
4758
this.orchestrationFactories = orchestrationFactories;
4859
this.dataConverter = dataConverter;
4960
this.maximumTimerInterval = maximumTimerInterval;
5061
this.logger = logger;
5162
this.versioningOptions = versioningOptions;
63+
this.useNativeEntityActions = useNativeEntityActions;
5264
}
5365

5466
public TaskOrchestratorResult execute(
@@ -410,33 +422,50 @@ public void signalEntity(EntityInstanceId entityId, String operationName, Object
410422
String requestId = this.newUUID().toString();
411423
String serializedInput = this.dataConverter.serialize(input);
412424

413-
// Build DTFx RequestMessage JSON payload matching the legacy format that the
414-
// Azure Functions extension (DTFx backend) understands. The extension processes
415-
// entity messages as external events (SendEventAction), NOT the newer proto-native
416-
// SendEntityMessageAction which is designed for the DTS backend.
417-
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
418-
requestMessage.put("op", operationName);
419-
requestMessage.put("signal", true);
420-
if (serializedInput != null) {
421-
requestMessage.put("input", serializedInput);
422-
}
423-
requestMessage.put("id", requestId);
424-
String eventName = "op";
425-
if (options != null && options.getScheduledTime() != null) {
426-
String scheduledTimeStr = options.getScheduledTime().toString();
427-
requestMessage.put("due", scheduledTimeStr);
428-
eventName = "op@" + scheduledTimeStr;
425+
if (TaskOrchestrationExecutor.this.useNativeEntityActions) {
426+
// Proto-native SendEntityMessageAction for DTS/standalone sidecar backends
427+
EntityOperationSignaledEvent.Builder signalBuilder = EntityOperationSignaledEvent.newBuilder()
428+
.setRequestId(requestId)
429+
.setOperation(operationName)
430+
.setTargetInstanceId(StringValue.of(entityId.toString()));
431+
if (serializedInput != null) {
432+
signalBuilder.setInput(StringValue.of(serializedInput));
433+
}
434+
if (options != null && options.getScheduledTime() != null) {
435+
signalBuilder.setScheduledTime(
436+
DataConverter.getTimestampFromInstant(options.getScheduledTime()));
437+
}
438+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
439+
.setId(id)
440+
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
441+
.setEntityOperationSignaled(signalBuilder))
442+
.build());
443+
} else {
444+
// Legacy DTFx RequestMessage JSON for Azure Functions extension compatibility.
445+
// Uses SendEventAction (external event) instead of SendEntityMessageAction.
446+
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
447+
requestMessage.put("op", operationName);
448+
requestMessage.put("signal", true);
449+
if (serializedInput != null) {
450+
requestMessage.put("input", serializedInput);
451+
}
452+
requestMessage.put("id", requestId);
453+
String eventName = "op";
454+
if (options != null && options.getScheduledTime() != null) {
455+
String scheduledTimeStr = options.getScheduledTime().toString();
456+
requestMessage.put("due", scheduledTimeStr);
457+
eventName = "op@" + scheduledTimeStr;
458+
}
459+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
460+
.setId(id)
461+
.setSendEvent(SendEventAction.newBuilder()
462+
.setInstance(OrchestrationInstance.newBuilder()
463+
.setInstanceId(entityId.toString()))
464+
.setName(eventName)
465+
.setData(StringValue.of(requestMessage.toString())))
466+
.build());
429467
}
430468

431-
this.pendingActions.put(id, OrchestratorAction.newBuilder()
432-
.setId(id)
433-
.setSendEvent(SendEventAction.newBuilder()
434-
.setInstance(OrchestrationInstance.newBuilder()
435-
.setInstanceId(entityId.toString()))
436-
.setName(eventName)
437-
.setData(StringValue.of(requestMessage.toString())))
438-
.build());
439-
440469
if (!this.isReplaying) {
441470
this.logger.fine(() -> String.format(
442471
"%s: signaling entity '%s' operation '%s' (#%d)",
@@ -472,29 +501,47 @@ public <V> Task<V> callEntity(EntityInstanceId entityId, String operationName, O
472501
String requestId = this.newUUID().toString();
473502
String serializedInput = this.dataConverter.serialize(input);
474503

475-
// Build DTFx RequestMessage JSON for entity call (two-way operation).
476-
// Uses SendEventAction (external event) instead of SendEntityMessageAction for
477-
// compatibility with the Azure Functions extension (DTFx backend).
478-
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
479-
requestMessage.put("op", operationName);
480-
if (serializedInput != null) {
481-
requestMessage.put("input", serializedInput);
482-
}
483-
requestMessage.put("id", requestId);
484-
requestMessage.put("parent", this.instanceId);
485-
if (this.executionId != null) {
486-
requestMessage.put("parentExecution", this.executionId);
504+
if (TaskOrchestrationExecutor.this.useNativeEntityActions) {
505+
// Proto-native SendEntityMessageAction for DTS/standalone sidecar backends
506+
EntityOperationCalledEvent.Builder callBuilder = EntityOperationCalledEvent.newBuilder()
507+
.setRequestId(requestId)
508+
.setOperation(operationName)
509+
.setTargetInstanceId(StringValue.of(entityId.toString()));
510+
if (serializedInput != null) {
511+
callBuilder.setInput(StringValue.of(serializedInput));
512+
}
513+
callBuilder.setParentInstanceId(StringValue.of(this.instanceId));
514+
if (this.executionId != null) {
515+
callBuilder.setParentExecutionId(StringValue.of(this.executionId));
516+
}
517+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
518+
.setId(id)
519+
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
520+
.setEntityOperationCalled(callBuilder))
521+
.build());
522+
} else {
523+
// Legacy DTFx RequestMessage JSON for Azure Functions extension compatibility.
524+
// Uses SendEventAction (external event) instead of SendEntityMessageAction.
525+
ObjectNode requestMessage = JSON_MAPPER.createObjectNode();
526+
requestMessage.put("op", operationName);
527+
if (serializedInput != null) {
528+
requestMessage.put("input", serializedInput);
529+
}
530+
requestMessage.put("id", requestId);
531+
requestMessage.put("parent", this.instanceId);
532+
if (this.executionId != null) {
533+
requestMessage.put("parentExecution", this.executionId);
534+
}
535+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
536+
.setId(id)
537+
.setSendEvent(SendEventAction.newBuilder()
538+
.setInstance(OrchestrationInstance.newBuilder()
539+
.setInstanceId(entityId.toString()))
540+
.setName("op")
541+
.setData(StringValue.of(requestMessage.toString())))
542+
.build());
487543
}
488544

489-
this.pendingActions.put(id, OrchestratorAction.newBuilder()
490-
.setId(id)
491-
.setSendEvent(SendEventAction.newBuilder()
492-
.setInstance(OrchestrationInstance.newBuilder()
493-
.setInstanceId(entityId.toString()))
494-
.setName("op")
495-
.setData(StringValue.of(requestMessage.toString())))
496-
.build());
497-
498545
if (!this.isReplaying) {
499546
this.logger.info(() -> String.format(
500547
"%s: calling entity '%s' operation '%s' (#%d) requestId=%s",
@@ -572,28 +619,42 @@ public Task<AutoCloseable> lockEntities(List<EntityInstanceId> entityIds) {
572619
// through subsequent entities in the lock set.
573620
{
574621
int id = this.sequenceNumber++;
575-
ObjectNode lockRequestMessage = JSON_MAPPER.createObjectNode();
576-
lockRequestMessage.putNull("op");
577-
lockRequestMessage.put("id", criticalSectionId);
578-
ArrayNode lockSetArray = lockRequestMessage.putArray("lockset");
579-
for (EntityInstanceId eid : sortedIds) {
580-
ObjectNode entityIdNode = JSON_MAPPER.createObjectNode();
581-
entityIdNode.put("name", eid.getName());
582-
entityIdNode.put("key", eid.getKey());
583-
lockSetArray.add(entityIdNode);
584-
}
585-
lockRequestMessage.put("pos", 0);
586-
lockRequestMessage.put("parent", this.instanceId);
622+
if (TaskOrchestrationExecutor.this.useNativeEntityActions) {
623+
// Proto-native lock request for DTS/standalone sidecar backends
624+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
625+
.setId(id)
626+
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
627+
.setEntityLockRequested(EntityLockRequestedEvent.newBuilder()
628+
.setCriticalSectionId(criticalSectionId)
629+
.addAllLockSet(lockSet)
630+
.setPosition(0)
631+
.setParentInstanceId(StringValue.of(this.instanceId))))
632+
.build());
633+
} else {
634+
// Legacy DTFx JSON for Azure Functions extension compatibility
635+
ObjectNode lockRequestMessage = JSON_MAPPER.createObjectNode();
636+
lockRequestMessage.putNull("op");
637+
lockRequestMessage.put("id", criticalSectionId);
638+
ArrayNode lockSetArray = lockRequestMessage.putArray("lockset");
639+
for (EntityInstanceId eid : sortedIds) {
640+
ObjectNode entityIdNode = JSON_MAPPER.createObjectNode();
641+
entityIdNode.put("name", eid.getName());
642+
entityIdNode.put("key", eid.getKey());
643+
lockSetArray.add(entityIdNode);
644+
}
645+
lockRequestMessage.put("pos", 0);
646+
lockRequestMessage.put("parent", this.instanceId);
587647

588-
String targetEntityId = lockSet.get(0);
589-
this.pendingActions.put(id, OrchestratorAction.newBuilder()
590-
.setId(id)
591-
.setSendEvent(SendEventAction.newBuilder()
592-
.setInstance(OrchestrationInstance.newBuilder()
593-
.setInstanceId(targetEntityId))
594-
.setName("op")
595-
.setData(StringValue.of(lockRequestMessage.toString())))
596-
.build());
648+
String targetEntityId = lockSet.get(0);
649+
this.pendingActions.put(id, OrchestratorAction.newBuilder()
650+
.setId(id)
651+
.setSendEvent(SendEventAction.newBuilder()
652+
.setInstance(OrchestrationInstance.newBuilder()
653+
.setInstanceId(targetEntityId))
654+
.setName("op")
655+
.setData(StringValue.of(lockRequestMessage.toString())))
656+
.build());
657+
}
597658
}
598659

599660
// Store the lock set so handleEntityLockGranted can populate lockedEntityIds
@@ -620,19 +681,30 @@ public Task<AutoCloseable> lockEntities(List<EntityInstanceId> entityIds) {
620681
// Release all locks
621682
for (EntityInstanceId lockedEntity : sortedIds) {
622683
int unlockId = this.sequenceNumber++;
623-
// Build DTFx ReleaseMessage JSON for releasing entity locks
624-
ObjectNode releaseMessage = JSON_MAPPER.createObjectNode();
625-
releaseMessage.put("parent", this.instanceId);
626-
releaseMessage.put("id", criticalSectionId);
627-
628-
this.pendingActions.put(unlockId, OrchestratorAction.newBuilder()
629-
.setId(unlockId)
630-
.setSendEvent(SendEventAction.newBuilder()
631-
.setInstance(OrchestrationInstance.newBuilder()
632-
.setInstanceId(lockedEntity.toString()))
633-
.setName("release")
634-
.setData(StringValue.of(releaseMessage.toString())))
635-
.build());
684+
if (TaskOrchestrationExecutor.this.useNativeEntityActions) {
685+
// Proto-native unlock for DTS/standalone sidecar backends
686+
this.pendingActions.put(unlockId, OrchestratorAction.newBuilder()
687+
.setId(unlockId)
688+
.setSendEntityMessage(SendEntityMessageAction.newBuilder()
689+
.setEntityUnlockSent(EntityUnlockSentEvent.newBuilder()
690+
.setCriticalSectionId(criticalSectionId)
691+
.setParentInstanceId(StringValue.of(this.instanceId))
692+
.setTargetInstanceId(StringValue.of(lockedEntity.toString()))))
693+
.build());
694+
} else {
695+
// Legacy DTFx ReleaseMessage JSON for Azure Functions extension compatibility
696+
ObjectNode releaseMessage = JSON_MAPPER.createObjectNode();
697+
releaseMessage.put("parent", this.instanceId);
698+
releaseMessage.put("id", criticalSectionId);
699+
this.pendingActions.put(unlockId, OrchestratorAction.newBuilder()
700+
.setId(unlockId)
701+
.setSendEvent(SendEventAction.newBuilder()
702+
.setInstance(OrchestrationInstance.newBuilder()
703+
.setInstanceId(lockedEntity.toString()))
704+
.setName("release")
705+
.setData(StringValue.of(releaseMessage.toString())))
706+
.build());
707+
}
636708
}
637709

638710
this.isInCriticalSection = false;
@@ -1084,8 +1156,8 @@ private void handleEntityResponseFromEventRaised(TaskRecord<?> matchingTaskRecor
10841156

10851157
private void handleEventSent(HistoryEvent e) {
10861158
// During replay, remove the pending action so we don't re-send already-processed
1087-
// events. This applies to entity operations (signal, call, lock, unlock) which
1088-
// now use SendEventAction, as well as regular sendEvent calls.
1159+
// events. When using the legacy Azure Functions path (useNativeEntityActions=false),
1160+
// this also applies to entity operations which use SendEventAction.
10891161
int taskId = e.getEventId();
10901162
this.pendingActions.remove(taskId);
10911163
}

0 commit comments

Comments
 (0)