Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,002 changes: 739 additions & 263 deletions transact/src/main/java/dev/dbos/transact/DBOS.java

Large diffs are not rendered by default.

80 changes: 56 additions & 24 deletions transact/src/main/java/dev/dbos/transact/DBOSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,33 @@
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

class ClientWorkflowHandle<T, E extends Exception> implements WorkflowHandle<T, E> {

private final SystemDatabase systemDatabase;
private final String workflowId;

public ClientWorkflowHandle(SystemDatabase systemDatabase, String workflowId) {
this.systemDatabase = systemDatabase;
this.workflowId = workflowId;
}

@Override
public String workflowId() {
return workflowId;
}

@Override
public T getResult() throws E {
var result = systemDatabase.<T>awaitWorkflowResult(workflowId);
return Result.<T, E>process(result);
}

@Override
public WorkflowStatus getStatus() {
return systemDatabase.getWorkflowStatus(workflowId);
}
}

/**
* DBOSClient allows external programs to interact with DBOS apps via direct system database access.
* Example interactions: Start/enqueue a workflow, and get the result Get events and send messages
Expand Down Expand Up @@ -470,31 +497,36 @@ public EnqueueOptions(
String serializationFormat =
options.serialization() != null ? options.serialization().formatName() : null;

return DBOSExecutor.enqueueWorkflow(
Objects.requireNonNull(
options.workflowName(), "EnqueueOptions workflowName must not be null"),
Objects.requireNonNull(options.className(), "EnqueueOptions className must not be null"),
Objects.requireNonNullElse(options.instanceName(), ""),
null,
args,
new DBOSExecutor.ExecutionOptions(
Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString()),
Timeout.of(options.timeout()),
options.deadline,
var workflowId =
DBOSExecutor.enqueueWorkflow(
Objects.requireNonNull(
options.workflowName(), "EnqueueOptions workflowName must not be null"),
Objects.requireNonNull(
options.queueName(), "EnqueueOptions queueName must not be null"),
options.deduplicationId,
options.priority,
options.queuePartitionKey,
false,
false,
serializationFormat),
null,
null,
null,
options.appVersion,
systemDatabase,
this.serializer);
options.className(), "EnqueueOptions className must not be null"),
Objects.requireNonNullElse(options.instanceName(), ""),
null,
args,
new DBOSExecutor.ExecutionOptions(
Objects.requireNonNullElseGet(
options.workflowId(), () -> UUID.randomUUID().toString()),
Timeout.of(options.timeout()),
options.deadline,
Objects.requireNonNull(
options.queueName(), "EnqueueOptions queueName must not be null"),
options.deduplicationId,
options.priority,
options.queuePartitionKey,
false,
false,
serializationFormat),
null,
null,
null,
options.appVersion,
systemDatabase,
this.serializer);

return new ClientWorkflowHandle<>(systemDatabase, workflowId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,9 @@ public static boolean inStep() {
var ctx = DBOSContextHolder.get();
return ctx == null ? false : ctx.isInStep();
}

public static SerializationStrategy serializationStrategy() {
var ctx = DBOSContextHolder.get();
return ctx != null ? ctx.getSerialization() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ void resumeWorkflow(String workflowId) throws SQLException {
String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions options)
throws SQLException {

Objects.requireNonNull(options);
options = Objects.requireNonNullElseGet(options, ForkOptions::new);

var status = getWorkflowStatus(originalWorkflowId);
if (status == null) {
Expand Down Expand Up @@ -897,7 +897,11 @@ private static void insertForkedWorkflowStatus(
stmt.setString(6, applicationVersion);
stmt.setString(7, originalStatus.appId());
stmt.setString(8, originalStatus.authenticatedUser());
stmt.setString(9, JSONUtil.serializeArray(originalStatus.authenticatedRoles()));
stmt.setString(
9,
originalStatus.authenticatedRoles() == null
? null
: JSONUtil.serializeArray(originalStatus.authenticatedRoles()));
stmt.setString(10, originalStatus.assumedRole());
stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE);
stmt.setString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public ExecutorService get() {
listeners.add(schedulerService);

for (var listener : listeners) {
listener.dbosLaunched();
listener.dbosLaunched(dbos);
}

var recoveryTask =
Expand Down Expand Up @@ -650,7 +650,7 @@ public <T, E extends Exception> T runStepInternal(
/** Retrieve the workflowHandle for the workflowId */
public <R, E extends Exception> WorkflowHandle<R, E> retrieveWorkflow(String workflowId) {
logger.debug("retrieveWorkflow {}", workflowId);
return new WorkflowHandleDBPoll<R, E>(workflowId);
return new WorkflowHandleDBPoll<>(this, workflowId);
}

public void sleep(Duration duration) {
Expand Down Expand Up @@ -1089,6 +1089,10 @@ public <T, E extends Exception> WorkflowHandle<T, E> startWorkflow(
logger.debug("startWorkflow {}", options);

var invocation = captureInvocation(supplier);
if (invocation.executor() != this) {
throw new IllegalStateException(
"The @Workflow method must be called on the DBOS instance passed to the startWorkflow lambda");
}
var workflow = getWorkflow(invocation);

var ctx = DBOSContextHolder.get();
Expand Down Expand Up @@ -1278,19 +1282,21 @@ private <T, E extends Exception> WorkflowHandle<T, E> executeWorkflow(
"queue %s does not exist".formatted(options.queueName()));
}

return enqueueWorkflow(
workflow.name(),
workflow.className(),
workflow.instanceName(),
maxRetries,
args,
options,
parent,
executorId(),
appVersion(),
appId(),
systemDatabase,
this.serializer);
var workflowId =
enqueueWorkflow(
workflow.name(),
workflow.className(),
workflow.instanceName(),
maxRetries,
args,
options,
parent,
executorId(),
appVersion(),
appId(),
systemDatabase,
this.serializer);
return new WorkflowHandleDBPoll<>(this, workflowId);
}

logger.debug("executeWorkflow {}({}) {}", workflow.fullyQualifiedName(), args, options);
Expand Down Expand Up @@ -1413,10 +1419,10 @@ private <T, E extends Exception> WorkflowHandle<T, E> executeWorkflow(
TimeUnit.MILLISECONDS);
}

return new WorkflowHandleFuture<T, E>(workflowId, future, this);
return new WorkflowHandleFuture<T, E>(this, workflowId, future);
}

public static <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
public static String enqueueWorkflow(
String name,
String className,
String instanceName,
Expand Down Expand Up @@ -1468,10 +1474,10 @@ public static <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
options.isDequeuedRequest,
options.serialization(),
serializer);
return new WorkflowHandleDBPoll<T, E>(workflowId);
return workflowId;
} catch (DBOSWorkflowExecutionConflictException e) {
logger.debug("Workflow execution conflict for workflowId {}", workflowId);
return new WorkflowHandleDBPoll<T, E>(workflowId);
return workflowId;
} catch (DBOSQueueDuplicatedException e) {
logger.debug(
"Workflow queue {} reused deduplicationId {}", e.queueName(), e.deduplicationId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.DBOS;

/**
* For registering callbacks that hear about `DBOS.launch()` and `DBOS.shutdown()`. At this point,
* DBOS is ready to run workflows, and no additional registrations are allowed.
*/
public interface DBOSLifecycleListener {
/** Called from within DBOS.launch, after workflow processing is allowed */
void dbosLaunched();
void dbosLaunched(DBOS.Instance dbos);

/** Called from within DBOS.shutdown, before workflow processing is stopped */
void dbosShutDown();
Expand Down
Loading