Skip to content

Commit dfa39a9

Browse files
committed
more fixes
1 parent 33bfd3e commit dfa39a9

29 files changed

Lines changed: 582 additions & 611 deletions

client/src/main/java/com/microsoft/durabletask/AbstractTaskEntity.java

Lines changed: 431 additions & 0 deletions
Large diffs are not rendered by default.

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 30 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4747
private final Duration maximumTimerInterval;
4848
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
4949
private final int maxConcurrentEntityWorkItems;
50-
private final int maxConcurrentActivityWorkItems;
5150
private final ExecutorService workItemExecutor;
5251

5352
private final TaskHubSidecarServiceBlockingStub sidecarClient;
@@ -57,7 +56,6 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5756
this.activityFactories.putAll(builder.activityFactories);
5857
this.entityFactories.putAll(builder.entityFactories);
5958
this.maxConcurrentEntityWorkItems = builder.maxConcurrentEntityWorkItems;
60-
this.maxConcurrentActivityWorkItems = builder.maxConcurrentActivityWorkItems;
6159

6260
Channel sidecarGrpcChannel;
6361
if (builder.channel != null) {
@@ -170,9 +168,6 @@ public void startAndBlock() {
170168
while (true) {
171169
try {
172170
GetWorkItemsRequest.Builder requestBuilder = GetWorkItemsRequest.newBuilder();
173-
if (this.maxConcurrentActivityWorkItems > 0) {
174-
requestBuilder.setMaxConcurrentActivityWorkItems(this.maxConcurrentActivityWorkItems);
175-
}
176171
if (!this.entityFactories.isEmpty()) {
177172
// Signal to the sidecar that this worker can handle entity work items
178173
requestBuilder.setMaxConcurrentEntityWorkItems(this.maxConcurrentEntityWorkItems);
@@ -385,54 +380,40 @@ public void startAndBlock() {
385380
spanAttributes);
386381
Scope activityScope = activitySpan.makeCurrent();
387382

388-
this.workItemExecutor.submit(() -> {
389-
String output = null;
390-
TaskFailureDetails failureDetails = null;
391-
Throwable activityError = null;
392-
try {
393-
output = taskActivityExecutor.execute(
394-
activityRequest.getName(),
395-
activityRequest.getInput().getValue(),
396-
activityRequest.getTaskId());
397-
} catch (Throwable e) {
398-
activityError = e;
399-
failureDetails = TaskFailureDetails.newBuilder()
400-
.setErrorType(e.getClass().getName())
401-
.setErrorMessage(e.getMessage())
402-
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
403-
.build();
404-
} finally {
405-
activityScope.close();
406-
TracingHelper.endSpan(activitySpan, activityError);
407-
}
383+
String output = null;
384+
TaskFailureDetails failureDetails = null;
385+
Throwable activityError = null;
386+
try {
387+
output = taskActivityExecutor.execute(
388+
activityRequest.getName(),
389+
activityRequest.getInput().getValue(),
390+
activityRequest.getTaskId());
391+
} catch (Throwable e) {
392+
activityError = e;
393+
failureDetails = TaskFailureDetails.newBuilder()
394+
.setErrorType(e.getClass().getName())
395+
.setErrorMessage(e.getMessage())
396+
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
397+
.build();
398+
} finally {
399+
activityScope.close();
400+
TracingHelper.endSpan(activitySpan, activityError);
401+
}
408402

409-
try {
410-
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
411-
.setInstanceId(activityInstanceId)
412-
.setTaskId(activityRequest.getTaskId())
413-
.setCompletionToken(workItem.getCompletionToken());
403+
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
404+
.setInstanceId(activityInstanceId)
405+
.setTaskId(activityRequest.getTaskId())
406+
.setCompletionToken(workItem.getCompletionToken());
414407

415-
if (output != null) {
416-
responseBuilder.setResult(StringValue.of(output));
417-
}
408+
if (output != null) {
409+
responseBuilder.setResult(StringValue.of(output));
410+
}
418411

419-
if (failureDetails != null) {
420-
responseBuilder.setFailureDetails(failureDetails);
421-
}
412+
if (failureDetails != null) {
413+
responseBuilder.setFailureDetails(failureDetails);
414+
}
422415

423-
this.sidecarClient.completeActivityTask(responseBuilder.build());
424-
} catch (Exception e) {
425-
logger.log(Level.WARNING,
426-
String.format("Failed to complete activity '%s' for instance '%s'. Abandoning work item.",
427-
activityRequest.getName(),
428-
activityInstanceId),
429-
e);
430-
this.sidecarClient.abandonTaskActivityWorkItem(
431-
AbandonActivityTaskRequest.newBuilder()
432-
.setCompletionToken(workItem.getCompletionToken())
433-
.build());
434-
}
435-
});
416+
this.sidecarClient.completeActivityTask(responseBuilder.build());
436417
} else if (requestType == RequestCase.ENTITYREQUEST) {
437418
EntityBatchRequest entityRequest = workItem.getEntityRequest();
438419
this.workItemExecutor.submit(() -> {

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public final class DurableTaskGrpcWorkerBuilder {
2222
Duration maximumTimerInterval;
2323
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
2424
int maxConcurrentEntityWorkItems = 1;
25-
int maxConcurrentActivityWorkItems;
2625
int maxWorkItemThreads;
2726

2827
/**
@@ -96,16 +95,16 @@ public DurableTaskGrpcWorkerBuilder addEntity(String name, TaskEntityFactory fac
9695
/**
9796
* Registers an entity type for the constructed {@link DurableTaskGrpcWorker}.
9897
* <p>
99-
* The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
98+
* The entity class must implement {@link TaskEntity} and have a public no-argument constructor.
10099
* A new instance of the entity is created for each operation batch using reflection.
101100
* <p>
102101
* The entity name is derived from the simple class name of the provided type.
103102
*
104-
* @param entityClass the entity class to register; must implement {@link ITaskEntity}
103+
* @param entityClass the entity class to register; must implement {@link TaskEntity}
105104
* @return this builder object
106-
* @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
105+
* @throws IllegalArgumentException if the class does not implement {@link TaskEntity}
107106
*/
108-
public DurableTaskGrpcWorkerBuilder addEntity(Class<? extends ITaskEntity> entityClass) {
107+
public DurableTaskGrpcWorkerBuilder addEntity(Class<? extends TaskEntity> entityClass) {
109108
if (entityClass == null) {
110109
throw new IllegalArgumentException("entityClass must not be null.");
111110
}
@@ -116,21 +115,21 @@ public DurableTaskGrpcWorkerBuilder addEntity(Class<? extends ITaskEntity> entit
116115
/**
117116
* Registers an entity type with a specific name for the constructed {@link DurableTaskGrpcWorker}.
118117
* <p>
119-
* The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
118+
* The entity class must implement {@link TaskEntity} and have a public no-argument constructor.
120119
* A new instance of the entity is created for each operation batch using reflection.
121120
*
122121
* @param name the name of the entity type
123-
* @param entityClass the entity class to register; must implement {@link ITaskEntity}
122+
* @param entityClass the entity class to register; must implement {@link TaskEntity}
124123
* @return this builder object
125-
* @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
124+
* @throws IllegalArgumentException if the class does not implement {@link TaskEntity}
126125
*/
127-
public DurableTaskGrpcWorkerBuilder addEntity(String name, Class<? extends ITaskEntity> entityClass) {
126+
public DurableTaskGrpcWorkerBuilder addEntity(String name, Class<? extends TaskEntity> entityClass) {
128127
if (entityClass == null) {
129128
throw new IllegalArgumentException("entityClass must not be null.");
130129
}
131-
if (!ITaskEntity.class.isAssignableFrom(entityClass)) {
130+
if (!TaskEntity.class.isAssignableFrom(entityClass)) {
132131
throw new IllegalArgumentException(
133-
String.format("Type %s does not implement ITaskEntity.", entityClass.getName()));
132+
String.format("Type %s does not implement TaskEntity.", entityClass.getName()));
134133
}
135134
return this.addEntity(name, () -> {
136135
try {
@@ -152,14 +151,14 @@ public DurableTaskGrpcWorkerBuilder addEntity(String name, Class<? extends ITask
152151
* <p>
153152
* <b>Thread safety warning:</b> Because the same instance handles all operation batches,
154153
* the entity implementation must be thread-safe if concurrent entity work items are enabled.
155-
* Implementations that extend {@link TaskEntity} store mutable state in instance fields and
154+
* Implementations that extend {@link AbstractTaskEntity} store mutable state in instance fields and
156155
* are <b>not</b> safe for singleton registration. Use {@link #addEntity(Class)} or
157156
* {@link #addEntity(String, Class)} instead to create a new instance per batch.
158157
*
159158
* @param entity the entity instance to register
160159
* @return this builder object
161160
*/
162-
public DurableTaskGrpcWorkerBuilder addEntity(ITaskEntity entity) {
161+
public DurableTaskGrpcWorkerBuilder addEntity(TaskEntity entity) {
163162
if (entity == null) {
164163
throw new IllegalArgumentException("entity must not be null.");
165164
}
@@ -174,15 +173,15 @@ public DurableTaskGrpcWorkerBuilder addEntity(ITaskEntity entity) {
174173
* <p>
175174
* <b>Thread safety warning:</b> Because the same instance handles all operation batches,
176175
* the entity implementation must be thread-safe if concurrent entity work items are enabled.
177-
* Implementations that extend {@link TaskEntity} store mutable state in instance fields and
176+
* Implementations that extend {@link AbstractTaskEntity} store mutable state in instance fields and
178177
* are <b>not</b> safe for singleton registration. Use {@link #addEntity(String, Class)} instead
179178
* to create a new instance per batch.
180179
*
181180
* @param name the name of the entity type
182181
* @param entity the entity instance to register
183182
* @return this builder object
184183
*/
185-
public DurableTaskGrpcWorkerBuilder addEntity(String name, ITaskEntity entity) {
184+
public DurableTaskGrpcWorkerBuilder addEntity(String name, TaskEntity entity) {
186185
if (entity == null) {
187186
throw new IllegalArgumentException("entity must not be null.");
188187
}
@@ -260,25 +259,7 @@ public DurableTaskGrpcWorkerBuilder maxConcurrentEntityWorkItems(int maxConcurre
260259
}
261260

262261
/**
263-
* Sets the maximum number of activity work items that can be processed concurrently by this worker.
264-
* <p>
265-
* The sidecar enforces this limit by controlling how many activity work items are dispatched
266-
* to this worker at a time. The default value is 10.
267-
*
268-
* @param maxConcurrentActivityWorkItems the maximum number of concurrent activity work items (must be at least 1)
269-
* @return this builder object
270-
* @throws IllegalArgumentException if the value is less than 1
271-
*/
272-
public DurableTaskGrpcWorkerBuilder maxConcurrentActivityWorkItems(int maxConcurrentActivityWorkItems) {
273-
if (maxConcurrentActivityWorkItems < 1) {
274-
throw new IllegalArgumentException("maxConcurrentActivityWorkItems must be at least 1.");
275-
}
276-
this.maxConcurrentActivityWorkItems = maxConcurrentActivityWorkItems;
277-
return this;
278-
}
279-
280-
/**
281-
* Sets the maximum number of threads used for processing activity and entity work items.
262+
* Sets the maximum number of threads used for processing entity work items.
282263
* <p>
283264
* The default value is {@value DurableTaskGrpcWorker#DEFAULT_MAX_WORK_ITEM_THREADS}.
284265
* Threads are created on demand and idle threads are reclaimed after 60 seconds.

client/src/main/java/com/microsoft/durabletask/EntityRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public static byte[] loadAndRun(byte[] entityRequestBytes, TaskEntityFactory ent
106106
* @throws IllegalArgumentException if either parameter is {@code null} or if {@code base64EncodedEntityRequest}
107107
* is not valid base64-encoded protobuf
108108
*/
109-
public static String loadAndRun(String base64EncodedEntityRequest, ITaskEntity entity) {
109+
public static String loadAndRun(String base64EncodedEntityRequest, TaskEntity entity) {
110110
if (entity == null) {
111111
throw new IllegalArgumentException("entity must not be null");
112112
}
@@ -123,7 +123,7 @@ public static String loadAndRun(String base64EncodedEntityRequest, ITaskEntity e
123123
* @throws IllegalArgumentException if either parameter is {@code null} or if {@code entityRequestBytes}
124124
* is not valid protobuf
125125
*/
126-
public static byte[] loadAndRun(byte[] entityRequestBytes, ITaskEntity entity) {
126+
public static byte[] loadAndRun(byte[] entityRequestBytes, TaskEntity entity) {
127127
if (entity == null) {
128128
throw new IllegalArgumentException("entity must not be null");
129129
}

client/src/main/java/com/microsoft/durabletask/ITaskEntity.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)