-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathDurableTaskClient.java
More file actions
545 lines (503 loc) · 28.1 KB
/
DurableTaskClient.java
File metadata and controls
545 lines (503 loc) · 28.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
 * Abstract base for clients that manage orchestration instances.
 * <p>
 * A client can start orchestration instances, query them, raise events to them, and terminate them. Most methods
 * take an instance ID parameter that identifies the orchestration instance to operate on.
 * <p>
 * At the time of writing, <code>DurableTaskGrpcClient</code> is the most common implementation of this class. It
 * works by making gRPC calls to a remote service (for example, a sidecar) that implements the operation behavior.
 * Close instances of this class once they are no longer needed so that any owned network resources are properly
 * released.
 * <p>
 * Instances of this class are expected to be safe for multithreaded apps, so they can be cached and reused across
 * multiple contexts. Caching these objects is useful to improve overall performance.
 */
public abstract class DurableTaskClient implements AutoCloseable {

    /**
     * Frees any network resources that this object is holding.
     */
    @Override
    public void close() {
        // Deliberately empty: the base class has no default implementation.
    }

    /**
     * Schedules a new orchestration instance for execution and assigns it a random ID.
     *
     * @param orchestratorName the name of the orchestrator to schedule
     * @return the randomly-generated instance ID of the scheduled orchestration instance
     */
    public String scheduleNewOrchestrationInstance(String orchestratorName) {
        // No input and no explicit instance ID.
        return scheduleNewOrchestrationInstance(orchestratorName, null, null);
    }

    /**
     * Schedules a new orchestration instance for execution with the given input and a random ID.
     *
     * @param orchestratorName the name of the orchestrator to schedule
     * @param input the input to pass to the scheduled orchestration instance; must be serializable
     * @return the randomly-generated instance ID of the scheduled orchestration instance
     */
    public String scheduleNewOrchestrationInstance(String orchestratorName, Object input) {
        // No explicit instance ID.
        return scheduleNewOrchestrationInstance(orchestratorName, input, null);
    }

    /**
     * Schedules a new orchestration instance for execution with the given input and instance ID.
     *
     * @param orchestratorName the name of the orchestrator to schedule
     * @param input the input to pass to the scheduled orchestration instance; must be serializable
     * @param instanceId the unique ID of the orchestration instance to schedule
     * @return the <code>instanceId</code> parameter value
     */
    public String scheduleNewOrchestrationInstance(String orchestratorName, Object input, String instanceId) {
        // Bundle the individual arguments into an options object and defer to the options-based overload.
        NewOrchestrationInstanceOptions instanceOptions = new NewOrchestrationInstanceOptions()
                .setInput(input)
                .setInstanceId(instanceId);
        return scheduleNewOrchestrationInstance(orchestratorName, instanceOptions);
    }

    /**
     * Schedules a new orchestration instance for execution using the supplied set of options.
     *
     * @param orchestratorName the name of the orchestrator to schedule
     * @param options the options for the new orchestration instance, including input, instance ID, etc.
     * @return the ID of the scheduled orchestration instance, which was either provided in <code>options</code>
     *         or randomly generated
     */
    public abstract String scheduleNewOrchestrationInstance(
            String orchestratorName,
            NewOrchestrationInstanceOptions options);

    /**
     * Sends an event notification message (without a payload) to a waiting orchestration instance.
     * <p>
     * To handle the event, the target orchestration instance must be waiting for an event named
     * <code>eventName</code> via the {@link TaskOrchestrationContext#waitForExternalEvent(String)} method.
     * If the target instance is not yet waiting for an event with that name, the event is saved in the
     * orchestration instance state and dispatched immediately once the orchestrator calls
     * {@link TaskOrchestrationContext#waitForExternalEvent(String)}. The event is saved even if the orchestrator
     * canceled its wait operation before the event was received.
     * <p>
     * Events raised for a completed or non-existent orchestration instance are silently discarded.
     *
     * @param instanceId the ID of the orchestration instance that will handle the event
     * @param eventName the case-insensitive name of the event
     */
    public void raiseEvent(String instanceId, String eventName) {
        // Same as the payload overload, with no payload.
        raiseEvent(instanceId, eventName, null);
    }

    /**
     * Sends an event notification message carrying a payload to a waiting orchestration instance.
     * <p>
     * To handle the event, the target orchestration instance must be waiting for an event named
     * <code>eventName</code> via the {@link TaskOrchestrationContext#waitForExternalEvent(String)} method.
     * If the target instance is not yet waiting for an event with that name, the event is saved in the
     * orchestration instance state and dispatched immediately once the orchestrator calls
     * {@link TaskOrchestrationContext#waitForExternalEvent(String)}. The event is saved even if the orchestrator
     * canceled its wait operation before the event was received.
     * <p>
     * Events raised for a completed or non-existent orchestration instance are silently discarded.
     *
     * @param instanceId the ID of the orchestration instance that will handle the event
     * @param eventName the case-insensitive name of the event
     * @param eventPayload the serializable data payload to include with the event
     */
    public abstract void raiseEvent(String instanceId, String eventName, @Nullable Object eventPayload);

    // NOTE(review): the @Nullable annotation below is at odds with the documented "default instance" return for
    // missing instances - confirm which of the two contracts implementations actually follow.
    /**
     * Fetches the metadata of an orchestration instance from the configured durable store.
     *
     * @param instanceId the unique ID of the orchestration instance to fetch
     * @param getInputsAndOutputs <code>true</code> to fetch the orchestration instance's inputs, outputs, and custom
     *                            status, or <code>false</code> to omit them
     * @return a metadata record that describes the orchestration instance and its execution status, or
     *         a default instance if no such instance is found. Use
     *         {@link OrchestrationMetadata#isInstanceFound()} to check whether an instance was found.
     */
    @Nullable
    public abstract OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getInputsAndOutputs);

    /**
     * Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object describing
     * the started instance.
     * <p>
     * A "started" orchestration instance is any instance that is not in the <code>Pending</code> state.
     * <p>
     * If the orchestration instance is already running when this method is called, the method returns immediately.
     * <p>
     * Note that this overload does not fetch the orchestration's inputs, outputs, or custom status payloads.
     *
     * @param instanceId the unique ID of the orchestration instance to wait for
     * @param timeout the amount of time to wait for the orchestration instance to start
     * @throws TimeoutException when the orchestration instance is not started within the specified amount of time
     * @return the orchestration instance metadata or <code>null</code> if no such instance is found
     */
    @Nullable
    public OrchestrationMetadata waitForInstanceStart(String instanceId, Duration timeout) throws TimeoutException {
        // false: skip fetching inputs, outputs, and custom status.
        return waitForInstanceStart(instanceId, timeout, false);
    }

    /**
     * Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object describing
     * the started instance, optionally including its input, output, and custom status payloads.
     * <p>
     * A "started" orchestration instance is any instance that is not in the <code>Pending</code> state.
     * <p>
     * If the orchestration instance is already running when this method is called, the method returns immediately.
     *
     * @param instanceId the unique ID of the orchestration instance to wait for
     * @param timeout the amount of time to wait for the orchestration instance to start
     * @param getInputsAndOutputs <code>true</code> to fetch the orchestration instance's inputs, outputs, and custom
     *                            status, or <code>false</code> to omit them
     * @throws TimeoutException when the orchestration instance is not started within the specified amount of time
     * @return the orchestration instance metadata or <code>null</code> if no such instance is found
     */
    @Nullable
    public abstract OrchestrationMetadata waitForInstanceStart(
            String instanceId,
            Duration timeout,
            boolean getInputsAndOutputs) throws TimeoutException;

    /**
     * Waits for an orchestration to complete and returns an {@link OrchestrationMetadata} object describing the
     * completed instance.
     * <p>
     * A "completed" orchestration instance is any instance in one of the terminal states, for example the
     * <code>Completed</code>, <code>Failed</code>, or <code>Terminated</code> states.
     * <p>
     * Orchestrations are long-running and could take hours, days, or months before completing. They can also be
     * eternal, in which case they never complete unless terminated. In such cases this call may block
     * indefinitely, so care must be taken to use appropriate timeouts.
     * <p>
     * If the orchestration instance is already complete when this method is called, the method returns immediately.
     *
     * @param instanceId the unique ID of the orchestration instance to wait for
     * @param timeout the amount of time to wait for the orchestration instance to complete
     * @param getInputsAndOutputs <code>true</code> to fetch the orchestration instance's inputs, outputs, and custom
     *                            status, or <code>false</code> to omit them
     * @throws TimeoutException when the orchestration instance is not completed within the specified amount of time
     * @return the orchestration instance metadata or <code>null</code> if no such instance is found
     */
    @Nullable
    public abstract OrchestrationMetadata waitForInstanceCompletion(
            String instanceId,
            Duration timeout,
            boolean getInputsAndOutputs) throws TimeoutException;

    /**
     * Terminates a running orchestration instance and updates its runtime status to <code>Terminated</code>.
     * <p>
     * Internally, this method enqueues a "terminate" message in the task hub. Once the task hub worker processes
     * that message, it updates the runtime status of the target instance to <code>Terminated</code>.
     * Use {@link #waitForInstanceCompletion} to wait for the instance to reach the terminated state.
     * <p>
     * Terminating an orchestration instance has no effect on in-flight activity function executions or
     * sub-orchestrations that the terminated instance started. Those actions continue to run without
     * interruption, but their results are discarded. To terminate sub-orchestrations, issue a separate terminate
     * command for each sub-orchestration instance.
     * <p>
     * At the time of writing, there is no way to terminate an in-flight activity execution.
     * <p>
     * Attempting to terminate a completed or non-existent orchestration instance fails silently.
     *
     * @param instanceId the unique ID of the orchestration instance to terminate
     * @param output the optional output to set for the terminated orchestration instance.
     *               This value must be serializable.
     */
    public abstract void terminate(String instanceId, @Nullable Object output);

    /**
     * Fetches orchestration instance metadata from the configured durable store using a status query filter.
     *
     * @param query filter criteria that determine which orchestrations to fetch data for
     * @return the result of the query operation, including instance metadata and possibly a continuation token
     */
    public abstract OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery query);

    /**
     * Initializes the target task hub data store.
     * <p>
     * This is an administrative operation that only needs to be performed once for the lifetime of the task hub.
     *
     * @param recreateIfExists <code>true</code> to delete any existing task hub first; <code>false</code> to make this
     *                         operation a no-op if the task hub data store already exists. Note that deleting a task
     *                         hub results in permanent data loss. Use this operation with care.
     */
    public abstract void createTaskHub(boolean recreateIfExists);

    /**
     * Permanently deletes the target task hub data store together with any orchestration data it may contain.
     * <p>
     * This is an irreversible administrative operation. It should be used with great care.
     */
    public abstract void deleteTaskHub();

    /**
     * Purges the metadata of a single orchestration instance from the durable store.
     * <p>
     * Use this method to permanently delete orchestration metadata from the underlying storage provider,
     * including any stored inputs, outputs, and orchestration history records. This is often useful for
     * implementing data retention policies and for keeping storage costs minimal. Only orchestration instances in
     * the <code>Completed</code>, <code>Failed</code>, or <code>Terminated</code> state can be purged.
     * <p>
     * If the target orchestration instance is not found in the data store, or if it is found but is not in a
     * terminal state, the returned {@link PurgeResult} reports that zero instances were purged. Otherwise, the
     * existing data is purged and the returned {@link PurgeResult} reports that one instance was purged.
     *
     * @param instanceId the unique ID of the orchestration instance to purge
     * @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
     */
    public abstract PurgeResult purgeInstance(String instanceId);

    /**
     * Purges orchestration instance metadata from the durable store, using a filter to select which instances to
     * purge data for.
     * <p>
     * Use this method to permanently delete orchestration metadata from the underlying storage provider,
     * including any stored inputs, outputs, and orchestration history records. This is often useful for
     * implementing data retention policies and for keeping storage costs minimal. Only orchestration instances in
     * the <code>Completed</code>, <code>Failed</code>, or <code>Terminated</code> state can be purged.
     * <p>
     * Depending on the type of the durable store, purge operations that target multiple orchestration instances may
     * take a long time to complete and be resource intensive. It may therefore be useful to split purge operations
     * into multiple method calls spread over a period of time, each covering a smaller time window.
     *
     * @param purgeInstanceCriteria orchestration instance filter criteria used to determine which instances to purge
     * @throws TimeoutException when purging instances is not completed within the specified amount of time.
     *                          The default timeout for purging instances is 10 minutes
     * @return the result of the purge operation, including the number of purged orchestration instances
     */
    public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;

    /**
     * Restarts an existing orchestration instance with its original input.
     *
     * @param instanceId the ID of the previously run orchestration instance to restart
     * @param restartWithNewInstanceId <code>true</code> to restart the orchestration instance with a new instance ID,
     *                                 <code>false</code> to restart the orchestration instance with the same
     *                                 instance ID
     * @return the ID of the scheduled orchestration instance, which is either <code>instanceId</code> or randomly
     *         generated depending on the value of <code>restartWithNewInstanceId</code>
     */
    public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId);

    /**
     * Rewinds a failed orchestration instance to the last known good state and replays from there, without
     * recording a reason.
     * <p>
     * This method can only be used on orchestration instances that are in a <code>Failed</code> state.
     * Once rewound, the orchestration instance restarts from the point of failure as if the failure never
     * occurred. The rewind works by replaying any failed activities, as well as failed sub-orchestrations that
     * themselves have failed activities.
     * <p>
     * <b>Note:</b> Rewind requires a backend that supports it. When using Azure Functions with the
     * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker}
     * does not currently support orchestration processing for rewind.
     *
     * @param instanceId the ID of the orchestration instance to rewind
     */
    public void rewindInstance(String instanceId) {
        // No reason supplied.
        rewindInstance(instanceId, null);
    }

    /**
     * Rewinds a failed orchestration instance to the last known good state and replays from there.
     * <p>
     * This method can only be used on orchestration instances that are in a <code>Failed</code> state.
     * Once rewound, the orchestration instance restarts from the point of failure as if the failure never
     * occurred. The rewind works by replaying any failed activities, as well as failed sub-orchestrations that
     * themselves have failed activities.
     * <p>
     * <b>Note:</b> Rewind requires a backend that supports it. When using Azure Functions with the
     * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker}
     * does not currently support orchestration processing for rewind.
     *
     * @param instanceId the ID of the orchestration instance to rewind
     * @param reason the reason for rewinding the orchestration instance
     */
    public abstract void rewindInstance(String instanceId, @Nullable String reason);

    /**
     * Suspends a running orchestration instance without recording a reason.
     *
     * @param instanceId the ID of the orchestration instance to suspend
     */
    public void suspendInstance(String instanceId) {
        // No reason supplied.
        suspendInstance(instanceId, null);
    }

    /**
     * Resumes a suspended orchestration instance without recording a reason.
     *
     * @param instanceId the ID of the orchestration instance to resume
     */
    public void resumeInstance(String instanceId) {
        // No reason supplied.
        resumeInstance(instanceId, null);
    }

    /**
     * Suspends a running orchestration instance.
     *
     * @param instanceId the ID of the orchestration instance to suspend
     * @param reason the reason for suspending the orchestration instance
     */
    public abstract void suspendInstance(String instanceId, @Nullable String reason);

    /**
     * Resumes a suspended orchestration instance.
     *
     * @param instanceId the ID of the orchestration instance to resume
     * @param reason the reason for resuming the orchestration instance
     */
    public abstract void resumeInstance(String instanceId, @Nullable String reason);

    // region Entity APIs

    /**
     * Returns the entity client used to interact with durable entities.
     * <p>
     * This mirrors the .NET SDK's {@code DurableTaskClient.Entities} property by exposing a dedicated client for
     * entity operations such as signaling, querying, and storage management.
     * <p>
     * The base implementation always throws; client implementations that support entities override it.
     *
     * @return the {@link DurableEntityClient} for this client
     * @throws UnsupportedOperationException if the current client implementation does not support entities
     */
    public DurableEntityClient getEntities() {
        String message = "Entity operations are not supported by this client implementation.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Sends a signal to a durable entity instance without waiting for a response.
     * <p>
     * If the target entity does not exist, it is created automatically when it receives the signal.
     *
     * @param entityId the target entity's instance ID
     * @param operationName the name of the operation to invoke on the entity
     * @deprecated Use {@code getEntities().signalEntity(entityId, operationName)} instead.
     */
    @Deprecated
    public void signalEntity(EntityInstanceId entityId, String operationName) {
        DurableEntityClient entities = getEntities();
        entities.signalEntity(entityId, operationName);
    }

    /**
     * Sends a signal with input to a durable entity instance without waiting for a response.
     * <p>
     * If the target entity does not exist, it is created automatically when it receives the signal.
     *
     * @param entityId the target entity's instance ID
     * @param operationName the name of the operation to invoke on the entity
     * @param input the serializable input for the operation, or {@code null}
     * @deprecated Use {@code getEntities().signalEntity(entityId, operationName, input)} instead.
     */
    @Deprecated
    public void signalEntity(EntityInstanceId entityId, String operationName, @Nullable Object input) {
        DurableEntityClient entities = getEntities();
        entities.signalEntity(entityId, operationName, input);
    }

    /**
     * Sends a signal with input and options to a durable entity instance without waiting for a response.
     *
     * @param entityId the target entity's instance ID
     * @param operationName the name of the operation to invoke on the entity
     * @param input the serializable input for the operation, or {@code null}
     * @param options additional options for the signal, or {@code null}
     * @deprecated Use {@code getEntities().signalEntity(entityId, operationName, input, options)} instead.
     */
    @Deprecated
    public void signalEntity(
            EntityInstanceId entityId,
            String operationName,
            @Nullable Object input,
            @Nullable SignalEntityOptions options) {
        DurableEntityClient entities = getEntities();
        entities.signalEntity(entityId, operationName, input, options);
    }

    /**
     * Fetches the metadata for a durable entity instance, including its state.
     *
     * @param entityId the entity instance ID to query
     * @return the entity metadata, or {@code null} if the entity does not exist
     * @deprecated Use {@code getEntities().getEntityMetadata(entityId)} instead.
     */
    @Deprecated
    @Nullable
    public EntityMetadata getEntityMetadata(EntityInstanceId entityId) {
        DurableEntityClient entities = getEntities();
        return entities.getEntityMetadata(entityId);
    }

    /**
     * Fetches the metadata for a durable entity instance, optionally including its state.
     *
     * @param entityId the entity instance ID to query
     * @param includeState {@code true} to include the entity's serialized state in the result
     * @return the entity metadata, or {@code null} if the entity does not exist
     * @deprecated Use {@code getEntities().getEntityMetadata(entityId, includeState)} instead.
     */
    @Deprecated
    @Nullable
    public EntityMetadata getEntityMetadata(EntityInstanceId entityId, boolean includeState) {
        DurableEntityClient entities = getEntities();
        return entities.getEntityMetadata(entityId, includeState);
    }

    /**
     * Queries the durable store for entity instances that match the specified filter criteria.
     *
     * @param query the query filter criteria
     * @return the query result containing matching entities and an optional continuation token
     * @deprecated Use {@code getEntities().queryEntities(query)} instead.
     */
    @Deprecated
    public EntityQueryResult queryEntities(EntityQuery query) {
        DurableEntityClient entities = getEntities();
        return entities.queryEntities(query);
    }

    /**
     * Returns an auto-paginating iterable over the entity instances that match the specified filter criteria.
     *
     * @param query the query filter criteria
     * @return a pageable iterable over all matching entities
     * @deprecated Use {@code getEntities().getAllEntities(query)} instead.
     */
    @Deprecated
    public EntityQueryPageable getAllEntities(EntityQuery query) {
        DurableEntityClient entities = getEntities();
        return entities.getAllEntities(query);
    }

    /**
     * Returns an auto-paginating iterable over all entity instances.
     *
     * @return a pageable iterable over all entities
     * @deprecated Use {@code getEntities().getAllEntities()} instead.
     */
    @Deprecated
    public EntityQueryPageable getAllEntities() {
        DurableEntityClient entities = getEntities();
        return entities.getAllEntities();
    }

    /**
     * Cleans up entity storage by removing empty entities and/or releasing orphaned locks.
     *
     * @param request the clean storage request specifying what to clean
     * @return the result of the clean operation, including counts of removed entities and released locks
     * @deprecated Use {@code getEntities().cleanEntityStorage(request)} instead.
     */
    @Deprecated
    public CleanEntityStorageResult cleanEntityStorage(CleanEntityStorageRequest request) {
        DurableEntityClient entities = getEntities();
        return entities.cleanEntityStorage(request);
    }

    // endregion

    // region History and Instance Listing APIs

    /**
     * Retrieves the full history of the specified orchestration instance.
     * <p>
     * Each history event is returned as a transport-neutral {@link OrchestrationHistoryEvent}, which decouples
     * the public API from the underlying wire format.
     * <p>
     * The base implementation always throws {@link UnsupportedOperationException}; client implementations that
     * support history retrieval override it.
     *
     * @param instanceId the orchestration instance ID
     * @return list of history events for the orchestration
     * @throws UnsupportedOperationException if this client implementation does not support history retrieval
     * @throws IllegalArgumentException if the instance is not found
     */
    public List<OrchestrationHistoryEvent> getOrchestrationHistory(String instanceId) {
        // Name the concrete runtime class so the caller can tell which implementation lacks support.
        String message = getClass().getName() + " does not support retrieving orchestration history.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Lists the IDs of orchestration instances that match the specified runtime status and completed time range,
     * using key-based pagination.
     * <p>
     * The base implementation always throws {@link UnsupportedOperationException}; client implementations that
     * support instance ID listing override it.
     *
     * @param runtimeStatus optional set of runtime statuses to filter by; if {@code null}, all statuses are included
     * @param completedTimeFrom inclusive lower bound of the orchestration completed time filter
     * @param completedTimeTo inclusive upper bound of the orchestration completed time filter
     * @param pageSize maximum number of instance IDs to return in a single page; must be between 1 and 1000
     * @param lastInstanceKey continuation key from the previous page; {@code null} to start from the beginning
     * @return a page of orchestration instance IDs along with a continuation token
     * @throws IllegalArgumentException if {@code pageSize} is outside the supported range, or if both
     *                                  {@code completedTimeFrom} and {@code completedTimeTo} are supplied and
     *                                  {@code completedTimeTo} is not strictly after {@code completedTimeFrom}
     * @throws UnsupportedOperationException if this client implementation does not support instance ID listing
     */
    public InstanceIdPage listInstanceIds(
            @Nullable Collection<OrchestrationRuntimeStatus> runtimeStatus,
            @Nullable Instant completedTimeFrom,
            @Nullable Instant completedTimeTo,
            int pageSize,
            @Nullable String lastInstanceKey) {
        // Name the concrete runtime class so the caller can tell which implementation lacks support.
        String message = getClass().getName() + " does not support listing instance IDs by completed time.";
        throw new UnsupportedOperationException(message);
    }

    // endregion
}