diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 83233beb..0a4dbba1 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -2,7 +2,6 @@ import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.DBOSContext; -import dev.dbos.transact.context.DBOSContextHolder; import dev.dbos.transact.database.ExternalState; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.execution.DBOSLifecycleListener; @@ -54,13 +53,13 @@ private DBOS() {} private static final Logger logger = LoggerFactory.getLogger(DBOS.class); private static final String version = loadVersionFromResources(); + private static final AtomicReference globalInstance = new AtomicReference<>(); private static @Nullable String loadVersionFromResources() { final String PROPERTIES_FILE = "/dev/dbos/transact/app.properties"; final String VERSION_KEY = "app.version"; Properties props = new Properties(); try (InputStream input = DBOS.class.getResourceAsStream(PROPERTIES_FILE)) { - if (input == null) { logger.warn("Could not find {} resource file", PROPERTIES_FILE); return ""; @@ -71,14 +70,13 @@ private DBOS() {} // Retrieve the version property, defaulting to "unknown" return props.getProperty(VERSION_KEY, ""); - } catch (IOException ex) { logger.error("Error loading version properties", ex); return ""; } } - public static @Nullable String version() { + public static String version() { return version; } @@ -86,14 +84,99 @@ public static class Instance { private final WorkflowRegistry workflowRegistry = new WorkflowRegistry(); private final QueueRegistry queueRegistry = new QueueRegistry(); private final Set lifecycleRegistry = ConcurrentHashMap.newKeySet(); + private final DBOSConfig config; + private final AtomicReference dbosExecutor = new AtomicReference<>(); + private AlertHandler alertHandler; - private DBOSConfig config; + public Instance(@NonNull DBOSConfig config) { + Objects.requireNonNull(config.appName(), "DBOSConfig.appName must not be null"); + if (config.dataSource() == null) { + Objects.requireNonNull(config.databaseUrl(), "DBOSConfig.databaseUrl must not be null"); + Objects.requireNonNull(config.dbUser(), "DBOSConfig.dbUser must not be null"); + Objects.requireNonNull(config.dbPassword(), "DBOSConfig.dbPassword must not be null"); + } - private final AtomicReference dbosExecutor = new AtomicReference<>(); + this.config = config; + } + + public String version() { + return DBOS.version(); + } + + /** + * Register a lifecycle listener that receives callbacks when DBOS is launched or shut down + * + * @param listener + */ + public void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) { + if (dbosExecutor.get() != null) { + throw new IllegalStateException( + "Cannot register lifecycle listener after DBOS is launched"); + } - private Instance() { - DBOSContextHolder.clear(); // CB: Why + lifecycleRegistry.add(listener); + } + + /** + * Register a DBOS queue. This must be called on each queue prior to launch, so that recovery + * has the queue options available. + * + * @param queue `Queue` to register + */ + public void registerQueue(@NonNull Queue queue) { + if (dbosExecutor.get() != null) { + throw new IllegalStateException("Cannot build a queue after DBOS is launched"); + } + + queueRegistry.register(queue); + } + + /** + * Register a set of DBOS queues. Each queue must be registered prior to launch, so that + * recovery has the queue options available. + * + * @param queues collection of `Queue` instances to register + */ + public void registerQueues(@NonNull Queue... queues) { + for (Queue queue : queues) { + registerQueue(queue); + } + } + + /** + * Register all workflows and steps in the provided class instance + * + * @param The interface type for the instance + * @param interfaceClass The interface class for the workflows + * @param implementation An implementation instance providing the workflow and step function + * code + * @return A proxy, with interface {@literal }, that provides durability for the workflow + * functions + */ + public @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation) { + return registerWorkflows(interfaceClass, implementation, ""); + } + + /** + * Register all workflows and steps in the provided class instance + * + * @param The interface type for the instance + * @param interfaceClass The interface class for the workflows + * @param implementation An implementation instance providing the workflow and step function + * code + * @param instanceName Name of the instance, allowing multiple instances of the same class to be + * registered + * @return A proxy, with interface {@literal }, that provides durability for the workflow + * functions + */ + public @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation, @NonNull String instanceName) { + registerClassWorkflows(interfaceClass, implementation, instanceName); + + return DBOSInvocationHandler.createProxy( + interfaceClass, implementation, instanceName, () -> this.dbosExecutor.get()); } private void registerClassWorkflows( @@ -129,7 +212,7 @@ private void registerClassWorkflows( } } - private @NonNull String registerWorkflowMethod( + private void registerWorkflowMethod( @NonNull Workflow wfTag, @NonNull Object target, @NonNull String className, @@ -148,51 +231,16 @@ private void registerClassWorkflows( method, wfTag.maxRecoveryAttempts(), wfTag.serializationStrategy()); - return name; - } - - void registerLifecycleListener(@NonNull DBOSLifecycleListener l) { - if (dbosExecutor.get() != null) { - throw new IllegalStateException( - "Cannot register lifecycle listener after DBOS is launched"); - } - - lifecycleRegistry.add(l); - } - - void registerQueue(@NonNull Queue queue) { - if (dbosExecutor.get() != null) { - throw new IllegalStateException("Cannot build a queue after DBOS is launched"); - } - - queueRegistry.register(queue); - } - - public @NonNull T registerWorkflows( - @NonNull Class interfaceClass, @NonNull T implementation) { - return registerWorkflows(interfaceClass, implementation, ""); - } - - public @NonNull T registerWorkflows( - @NonNull Class interfaceClass, @NonNull T implementation, @NonNull String instanceName) { - registerClassWorkflows(interfaceClass, implementation, instanceName); - - return DBOSInvocationHandler.createProxy( - interfaceClass, implementation, instanceName, () -> this.dbosExecutor.get()); - } - - private void registerInternals() { - this.registerQueue(new Queue(Constants.DBOS_INTERNAL_QUEUE)); - } - - void clearRegistry() { - workflowRegistry.clear(); - queueRegistry.clear(); - lifecycleRegistry.clear(); - - registerInternals(); } + /** + * Registers an {@link AlertHandler} to handle alerts generated by DBOS. This method must be + * called before DBOS is launched; attempting to register an alert handler after launch will + * result in an {@link IllegalStateException}. + * + * @param handler the {@link AlertHandler} instance to register; must not be null + * @throws IllegalStateException if called after DBOS has been launched + */ public void registerAlertHandler(AlertHandler handler) { if (dbosExecutor.get() != null) { throw new IllegalStateException("Cannot set alert handler after DBOS is launched"); @@ -201,37 +249,26 @@ public void registerAlertHandler(AlertHandler handler) { this.alertHandler = handler; } - // package private methods for test purposes + // package private method for test purposes @Nullable DBOSExecutor getDbosExecutor() { return dbosExecutor.get(); } - public void setConfig(@NonNull DBOSConfig config) { - if (this.config != null) { - throw new IllegalStateException("DBOS has already been configured"); - } - - Objects.requireNonNull(config.appName(), "DBOSConfig.appName must not be null"); - if (config.dataSource() == null) { - Objects.requireNonNull(config.databaseUrl(), "DBOSConfig.databaseUrl must not be null"); - Objects.requireNonNull(config.dbUser(), "DBOSConfig.dbUser must not be null"); - Objects.requireNonNull(config.dbPassword(), "DBOSConfig.dbPassword must not be null"); - } - - this.config = config; - } - + /** + * Launch DBOS, and start recovery. All workflows, queues, and other objects should be + * registered before launch + */ public void launch() { - if (this.config == null) { - throw new IllegalStateException("DBOS must be configured before launch()"); - } - var ver = DBOS.version(); - logger.info("Launching DBOS {}", ver == null ? "" : "v" + ver); + logger.info("Launching DBOS v{}", DBOS.version()); if (dbosExecutor.get() == null) { var executor = new DBOSExecutor(config); if (dbosExecutor.compareAndSet(null, executor)) { + if (config.migrate()) { + MigrationManager.runMigrations(config); + } + executor.start( this, new HashSet<>(this.lifecycleRegistry), @@ -243,6 +280,10 @@ public void launch() { } } + /** + * Shut down DBOS. This method should only be used in test environments, where DBOS is used + * multiple times in the same JVM. + */ public void shutdown() { var current = dbosExecutor.get(); if (current != null) { @@ -253,6 +294,533 @@ public void shutdown() { } logger.info("DBOS shut down"); } + + // helper for methods that can only be called after launch + private DBOSExecutor ensureLaunched(String caller) { + var exec = dbosExecutor.get(); + if (exec == null) { + throw new IllegalStateException( + String.format("Cannot call %s before DBOS is launched", caller)); + } + return exec; + } + + /** + * Retrieve a queue definition + * + * @param queueName Name of the queue + * @return Queue definition for given `queueName` + */ + public @NonNull Optional getQueue(@NonNull String queueName) { + return ensureLaunched("getQueue").getQueue(queueName); + } + + /** + * Durable sleep. Use this instead of Thread.sleep, especially in workflows. On restart or + * during recovery the original expected wakeup time is honoured as opposed to sleeping all over + * again. + * + * @param duration amount of time to sleep + */ + public void sleep(@NonNull Duration duration) { + if (!DBOSContext.inWorkflow()) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else if (DBOSContext.inStep()) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + ensureLaunched("sleep").sleep(duration); + } + } + + /** + * Start or enqueue a workflow with a return value + * + * @param Return type of the workflow + * @param Type of checked exception thrown by the workflow, if any + * @param supplier A lambda that calls exactly one workflow function + * @param options Start workflow options + * @return A handle to the enqueued or running workflow + */ + public @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingSupplier supplier, @NonNull StartWorkflowOptions options) { + return ensureLaunched("startWorkflow").startWorkflow(supplier, options); + } + + /** + * Start or enqueue a workflow with default options + * + * @param Return type of the workflow + * @param Type of checked exception thrown by the workflow, if any + * @param supplier A lambda that calls exactly one workflow function + * @return A handle to the enqueued or running workflow + */ + public @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingSupplier supplier) { + return startWorkflow(supplier, new StartWorkflowOptions()); + } + + /** + * Start or enqueue a workflow with no return value + * + * @param Type of checked exception thrown by the workflow, if any + * @param runnable A lambda that calls exactly one workflow function + * @param options Start workflow options + * @return A handle to the enqueued or running workflow + */ + public @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingRunnable runnable, @NonNull StartWorkflowOptions options) { + return startWorkflow( + () -> { + runnable.execute(); + return null; + }, + options); + } + + /** + * Start or enqueue a workflow with no return value, using default options + * + * @param Type of checked exception thrown by the workflow, if any + * @param runnable A lambda that calls exactly one workflow function + * @return A handle to the enqueued or running workflow + */ + public @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingRunnable runnable) { + return startWorkflow(runnable, new StartWorkflowOptions()); + } + + /** + * Execute a workflow based on registration and arguments. This is expected to be used by event + * listeners, not app code. + * + * @param regWorkflow Registration of the workflow. @see getRegisteredWorkflows + * @param args Workflow function arguments + * @param options Execution options, such as ID, queue, and timeout/deadline + * @return WorkflowHandle to the executed workflow + */ + public WorkflowHandle startWorkflow( + RegisteredWorkflow regWorkflow, Object[] args, StartWorkflowOptions options) { + return ensureLaunched("startWorkflow").startWorkflow(regWorkflow, args, options); + } + + /** + * Get the result of a workflow, or rethrow the exception thrown by the workflow + * + * @param Return type of the workflow + * @param Checked exception type, if any, thrown by the workflow + * @param workflowId ID of the workflow to retrieve + * @return Return value of the workflow + * @throws E if the workflow threw an exception + */ + public T getResult(@NonNull String workflowId) throws E { + return ensureLaunched("getResult").getResult(workflowId); + } + + /** + * Get the status of a workflow + * + * @param workflowId ID of the workflow to query + * @return Current workflow status for the provided workflowId, or null. + */ + public @Nullable WorkflowStatus getWorkflowStatus(@NonNull String workflowId) { + return ensureLaunched("getWorkflowStatus").getWorkflowStatus(workflowId); + } + + /** + * Send a message to a workflow + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is send + * @param idempotencyKey optional idempotency key for exactly-once send + */ + public void send( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey) { + send(destinationId, message, topic, idempotencyKey, null); + } + + /** + * Send a message to a workflow + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is send + */ + public void send( + @NonNull String destinationId, @NonNull Object message, @Nullable String topic) { + send(destinationId, message, topic, null, null); + } + + /** + * Send a message to a workflow with serialization strategy + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is send + * @param idempotencyKey optional idempotency key for exactly-once send + * @param serialization serialization strategy to use (null for default) + */ + public void send( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey, + @Nullable SerializationStrategy serialization) { + if (serialization == null) serialization = SerializationStrategy.DEFAULT; + ensureLaunched("send").send(destinationId, message, topic, idempotencyKey, serialization); + } + + /** + * Get a message sent to a particular topic + * + * @param topic the topic whose message to get + * @param timeout duration after which the call times out + * @return the message if there is one or else null + */ + public @Nullable Object recv(@Nullable String topic, @NonNull Duration timeout) { + return ensureLaunched("recv").recv(topic, timeout); + } + + /** + * Call within a workflow to publish a key value pair. Uses the workflow's serialization format. + * + * @param key identifier for published data + * @param value data that is published + */ + public void setEvent(@NonNull String key, @NonNull Object value) { + setEvent(key, value, null); + } + + /** + * Call within a workflow to publish a key value pair with a specific serialization strategy. + * + * @param key identifier for published data + * @param value data that is published + * @param serialization serialization strategy to use (null to use workflow's default) + */ + public void setEvent( + @NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) { + // If no explicit serialization specified, use the workflow context's serialization + if (serialization == null) { + serialization = serializationStrategy(); + } + ensureLaunched("setEvent").setEvent(key, value, serialization); + } + + /** + * Get the data published by a workflow + * + * @param workflowId id of the workflow who data is to be retrieved + * @param key identifies the data + * @param timeout time to wait for data before timing out + * @return the published value or null + */ + public @Nullable Object getEvent( + @NonNull String workflowId, @NonNull String key, @NonNull Duration timeout) { + logger.debug("Received getEvent for {} {}", workflowId, key); + + return ensureLaunched("getEvent").getEvent(workflowId, key, timeout); + } + + /** + * Run the provided function as a step; this variant is for functions with a return value + * + * @param Checked exception thrown by the step, if any + * @param stepfunc function or lambda to run + * @param opts step name, and retry options for running the step + * @throws E + */ + public T runStep( + @NonNull ThrowingSupplier stepfunc, @NonNull StepOptions opts) throws E { + + return ensureLaunched("runStep").runStepInternal(stepfunc, opts, null); + } + + /** + * Run the provided function as a step; this variant is for functions with a return value + * + * @param Checked exception thrown by the step, if any + * @param stepfunc function or lambda to run + * @param name name of the step, for tracing and to record in the system database + * @throws E + */ + public T runStep( + @NonNull ThrowingSupplier stepfunc, @NonNull String name) throws E { + return runStep(stepfunc, new StepOptions(name)); + } + + /** + * Run the provided function as a step; this variant is for functions with no return value + * + * @param Checked exception thrown by the step, if any + * @param stepfunc function or lambda to run + * @param opts step name, and retry options for running the step + * @throws E + */ + public void runStep( + @NonNull ThrowingRunnable stepfunc, @NonNull StepOptions opts) throws E { + runStep( + () -> { + stepfunc.execute(); + return null; + }, + opts); + } + + /** + * Run the provided function as a step; this variant is for functions with no return value + * + * @param Checked exception thrown by the step, if any + * @param stepfunc function or lambda to run + * @param name Name of the step, for tracing and recording in the system database + * @throws E + */ + public void runStep( + @NonNull ThrowingRunnable stepfunc, @NonNull String name) throws E { + runStep(stepfunc, new StepOptions(name)); + } + + /** + * Resume a workflow starting from the step after the last complete step + * + * @param Return type of the workflow function + * @param Checked exception thrown by the workflow function, if any + * @param workflowId id of the workflow + * @return A handle to the workflow + */ + public @NonNull WorkflowHandle resumeWorkflow( + @NonNull String workflowId) { + return ensureLaunched("resumeWorkflow").resumeWorkflow(workflowId); + } + + /*** + * + * Cancel the workflow. After this function is called, the next step (not the + * current one) will not execute + * + * @param workflowId ID of the workflow to cancel + */ + public void cancelWorkflow(@NonNull String workflowId) { + ensureLaunched("cancelWorkflow").cancelWorkflow(workflowId); + } + + /** + * Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the + * provided step are copied over + * + * @param Return type of the workflow function + * @param Checked exception thrown by the workflow function, if any + * @param workflowId Original workflow Id + * @param startStep Start execution from this step. Prior steps copied over + * @param options {@link ForkOptions} containing forkedWorkflowId, applicationVersion, timeout + * @return handle to the workflow + */ + public @NonNull WorkflowHandle forkWorkflow( + @NonNull String workflowId, int startStep, @NonNull ForkOptions options) { + return ensureLaunched("forkWorkflow").forkWorkflow(workflowId, startStep, options); + } + + /** + * Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the + * provided step are copied over + * + * @param Return type of the workflow function + * @param Checked exception thrown by the workflow function, if any + * @param workflowId Original workflow Id + * @param startStep Start execution from this step. Prior steps copied over + * @return handle to the workflow + */ + public @NonNull WorkflowHandle forkWorkflow( + @NonNull String workflowId, int startStep) { + return forkWorkflow(workflowId, startStep, new ForkOptions()); + } + + /** + * Deletes a workflow from the system. Does not delete child workflows. + * + * @param workflowId the unique identifier of the workflow to delete. Must not be null. + * @throws IllegalArgumentException if workflowId is null + */ + public void deleteWorkflow(@NonNull String workflowId) { + deleteWorkflow(workflowId, false); + } + + /** + * Deletes a workflow and optionally its child workflows from the system. + * + * @param workflowId the unique identifier of the workflow to delete. Must not be null. + * @param deleteChildren if true, also deletes all child workflows associated with the specified + * workflow; if false, only deletes the specified workflow + * @throws IllegalArgumentException if workflowId is null + */ + public void deleteWorkflow(@NonNull String workflowId, boolean deleteChildren) { + ensureLaunched("deleteWorkflow").deleteWorkflow(workflowId, deleteChildren); + } + + /** + * Retrieve a handle to a workflow, given its ID. Note that a handle is always returned, whether + * the workflow exists or not; getStatus() can be used to tell the difference + * + * @param Return type of the workflow function + * @param Checked exception thrown by the workflow function, if any + * @param workflowId ID of the workflow to retrieve + * @return Workflow handle for the provided workflow ID + */ + public @NonNull WorkflowHandle retrieveWorkflow( + @NonNull String workflowId) { + return ensureLaunched("retrieveWorkflow").retrieveWorkflow(workflowId); + } + + /** + * List all workflows + * + * @param input {@link ListWorkflowsInput} parameters to query workflows + * @return a list of workflow status {@link WorkflowStatus} + */ + public @NonNull List listWorkflows(@NonNull ListWorkflowsInput input) { + return ensureLaunched("listWorkflows").listWorkflows(input); + } + + /** + * List the steps in the workflow + * + * @param workflowId Id of the workflow whose steps to return + * @return list of step information {@link StepInfo} + */ + public @NonNull List listWorkflowSteps(@NonNull String workflowId) { + return ensureLaunched("listWorkflowSteps").listWorkflowSteps(workflowId); + } + + /** + * Get all workflows registered with DBOS. + * + * @return list of all registered workflow methods + */ + public @NonNull Collection getRegisteredWorkflows() { + return ensureLaunched("getRegisteredWorkflows").getWorkflows(); + } + + /** + * Get all workflow classes registered with DBOS. + * + * @return list of all class instances containing registered workflow methods + */ + public @NonNull Collection getRegisteredWorkflowInstances() { + return ensureLaunched("getRegisteredWorkflowInstances").getInstances(); + } + + /** + * Get a system database record stored by an external service A unique value is stored per + * combination of service, workflowName, and key + * + * @param service Identity of the service maintaining the record + * @param workflowName Fully qualified name of the workflow + * @param key Key assigned within the service+workflow + * @return Value associated with the service+workflow+key combination + */ + public Optional getExternalState( + String service, String workflowName, String key) { + return ensureLaunched("getExternalState").getExternalState(service, workflowName, key); + } + + /** + * Insert or update a system database record stored by an external service A timestamped unique + * value is stored per combination of service, workflowName, and key + * + * @param state ExternalState containing the service, workflow, key, and value to store + * @return Value associated with the service+workflow+key combination, in case the stored value + * already had a higher version or timestamp + */ + public ExternalState upsertExternalState(ExternalState state) { + return ensureLaunched("upsertExternalState").upsertExternalState(state); + } + + /** + * Marks a breaking change within a workflow. Returns true for new workflows (i.e. workflow + * sthat reach this point in the workflow after the breaking change was created) and false for + * old worklows (i.e. workflows that reached this point in the workflow before the breaking + * change was created). The workflow should execute the new code if this method returns true, + * otherwise execute the old code. Note, patching must be enabled in DBOS configuration and this + * method must be called from within a workflow context. + * + * @param patchName the name of the patch to apply + * @return true for workflows started after the breaking change, false for workflows started + * before the breaking change + * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a + * workflow + */ + public boolean patch(@NonNull String patchName) { + return ensureLaunched("patch").patch(patchName); + } + + /** + * Deprecates a previously applied breaking change patch within a workflow. Safely executes + * workflows containing the patch marker, but does not insert the patch marker into new + * workflows. Always returns true (boolean return gives deprecatePatch the same signature as + * {@link #patch}). Like {@link #patch}, patching must be enabled in DBOS configuration and this + * method must be called from within a workflow context. + * + * @param patchName the name of the patch to deprecate + * @return true (always returns true or throws) + * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a + * workflow + */ + public boolean deprecatePatch(@NonNull String patchName) { + return ensureLaunched("deprecatePatch").deprecatePatch(patchName); + } + } + + /** + * Initializes the singleton instance of DBOS with config. Should be called once during app + * startup, before launch. @DBOSConfig config dbos configuration + */ + public static void configure(DBOSConfig config) { + if (globalInstance.get() != null) { + throw new IllegalStateException("DBOS is already configured"); + } + + var instance = new DBOS.Instance(config); + var updated = globalInstance.compareAndSet(null, instance); + if (!updated) { + throw new IllegalStateException("DBOS is already configured"); + } + } + + /** + * Unconditionally initializes the singleton instance of DBOS with config, even if one was already + * set. For use in tests that reinitialize DBOS @DBOSConfig config dbos configuration. Package + * private method for test purposes + */ + static void reinitialize(DBOSConfig config) { + var previousInstance = globalInstance.getAndSet(new DBOS.Instance(config)); + if (previousInstance != null) { + previousInstance.shutdown(); + } + } + + private static Instance ensureInstance() { + var instance = globalInstance.get(); + if (instance == null) { + throw new IllegalStateException("DBOS instance is not initialized"); + } + return instance; + } + + // package private method for test purposes + static @Nullable DBOSExecutor getDbosExecutor() { + var instance = ensureInstance(); + return instance == null ? null : instance.getDbosExecutor(); } /** @@ -290,11 +858,9 @@ public void shutdown() { * the queue options available. * * @param queue `Queue` to register - * @return input queue */ - public static @NonNull Queue registerQueue(@NonNull Queue queue) { + public static void registerQueue(@NonNull Queue queue) { ensureInstance().registerQueue(queue); - return queue; } /** @@ -304,9 +870,7 @@ public void shutdown() { * @param queues collection of `Queue` instances to register */ public static void registerQueues(@NonNull Queue... queues) { - for (Queue queue : queues) { - registerQueue(queue); - } + ensureInstance().registerQueues(queues); } /** @@ -330,35 +894,6 @@ public static void registerAlertHandler(AlertHandler handler) { ensureInstance().registerAlertHandler(handler); } - /** - * Reinitializes the singleton instance of DBOS with config. For use in tests that reinitialize - * DBOS @DBOSConfig config dbos configuration - */ - public static synchronized Instance reinitialize(DBOSConfig config) { - if (config.migrate()) { - MigrationManager.runMigrations(config); - } - var instance = new Instance(); - instance.setConfig(config); - instance.registerInternals(); - globalInstance = instance; - return instance; - } - - /** - * Initializes the singleton instance of DBOS with config. Should be called once during app - * startup, before launch. @DBOSConfig config dbos configuration - */ - public static synchronized Instance configure(DBOSConfig config) { - var instance = ensureInstance(); - instance.setConfig(config); - instance.registerInternals(); - if (config.migrate()) { - MigrationManager.runMigrations(config); - } - return instance; - } - /** * Launch DBOS, and start recovery. All workflows, queues, and other objects should be registered * before launch @@ -372,32 +907,7 @@ public static void launch() { * multiple times in the same JVM. */ public static void shutdown() { - if (globalInstance != null) globalInstance.shutdown(); - } - - private static @Nullable Instance globalInstance = null; - - public static @Nullable Instance instance() { - return globalInstance; - } - - private static synchronized Instance ensureInstance() { - if (globalInstance == null) { - globalInstance = new Instance(); - } - return globalInstance; - } - - static DBOSExecutor executor(String caller) { - var inst = instance(); - if (inst == null) - throw new IllegalStateException( - String.format("Cannot call %s before DBOS is created", caller)); - var executor = inst.getDbosExecutor(); - if (executor == null) - throw new IllegalStateException( - String.format("Cannot call %s before DBOS is launched", caller)); - return executor; + ensureInstance().shutdown(); } /** @@ -432,6 +942,16 @@ public static boolean inStep() { return DBOSContext.inStep(); } + /** + * Get the serialization format of the current workflow context. + * + * @return the SerializationStrategy (e.g., "portable_json", "java_jackson"), or null if not in a + * workflow context or using default serialization + */ + public static @Nullable SerializationStrategy serializationStrategy() { + return DBOSContext.serializationStrategy(); + } + /** * Retrieve a queue definition * @@ -439,7 +959,7 @@ public static boolean inStep() { * @return Queue definition for given `queueName` */ public static @NonNull Optional getQueue(@NonNull String queueName) { - return executor("getQueue").getQueue(queueName); + return ensureInstance().getQueue(queueName); } /** @@ -449,35 +969,7 @@ public static boolean inStep() { * @param duration amount of time to sleep */ public static void sleep(@NonNull Duration duration) { - if (!inWorkflow()) { - try { - Thread.sleep(duration.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } else if (inStep()) { - try { - Thread.sleep(duration.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } else { - executor("sleep").sleep(duration); - } - } - - /** - * Start or enqueue a workflow with a return value - * - * @param Return type of the workflow - * @param Type of checked exception thrown by the workflow, if any - * @param supplier A lambda that calls exactly one workflow function - * @param options Start workflow options - * @return A handle to the enqueued or running workflow - */ - public static @NonNull WorkflowHandle startWorkflow( - @NonNull ThrowingSupplier supplier, @NonNull StartWorkflowOptions options) { - return executor("startWorkflow").startWorkflow(supplier, options); + ensureInstance().sleep(duration); } /** @@ -490,7 +982,7 @@ public static void sleep(@NonNull Duration duration) { */ public static @NonNull WorkflowHandle startWorkflow( @NonNull ThrowingSupplier supplier) { - return startWorkflow(supplier, new StartWorkflowOptions()); + return ensureInstance().startWorkflow(supplier, new StartWorkflowOptions()); } /** @@ -503,12 +995,7 @@ public static void sleep(@NonNull Duration duration) { */ public static @NonNull WorkflowHandle startWorkflow( @NonNull ThrowingRunnable runnable, @NonNull StartWorkflowOptions options) { - return startWorkflow( - () -> { - runnable.execute(); - return null; - }, - options); + return ensureInstance().startWorkflow(runnable, options); } /** @@ -520,7 +1007,35 @@ public static void sleep(@NonNull Duration duration) { */ public static @NonNull WorkflowHandle startWorkflow( @NonNull ThrowingRunnable runnable) { - return startWorkflow(runnable, new StartWorkflowOptions()); + return ensureInstance().startWorkflow(runnable, new StartWorkflowOptions()); + } + + /** + * Start or enqueue a workflow with a return value + * + * @param Return type of the workflow + * @param Type of checked exception thrown by the workflow, if any + * @param supplier A lambda that calls exactly one workflow function + * @param options Start workflow options + * @return A handle to the enqueued or running workflow + */ + public static @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingSupplier supplier, @NonNull StartWorkflowOptions options) { + return ensureInstance().startWorkflow(supplier, options); + } + + /** + * Execute a workflow based on registration and arguments. This is expected to be used by event + * listeners, not app code. + * + * @param regWorkflow Registration of the workflow. @see getRegisteredWorkflows + * @param args Workflow function arguments + * @param options Execution options, such as ID, queue, and timeout/deadline + * @return WorkflowHandle to the executed workflow + */ + public static WorkflowHandle startWorkflow( + RegisteredWorkflow regWorkflow, Object[] args, StartWorkflowOptions options) { + return ensureInstance().startWorkflow(regWorkflow, args, options); } /** @@ -533,7 +1048,7 @@ public static void sleep(@NonNull Duration duration) { * @throws E if the workflow threw an exception */ public static T getResult(@NonNull String workflowId) throws E { - return executor("getResult").getResult(workflowId); + return ensureInstance().getResult(workflowId); } /** @@ -543,18 +1058,7 @@ public static T getResult(@NonNull String workflowId) t * @return Current workflow status for the provided workflowId, or null. */ public static @Nullable WorkflowStatus getWorkflowStatus(@NonNull String workflowId) { - return executor("getWorkflowStatus").getWorkflowStatus(workflowId); - } - - /** - * Get the serialization format of the current workflow context. - * - * @return the serialization format name (e.g., "portable_json", "java_jackson"), or null if not - * in a workflow context or using default serialization - */ - public static @Nullable SerializationStrategy getSerialization() { - var ctx = DBOSContextHolder.get(); - return ctx != null ? ctx.getSerialization() : null; + return ensureInstance().getWorkflowStatus(workflowId); } /** @@ -570,38 +1074,37 @@ public static void send( @NonNull Object message, @Nullable String topic, @Nullable String idempotencyKey) { - send(destinationId, message, topic, idempotencyKey, null); + ensureInstance().send(destinationId, message, topic, idempotencyKey); } /** - * Send a message to a workflow with serialization strategy + * Send a message to a workflow * * @param destinationId recipient of the message * @param message message to be sent * @param topic topic to which the message is send - * @param idempotencyKey optional idempotency key for exactly-once send - * @param serialization serialization strategy to use (null for default) */ public static void send( - @NonNull String destinationId, - @NonNull Object message, - @Nullable String topic, - @Nullable String idempotencyKey, - @Nullable SerializationStrategy serialization) { - if (serialization == null) serialization = SerializationStrategy.DEFAULT; - executor("send").send(destinationId, message, topic, idempotencyKey, serialization); + @NonNull String destinationId, @NonNull Object message, @NonNull String topic) { + ensureInstance().send(destinationId, message, topic); } /** - * Send a message to a workflow + * Send a message to a workflow with serialization strategy * * @param destinationId recipient of the message * @param message message to be sent * @param topic topic to which the message is send + * @param idempotencyKey optional idempotency key for exactly-once send + * @param serialization serialization strategy to use (null for default) */ public static void send( - @NonNull String destinationId, @NonNull Object message, @Nullable String topic) { - DBOS.send(destinationId, message, topic, null, null); + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey, + @Nullable SerializationStrategy serialization) { + ensureInstance().send(destinationId, message, topic, idempotencyKey, serialization); } /** @@ -611,8 +1114,8 @@ public static void send( * @param timeout duration after which the call times out * @return the message if there is one or else null */ - public static @Nullable Object recv(@NonNull String topic, @NonNull Duration timeout) { - return executor("recv").recv(topic, timeout); + public static @Nullable Object recv(@Nullable String topic, @NonNull Duration timeout) { + return ensureInstance().recv(topic, timeout); } /** @@ -622,7 +1125,7 @@ public static void send( * @param value data that is published */ public static void setEvent(@NonNull String key, @NonNull Object value) { - setEvent(key, value, null); + ensureInstance().setEvent(key, value); } /** @@ -634,11 +1137,7 @@ public static void setEvent(@NonNull String key, @NonNull Object value) { */ public static void setEvent( @NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) { - // If no explicit serialization specified, use the workflow context's serialization - if (serialization == null) { - serialization = getSerialization(); - } - executor("setEvent").setEvent(key, value, serialization); + ensureInstance().setEvent(key, value, serialization); } /** @@ -651,23 +1150,7 @@ public static void setEvent( */ public static @Nullable Object getEvent( @NonNull String workflowId, @NonNull String key, @NonNull Duration timeout) { - logger.debug("Received getEvent for {} {}", workflowId, key); - - return executor("getEvent").getEvent(workflowId, key, timeout); - } - - /** - * Run the provided function as a step; this variant is for functions with a return value - * - * @param Checked exception thrown by the step, if any - * @param stepfunc function or lambda to run - * @param opts step name, and retry options for running the step - * @throws E - */ - public static T runStep( - @NonNull ThrowingSupplier stepfunc, @NonNull StepOptions opts) throws E { - - return executor("runStep").runStepInternal(stepfunc, opts, null); + return ensureInstance().getEvent(workflowId, key, timeout); } /** @@ -681,7 +1164,7 @@ public static T runStep( public static T runStep( @NonNull ThrowingSupplier stepfunc, @NonNull String name) throws E { - return executor("runStep").runStepInternal(stepfunc, new StepOptions(name), null); + return ensureInstance().runStep(stepfunc, name); } /** @@ -694,14 +1177,7 @@ public static T runStep( */ public static void runStep( @NonNull ThrowingRunnable stepfunc, @NonNull StepOptions opts) throws E { - executor("runStep") - .runStepInternal( - () -> { - stepfunc.execute(); - return null; - }, - opts, - null); + ensureInstance().runStep(stepfunc, opts); } /** @@ -714,7 +1190,21 @@ public static void runStep( */ public static void runStep( @NonNull ThrowingRunnable stepfunc, @NonNull String name) throws E { - runStep(stepfunc, new StepOptions(name)); + ensureInstance().runStep(stepfunc, name); + } + + /** + * Run the provided function as a step; this variant is for functions with a return value + * + * @param Checked exception thrown by the step, if any + * @param stepfunc function or lambda to run + * @param opts step name, and retry options for running the step + * @throws E + */ + public static T runStep( + @NonNull ThrowingSupplier stepfunc, @NonNull StepOptions opts) throws E { + + return ensureInstance().runStep(stepfunc, opts); } /** @@ -727,7 +1217,7 @@ public static void runStep( */ public static @NonNull WorkflowHandle resumeWorkflow( @NonNull String workflowId) { - return executor("resumeWorkflow").resumeWorkflow(workflowId); + return ensureInstance().resumeWorkflow(workflowId); } /*** @@ -738,7 +1228,7 @@ public static void runStep( * @param workflowId ID of the workflow to cancel */ public static void cancelWorkflow(@NonNull String workflowId) { - executor("cancelWorkflow").cancelWorkflow(workflowId); + ensureInstance().cancelWorkflow(workflowId); } /** @@ -754,7 +1244,7 @@ public static void cancelWorkflow(@NonNull String workflowId) { */ public static @NonNull WorkflowHandle forkWorkflow( @NonNull String workflowId, int startStep, @NonNull ForkOptions options) { - return executor("forkWorkflow").forkWorkflow(workflowId, startStep, options); + return ensureInstance().forkWorkflow(workflowId, startStep, options); } /** @@ -769,7 +1259,7 @@ public static void cancelWorkflow(@NonNull String workflowId) { */ public static @NonNull WorkflowHandle forkWorkflow( @NonNull String workflowId, int startStep) { - return forkWorkflow(workflowId, startStep, new ForkOptions()); + return ensureInstance().forkWorkflow(workflowId, startStep); } /** @@ -779,7 +1269,7 @@ public static void cancelWorkflow(@NonNull String workflowId) { * @throws IllegalArgumentException if workflowId is null */ public static void deleteWorkflow(@NonNull String workflowId) { - deleteWorkflow(workflowId, false); + ensureInstance().deleteWorkflow(workflowId); } /** @@ -791,7 +1281,7 @@ public static void deleteWorkflow(@NonNull String workflowId) { * @throws IllegalArgumentException if workflowId is null */ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChildren) { - executor("deleteWorkflow").deleteWorkflow(workflowId, deleteChildren); + ensureInstance().deleteWorkflow(workflowId, deleteChildren); } /** @@ -805,7 +1295,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil */ public static @NonNull WorkflowHandle retrieveWorkflow( @NonNull String workflowId) { - return executor("retrieveWorkflow").retrieveWorkflow(workflowId); + return ensureInstance().retrieveWorkflow(workflowId); } /** @@ -815,7 +1305,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil * @return a list of workflow status {@link WorkflowStatus} */ public static @NonNull List listWorkflows(@NonNull ListWorkflowsInput input) { - return executor("listWorkflows").listWorkflows(input); + return ensureInstance().listWorkflows(input); } /** @@ -825,7 +1315,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil * @return list of step information {@link StepInfo} */ public static @NonNull List listWorkflowSteps(@NonNull String workflowId) { - return executor("listWorkflowSteps").listWorkflowSteps(workflowId); + return ensureInstance().listWorkflowSteps(workflowId); } /** @@ -834,7 +1324,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil * @return list of all registered workflow methods */ public static @NonNull Collection getRegisteredWorkflows() { - return executor("getRegisteredWorkflows").getWorkflows(); + return ensureInstance().getRegisteredWorkflows(); } /** @@ -843,21 +1333,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil * @return list of all class instances containing registered workflow methods */ public static @NonNull Collection getRegisteredWorkflowInstances() { - return executor("getRegisteredWorkflowInstances").getInstances(); - } - - /** - * Execute a workflow based on registration and arguments. This is expected to be used by generic - * callers, not app code. - * - * @param regWorkflow Registration of the workflow. @see getRegisteredWorkflows - * @param args Workflow function arguments - * @param options Execution options, such as ID, queue, and timeout/deadline - * @return WorkflowHandle to the executed workflow - */ - public static WorkflowHandle startWorkflow( - RegisteredWorkflow regWorkflow, Object[] args, StartWorkflowOptions options) { - return executor("startWorkflow").startWorkflow(regWorkflow, args, options); + return ensureInstance().getRegisteredWorkflowInstances(); } /** @@ -871,7 +1347,7 @@ public static void deleteWorkflow(@NonNull String workflowId, boolean deleteChil */ public static Optional getExternalState( String service, String workflowName, String key) { - return executor("getExternalState").getExternalState(service, workflowName, key); + return ensureInstance().getExternalState(service, workflowName, key); } /** @@ -883,7 +1359,7 @@ public static Optional getExternalState( * already had a higher version or timestamp */ public static ExternalState upsertExternalState(ExternalState state) { - return executor("upsertExternalState").upsertExternalState(state); + return ensureInstance().upsertExternalState(state); } /** @@ -901,7 +1377,7 @@ public static ExternalState upsertExternalState(ExternalState state) { * workflow */ public static boolean patch(@NonNull String patchName) { - return executor("patch").patch(patchName); + return ensureInstance().patch(patchName); } /** @@ -917,6 +1393,6 @@ public static boolean patch(@NonNull String patchName) { * workflow */ public static boolean deprecatePatch(@NonNull String patchName) { - return executor("deprecatePatch").deprecatePatch(patchName); + return ensureInstance().deprecatePatch(patchName); } } diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index 83e78a50..ae5a902c 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -29,6 +29,33 @@ import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; +class ClientWorkflowHandle implements WorkflowHandle { + + private final SystemDatabase systemDatabase; + private final String workflowId; + + public ClientWorkflowHandle(SystemDatabase systemDatabase, String workflowId) { + this.systemDatabase = systemDatabase; + this.workflowId = workflowId; + } + + @Override + public String workflowId() { + return workflowId; + } + + @Override + public T getResult() throws E { + var result = systemDatabase.awaitWorkflowResult(workflowId); + return Result.process(result); + } + + @Override + public WorkflowStatus getStatus() { + return systemDatabase.getWorkflowStatus(workflowId); + } +} + /** * DBOSClient allows external programs to interact with DBOS apps via direct system database access. * Example interactions: Start/enqueue a workflow, and get the result Get events and send messages @@ -470,31 +497,36 @@ public EnqueueOptions( String serializationFormat = options.serialization() != null ? options.serialization().formatName() : null; - return DBOSExecutor.enqueueWorkflow( - Objects.requireNonNull( - options.workflowName(), "EnqueueOptions workflowName must not be null"), - Objects.requireNonNull(options.className(), "EnqueueOptions className must not be null"), - Objects.requireNonNullElse(options.instanceName(), ""), - null, - args, - new DBOSExecutor.ExecutionOptions( - Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString()), - Timeout.of(options.timeout()), - options.deadline, + var workflowId = + DBOSExecutor.enqueueWorkflow( + Objects.requireNonNull( + options.workflowName(), "EnqueueOptions workflowName must not be null"), Objects.requireNonNull( - options.queueName(), "EnqueueOptions queueName must not be null"), - options.deduplicationId, - options.priority, - options.queuePartitionKey, - false, - false, - serializationFormat), - null, - null, - null, - options.appVersion, - systemDatabase, - this.serializer); + options.className(), "EnqueueOptions className must not be null"), + Objects.requireNonNullElse(options.instanceName(), ""), + null, + args, + new DBOSExecutor.ExecutionOptions( + Objects.requireNonNullElseGet( + options.workflowId(), () -> UUID.randomUUID().toString()), + Timeout.of(options.timeout()), + options.deadline, + Objects.requireNonNull( + options.queueName(), "EnqueueOptions queueName must not be null"), + options.deduplicationId, + options.priority, + options.queuePartitionKey, + false, + false, + serializationFormat), + null, + null, + null, + options.appVersion, + systemDatabase, + this.serializer); + + return new ClientWorkflowHandle<>(systemDatabase, workflowId); } /** diff --git a/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java b/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java index 58699d08..499e69b1 100644 --- a/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java +++ b/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java @@ -163,4 +163,9 @@ public static boolean inStep() { var ctx = DBOSContextHolder.get(); return ctx == null ? false : ctx.isInStep(); } + + public static SerializationStrategy serializationStrategy() { + var ctx = DBOSContextHolder.get(); + return ctx != null ? ctx.getSerialization() : null; + } } diff --git a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java index 517de711..35e572f8 100644 --- a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java @@ -812,7 +812,7 @@ void resumeWorkflow(String workflowId) throws SQLException { String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions options) throws SQLException { - Objects.requireNonNull(options); + options = Objects.requireNonNullElseGet(options, ForkOptions::new); var status = getWorkflowStatus(originalWorkflowId); if (status == null) { @@ -897,7 +897,11 @@ private static void insertForkedWorkflowStatus( stmt.setString(6, applicationVersion); stmt.setString(7, originalStatus.appId()); stmt.setString(8, originalStatus.authenticatedUser()); - stmt.setString(9, JSONUtil.serializeArray(originalStatus.authenticatedRoles())); + stmt.setString( + 9, + originalStatus.authenticatedRoles() == null + ? null + : JSONUtil.serializeArray(originalStatus.authenticatedRoles())); stmt.setString(10, originalStatus.assumedRole()); stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE); stmt.setString( diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index b70466bd..475d1ee4 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -196,7 +196,7 @@ public ExecutorService get() { listeners.add(schedulerService); for (var listener : listeners) { - listener.dbosLaunched(); + listener.dbosLaunched(dbos); } var recoveryTask = @@ -650,7 +650,7 @@ public T runStepInternal( /** Retrieve the workflowHandle for the workflowId */ public WorkflowHandle retrieveWorkflow(String workflowId) { logger.debug("retrieveWorkflow {}", workflowId); - return new WorkflowHandleDBPoll(workflowId); + return new WorkflowHandleDBPoll<>(this, workflowId); } public void sleep(Duration duration) { @@ -1089,6 +1089,10 @@ public WorkflowHandle startWorkflow( logger.debug("startWorkflow {}", options); var invocation = captureInvocation(supplier); + if (invocation.executor() != this) { + throw new IllegalStateException( + "The @Workflow method must be called on the DBOS instance passed to the startWorkflow lambda"); + } var workflow = getWorkflow(invocation); var ctx = DBOSContextHolder.get(); @@ -1278,19 +1282,21 @@ private WorkflowHandle executeWorkflow( "queue %s does not exist".formatted(options.queueName())); } - return enqueueWorkflow( - workflow.name(), - workflow.className(), - workflow.instanceName(), - maxRetries, - args, - options, - parent, - executorId(), - appVersion(), - appId(), - systemDatabase, - this.serializer); + var workflowId = + enqueueWorkflow( + workflow.name(), + workflow.className(), + workflow.instanceName(), + maxRetries, + args, + options, + parent, + executorId(), + appVersion(), + appId(), + systemDatabase, + this.serializer); + return new WorkflowHandleDBPoll<>(this, workflowId); } logger.debug("executeWorkflow {}({}) {}", workflow.fullyQualifiedName(), args, options); @@ -1413,10 +1419,10 @@ private WorkflowHandle executeWorkflow( TimeUnit.MILLISECONDS); } - return new WorkflowHandleFuture(workflowId, future, this); + return new WorkflowHandleFuture(this, workflowId, future); } - public static WorkflowHandle enqueueWorkflow( + public static String enqueueWorkflow( String name, String className, String instanceName, @@ -1468,10 +1474,10 @@ public static WorkflowHandle enqueueWorkflow( options.isDequeuedRequest, options.serialization(), serializer); - return new WorkflowHandleDBPoll(workflowId); + return workflowId; } catch (DBOSWorkflowExecutionConflictException e) { logger.debug("Workflow execution conflict for workflowId {}", workflowId); - return new WorkflowHandleDBPoll(workflowId); + return workflowId; } catch (DBOSQueueDuplicatedException e) { logger.debug( "Workflow queue {} reused deduplicationId {}", e.queueName(), e.deduplicationId()); diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSLifecycleListener.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSLifecycleListener.java index 31de26b8..27b48b87 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSLifecycleListener.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSLifecycleListener.java @@ -1,12 +1,14 @@ package dev.dbos.transact.execution; +import dev.dbos.transact.DBOS; + /** * For registering callbacks that hear about `DBOS.launch()` and `DBOS.shutdown()`. At this point, * DBOS is ready to run workflows, and no additional registrations are allowed. */ public interface DBOSLifecycleListener { /** Called from within DBOS.launch, after workflow processing is allowed */ - void dbosLaunched(); + void dbosLaunched(DBOS.Instance dbos); /** Called from within DBOS.shutdown, before workflow processing is stopped */ void dbosShutDown(); diff --git a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java index 462628e4..cb17bb58 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java +++ b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java @@ -30,59 +30,61 @@ public class SchedulerService implements DBOSLifecycleListener { + record ScheduledWorkflow( + RegisteredWorkflow workflow, Cron cron, String queue, boolean ignoreMissed) {} + private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class); private static final CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.SPRING53)); + private static final Class[] expectedParams = new Class[] {Instant.class, Instant.class}; - private final String schedulerQueueName; - private final AtomicReference scheduler = new AtomicReference<>(); + private final String defaultSchedulerQueueName; + private final AtomicReference dbosRef = new AtomicReference<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4); - public SchedulerService(String defSchedulerQueue) { - this.schedulerQueueName = Objects.requireNonNull(defSchedulerQueue); + public SchedulerService(String defaultSchedulerQueueName) { + this.defaultSchedulerQueueName = Objects.requireNonNull(defaultSchedulerQueueName); } public static void validateScheduledWorkflow(RegisteredWorkflow workflow) { var method = workflow.workflowMethod(); var skedTag = method.getAnnotation(Scheduled.class); - if (skedTag == null) { - return; - } + if (skedTag != null) { + var paramTypes = method.getParameterTypes(); + if (!Arrays.equals(paramTypes, expectedParams)) { + throw new IllegalArgumentException( + "Invalid signature for Scheduled workflow %s. Signature must be (Instant, Instant)" + .formatted(workflow.fullyQualifiedName())); + } - var expectedParams = new Class[] {Instant.class, Instant.class}; - var paramTypes = method.getParameterTypes(); - if (!Arrays.equals(paramTypes, expectedParams)) { - throw new IllegalArgumentException( - "Invalid signature for Scheduled workflow %s. Signature must be (Instant, Instant)" - .formatted(workflow.fullyQualifiedName())); + cronParser.parse(skedTag.cron()); } - - cronParser.parse(skedTag.cron()); } - public void dbosLaunched() { - if (this.scheduler.get() == null) { - var scheduler = Executors.newScheduledThreadPool(4); - if (this.scheduler.compareAndSet(null, scheduler)) { - startScheduledWorkflows(); - } + @Override + public void dbosLaunched(DBOS.Instance dbos) { + DBOS.Instance prev = this.dbosRef.getAndUpdate(existing -> existing == null ? dbos : existing); + if (prev == null) { + startScheduledWorkflows(dbos); + } else if (prev != dbos) { + throw new IllegalStateException( + "SchedulerService already initialized with a different DBOS instance"); } } + @Override public void dbosShutDown() { - var scheduler = this.scheduler.getAndSet(null); - if (scheduler != null) { + DBOS.Instance prev = this.dbosRef.getAndSet(null); + if (prev != null) { List notRun = scheduler.shutdownNow(); logger.debug("Shutting down scheduler service. Tasks not run {}", notRun.size()); } } - record ScheduledWorkflow( - RegisteredWorkflow workflow, Cron cron, String queue, boolean ignoreMissed) {} - - private ZonedDateTime getLastTime(ScheduledWorkflow swf) { + private static ZonedDateTime getLastTime(DBOS.Instance dbos, ScheduledWorkflow swf) { if (!swf.ignoreMissed()) { var state = - DBOS.getExternalState( + dbos.getExternalState( "DBOS.SchedulerService", swf.workflow().fullyQualifiedName(), "lastTime"); if (state.isPresent()) { return ZonedDateTime.parse(state.get().value()); @@ -91,13 +93,14 @@ private ZonedDateTime getLastTime(ScheduledWorkflow swf) { return ZonedDateTime.now(ZoneOffset.UTC).withNano(0); } - private ZonedDateTime setLastTime(ScheduledWorkflow swf, ZonedDateTime lastTime) { + private static ZonedDateTime setLastTime( + DBOS.Instance dbos, ScheduledWorkflow swf, ZonedDateTime lastTime) { if (swf.ignoreMissed()) { return ZonedDateTime.now(ZoneOffset.UTC).withNano(0); } var state = - DBOS.upsertExternalState( + dbos.upsertExternalState( new ExternalState( "DBOS.SchedulerService", swf.workflow().fullyQualifiedName(), @@ -108,14 +111,11 @@ private ZonedDateTime setLastTime(ScheduledWorkflow swf, ZonedDateTime lastTime) return ZonedDateTime.parse(state.value()).plus(1, ChronoUnit.MILLIS); } - private void startScheduledWorkflows() { - logger.debug("startScheduledWorkflows"); - - var expectedParams = new Class[] {Instant.class, Instant.class}; - - // collect all workflows that have an @Scheduled annotation - List scheduledWorkflows = new ArrayList<>(); - for (var wf : DBOS.getRegisteredWorkflows()) { + private static List getScheduledWorkflows( + DBOS.Instance dbos, String defaultSchedulerQueueName) { + var registeredWorkflows = dbos.getRegisteredWorkflows(); + var scheduledWorkflows = new ArrayList(); + for (var wf : registeredWorkflows) { var method = wf.workflowMethod(); var skedTag = method.getAnnotation(Scheduled.class); if (skedTag == null) { @@ -130,21 +130,25 @@ private void startScheduledWorkflows() { continue; } - String queue = - skedTag.queue() != null && !skedTag.queue().isEmpty() - ? skedTag.queue() - : this.schedulerQueueName; - var q = DBOS.getQueue(queue); - if (!q.isPresent()) { + // fields of Java annotations can't be null. + // @Scheduled.queue defaults to empty string if not specified + // using requireNonNullElse here for safety purposes + var queueName = Objects.requireNonNullElse(skedTag.queue(), ""); + queueName = queueName.isEmpty() ? defaultSchedulerQueueName : queueName; + var queue = dbos.getQueue(queueName); + if (!queue.isPresent()) { logger.error( - "Scheduled workflow {} refers to undefined queue {}", wf.fullyQualifiedName(), queue); - queue = this.schedulerQueueName; + "Scheduled workflow {} refers to undefined queue {}", + wf.fullyQualifiedName(), + queueName); + continue; } try { var cron = cronParser.parse(skedTag.cron()); scheduledWorkflows.add( - new ScheduledWorkflow(wf, Objects.requireNonNull(cron), queue, skedTag.ignoreMissed())); + new ScheduledWorkflow( + wf, Objects.requireNonNull(cron), queueName, skedTag.ignoreMissed())); } catch (IllegalArgumentException e) { logger.error( "Scheduled workflow {} has invalid cron expression {}", @@ -152,32 +156,35 @@ private void startScheduledWorkflows() { skedTag.cron()); } } + return scheduledWorkflows; + } - for (var _swf : scheduledWorkflows) { + private void startScheduledWorkflows(DBOS.Instance dbos) { + logger.debug("startScheduledWorkflows"); + var scheduledWorkflows = getScheduledWorkflows(dbos, defaultSchedulerQueueName); + for (var _scheduledWorkflow : scheduledWorkflows) { + var _nextTime = getLastTime(dbos, _scheduledWorkflow); var task = new Runnable() { - final ScheduledWorkflow swf = _swf; - final ExecutionTime executionTime = ExecutionTime.forCron(swf.cron()); - final String workflowName = swf.workflow().fullyQualifiedName(); - - ZonedDateTime nextTime = getLastTime(swf); + final ScheduledWorkflow scheduledWorkflow = _scheduledWorkflow; + final ExecutionTime executionTime = ExecutionTime.forCron(scheduledWorkflow.cron()); + final String workflowName = scheduledWorkflow.workflow().fullyQualifiedName(); + ZonedDateTime nextTime = _nextTime; public void schedule() { executionTime .nextExecution(nextTime) .ifPresent( - nextTime -> { - this.nextTime = nextTime; + _nextTime -> { + this.nextTime = _nextTime; long initialDelayMs = - Duration.between(ZonedDateTime.now(ZoneOffset.UTC), nextTime) + Duration.between(ZonedDateTime.now(ZoneOffset.UTC), _nextTime) .toMillis(); // ensure scheduler hasn't been shutdown before scheduling - var localScheduler = scheduler.get(); - if (localScheduler != null) { - logger.debug("Scheduling {} @ {}", workflowName, nextTime); - - localScheduler.schedule( + if (dbosRef.get() != null) { + logger.debug("Scheduling {} @ {}", workflowName, _nextTime); + scheduler.schedule( this, initialDelayMs < 0 ? 0 : initialDelayMs, TimeUnit.MILLISECONDS); } }); @@ -185,9 +192,10 @@ public void schedule() { @Override public void run() { - // if scheduler service isn't running, the scheduler service was shut down so don't - // start the workflow or schedule the next execution - if (scheduler.get() == null) { + // if dbos is null, the scheduler service was shut down so don't start the workflow or + // schedule the next execution + var dbos = dbosRef.get(); + if (dbos == null) { return; } @@ -201,9 +209,10 @@ public void run() { String workflowId = String.format("sched-%s-%s", workflowName, scheduledTime.toString()); - var options = new StartWorkflowOptions(workflowId).withQueue(swf.queue()); - DBOS.startWorkflow(swf.workflow(), args, options); - nextTime = setLastTime(swf, scheduledTime); + var options = + new StartWorkflowOptions(workflowId).withQueue(scheduledWorkflow.queue()); + dbos.startWorkflow(scheduledWorkflow.workflow(), args, options); + nextTime = setLastTime(dbos, scheduledWorkflow, scheduledTime); } catch (Exception e) { logger.error("Scheduled task exception {}", workflowName, e); } finally { diff --git a/transact/src/main/java/dev/dbos/transact/internal/DBOSInvocationHandler.java b/transact/src/main/java/dev/dbos/transact/internal/DBOSInvocationHandler.java index 07ccbf4b..c1d8b7ce 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/DBOSInvocationHandler.java +++ b/transact/src/main/java/dev/dbos/transact/internal/DBOSInvocationHandler.java @@ -46,6 +46,7 @@ public static T createProxy( new DBOSInvocationHandler(implementation, instanceName, executor)); } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { var implMethod = target.getClass().getMethod(method.getName(), method.getParameterTypes()); @@ -97,17 +98,17 @@ protected Object handleWorkflow( ? classNameAnnotation.value() : target.getClass().getName(); var workflowName = workflow.name().isEmpty() ? method.getName() : workflow.name(); - if (hook != null) { - var invocation = new Invocation(className, instanceName, workflowName, args); - hook.invoke(invocation); - return defaultReturn(method); - } - var executor = executorSupplier.get(); if (executor == null) { throw new IllegalStateException("executorSupplier returned null"); } + if (hook != null) { + var invocation = new Invocation(executor, className, instanceName, workflowName, args); + hook.invoke(invocation); + return defaultReturn(method); + } + var handle = executor.invokeWorkflow(className, instanceName, workflowName, args); // This is not really a getResult call - it is part of invocation which will be written diff --git a/transact/src/main/java/dev/dbos/transact/internal/Invocation.java b/transact/src/main/java/dev/dbos/transact/internal/Invocation.java index d922184a..f6f1f426 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/Invocation.java +++ b/transact/src/main/java/dev/dbos/transact/internal/Invocation.java @@ -1,4 +1,10 @@ package dev.dbos.transact.internal; +import dev.dbos.transact.execution.DBOSExecutor; + public record Invocation( - String className, String instanceName, String workflowName, Object[] args) {} + DBOSExecutor executor, + String className, + String instanceName, + String workflowName, + Object[] args) {} diff --git a/transact/src/main/java/dev/dbos/transact/internal/QueueRegistry.java b/transact/src/main/java/dev/dbos/transact/internal/QueueRegistry.java index f3ff8513..8e8a3023 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/QueueRegistry.java +++ b/transact/src/main/java/dev/dbos/transact/internal/QueueRegistry.java @@ -1,7 +1,9 @@ package dev.dbos.transact.internal; +import dev.dbos.transact.Constants; import dev.dbos.transact.workflow.Queue; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -10,10 +12,16 @@ public class QueueRegistry { private final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + private final Queue internalQueue = new Queue(Constants.DBOS_INTERNAL_QUEUE); private static final Logger logger = LoggerFactory.getLogger(QueueRegistry.class); public void register(Queue queue) { + if (queue.name().equals(Constants.DBOS_INTERNAL_QUEUE)) { + throw new IllegalArgumentException( + String.format("%s is a reserved queue name", Constants.DBOS_INTERNAL_QUEUE)); + } + if (queue.concurrency() != null && queue.workerConcurrency() != null && queue.workerConcurrency() > queue.concurrency()) { @@ -32,6 +40,9 @@ public void register(Queue queue) { } public Queue get(String queueName) { + if (queueName.equals(Constants.DBOS_INTERNAL_QUEUE)) { + return internalQueue; + } return registry.get(queueName); } @@ -40,6 +51,8 @@ public void clear() { } public List getSnapshot() { - return List.copyOf(registry.values()); + var queues = new ArrayList<>(registry.values()); + queues.add(internalQueue); + return List.copyOf(queues); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleDBPoll.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleDBPoll.java index 776f6cc3..d7bd263a 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleDBPoll.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleDBPoll.java @@ -1,13 +1,15 @@ package dev.dbos.transact.workflow.internal; -import dev.dbos.transact.DBOS; +import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowStatus; public class WorkflowHandleDBPoll implements WorkflowHandle { - private String workflowId; + private final DBOSExecutor executor; + private final String workflowId; - public WorkflowHandleDBPoll(String workflowId) { + public WorkflowHandleDBPoll(DBOSExecutor executor, String workflowId) { + this.executor = executor; this.workflowId = workflowId; } @@ -18,11 +20,11 @@ public String workflowId() { @Override public T getResult() throws E { - return DBOS.getResult(this.workflowId); + return executor.getResult(this.workflowId); } @Override public WorkflowStatus getStatus() { - return DBOS.getWorkflowStatus(workflowId); + return executor.getWorkflowStatus(workflowId); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java index 7de7b4dc..dc8c9dc7 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java @@ -1,6 +1,5 @@ package dev.dbos.transact.workflow.internal; -import dev.dbos.transact.DBOS; import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException; import dev.dbos.transact.exceptions.DBOSWorkflowExecutionConflictException; import dev.dbos.transact.execution.DBOSExecutor; @@ -13,11 +12,11 @@ public class WorkflowHandleFuture implements WorkflowHandle { - private String workflowId; - private Future futureResult; - private DBOSExecutor executor; + private final DBOSExecutor executor; + private final String workflowId; + private final Future futureResult; - public WorkflowHandleFuture(String workflowId, Future future, DBOSExecutor executor) { + public WorkflowHandleFuture(DBOSExecutor executor, String workflowId, Future future) { this.workflowId = workflowId; this.futureResult = future; this.executor = executor; @@ -58,6 +57,6 @@ public T getResult() throws E { @Override public WorkflowStatus getStatus() { - return DBOS.getWorkflowStatus(workflowId); + return executor.getWorkflowStatus(workflowId); } } diff --git a/transact/src/main/kotlin/dev/dbos/transact/DBOSExtensions.kt b/transact/src/main/kotlin/dev/dbos/transact/DBOSExtensions.kt index 29169026..5d174be1 100644 --- a/transact/src/main/kotlin/dev/dbos/transact/DBOSExtensions.kt +++ b/transact/src/main/kotlin/dev/dbos/transact/DBOSExtensions.kt @@ -27,7 +27,7 @@ inline fun registerWorkflows(implementation: T, instanceName: } @JvmSynthetic -fun registerQueue(queue: Queue): Queue = DBOS.registerQueue(queue) +fun registerQueue(queue: Queue) = DBOS.registerQueue(queue) @JvmSynthetic fun registerLifecycleListener(listener: DBOSLifecycleListener) = DBOS.registerLifecycleListener(listener) diff --git a/transact/src/test/java/dev/dbos/transact/DBOSTestAccess.java b/transact/src/test/java/dev/dbos/transact/DBOSTestAccess.java index f945e37f..7106948a 100644 --- a/transact/src/test/java/dev/dbos/transact/DBOSTestAccess.java +++ b/transact/src/test/java/dev/dbos/transact/DBOSTestAccess.java @@ -1,5 +1,6 @@ package dev.dbos.transact; +import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.execution.DBOSExecutorTestAccess; @@ -8,14 +9,23 @@ // Helper class to retrieve DBOS internals via package private methods public class DBOSTestAccess { + + public static DBOSExecutor getDbosExecutor(DBOS.Instance instance) { + return instance.getDbosExecutor(); + } + public static DBOSExecutor getDbosExecutor() { - return DBOS.instance().getDbosExecutor(); + return DBOS.getDbosExecutor(); } - public static void clearRegistry() { - DBOS.instance().clearRegistry(); + public static void reinitialize(DBOSConfig config) { + DBOS.reinitialize(config); } + // public static void clearRegistry() { + // DBOS.instance().clearRegistry(); + // } + public static SystemDatabase getSystemDatabase() { var exec = getDbosExecutor(); return DBOSExecutorTestAccess.getSystemDatabase(exec); diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index 2c6f49ff..a823b178 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -45,7 +45,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); DBOS.registerQueue(new Queue("testQueue")); service = DBOS.registerWorkflows(ClientService.class, new ClientServiceImpl()); DBOS.launch(); diff --git a/transact/src/test/java/dev/dbos/transact/client/PgSqlClientTest.java b/transact/src/test/java/dev/dbos/transact/client/PgSqlClientTest.java index f5f2d8f0..6a3358bd 100644 --- a/transact/src/test/java/dev/dbos/transact/client/PgSqlClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/PgSqlClientTest.java @@ -53,7 +53,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); DBOS.registerQueue(new Queue("testQueue")); service = DBOS.registerWorkflows(ClientService.class, new ClientServiceImpl()); DBOS.launch(); diff --git a/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java b/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java index 56c2f365..5aedb644 100644 --- a/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java +++ b/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java @@ -56,7 +56,7 @@ public void configOverridesEnvAppVerAndExecutor() throws Exception { .withAppVersion("test-app-version") .withExecutorId("test-executor-id"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -81,7 +81,7 @@ public void envAppVerAndExecutor() throws Exception { .withDbUser("postgres") .withDbPassword(System.getenv("PGPASSWORD")); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -109,7 +109,7 @@ public void dbosCloudEnvOverridesConfigAppVerAndExecutor() throws Exception { .withAppVersion("test-app-version") .withExecutorId("test-executor-id"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -129,7 +129,7 @@ public void localExecutorId() throws Exception { .withDbUser("postgres") .withDbPassword(System.getenv("PGPASSWORD")); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -147,7 +147,7 @@ public void conductorExecutorId() throws Exception { .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") .withConductorKey("test-conductor-key"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -167,7 +167,7 @@ public void cantSetExecutorIdWhenUsingConductor() throws Exception { .withConductorKey("test-conductor-key") .withExecutorId("test-executor-id"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { assertThrows(IllegalArgumentException.class, () -> DBOS.launch()); } finally { @@ -202,7 +202,7 @@ public void calcAppVersion() throws Exception { .withDbUser("postgres") .withDbPassword(System.getenv(Constants.POSTGRES_PASSWORD_ENV_VAR)); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); @@ -240,7 +240,7 @@ public void configPGSimpleDataSource() throws Exception { .withDbUser("invalid-user") .withDbPassword("invalid-password"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); var impl = new HawkServiceImpl(); var proxy = DBOS.registerWorkflows(HawkService.class, impl); @@ -286,7 +286,7 @@ public void configHikariDataSource() throws Exception { .withDbUser("invalid-user") .withDbPassword("invalid-password"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); assertFalse(dataSource.isClosed()); var impl = new HawkServiceImpl(); @@ -334,7 +334,7 @@ public void appVersion() throws Exception { DBOSConfig.defaultsFromEnv("systemdbtest") .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys"); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy = DBOS.registerWorkflows(ExecutorTestService.class, new ExecutorTestServiceImpl()); DBOS.launch(); diff --git a/transact/src/test/java/dev/dbos/transact/database/MetricsTest.java b/transact/src/test/java/dev/dbos/transact/database/MetricsTest.java index 94ba21af..1bc8847c 100644 --- a/transact/src/test/java/dev/dbos/transact/database/MetricsTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/MetricsTest.java @@ -56,7 +56,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(config); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); proxy = DBOS.registerWorkflows(MetricsService.class, new MetricsServiceImpl()); DBOS.launch(); } diff --git a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java index c075b539..a2c5a12e 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java @@ -44,7 +44,7 @@ void setUp() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach @@ -154,7 +154,6 @@ void executeWorkflowByIdNonExistent() throws Exception { @Test void workflowFunctionNotfound() throws Exception { - ExecutingService executingService = DBOS.registerWorkflows(ExecutingService.class, new ExecutingServiceImpl()); DBOS.launch(); @@ -172,8 +171,8 @@ void workflowFunctionNotfound() throws Exception { assertEquals(wfs.get(0).status(), WorkflowState.SUCCESS.name()); DBOS.shutdown(); - DBOSTestAccess.clearRegistry(); // clear out the registry - DBOS.launch(); // restart dbos + DBOSTestAccess.reinitialize(dbosConfig); // reinitialize to clear out the registry + DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); boolean error = false; diff --git a/transact/src/test/java/dev/dbos/transact/execution/LifecycleTest.java b/transact/src/test/java/dev/dbos/transact/execution/LifecycleTest.java index b59d5073..d2a98a45 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/LifecycleTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/LifecycleTest.java @@ -61,6 +61,7 @@ public int doNotRunWF(int nInstances, int nWfs) { } class TestLifecycleService implements DBOSLifecycleListener { + private DBOS.Instance dbos; public int launchCount = 0; public int shutdownCount = 0; public int nInstances = 0; @@ -70,13 +71,14 @@ class TestLifecycleService implements DBOSLifecycleListener { public ArrayList wfs = new ArrayList<>(); @Override - public void dbosLaunched() { + public void dbosLaunched(DBOS.Instance dbos) { + this.dbos = dbos; var expectedParams = new Class[] {int.class, int.class}; ++launchCount; - nInstances = DBOS.getRegisteredWorkflowInstances().size(); - var wfs = DBOS.getRegisteredWorkflows(); + nInstances = dbos.getRegisteredWorkflowInstances().size(); + var wfs = dbos.getRegisteredWorkflows(); for (var wf : wfs) { var method = wf.workflowMethod(); var tag = method.getAnnotation(TestLifecycleAnnotation.class); @@ -105,7 +107,7 @@ public int runThemAll() throws Exception { int total = 0; for (var wf : wfs) { Object[] args = {nInstances, nWfs}; - var h = DBOS.startWorkflow(wf, args, new StartWorkflowOptions(UUID.randomUUID().toString())); + var h = dbos.startWorkflow(wf, args, new StartWorkflowOptions(UUID.randomUUID().toString())); total += (Integer) h.getResult(); } return total; @@ -128,7 +130,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); impl = new LifecycleTestWorkflowsImpl(); DBOS.registerWorkflows(LifecycleTestWorkflows.class, impl, "inst1"); diff --git a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java index 3787533f..a2ff2e1b 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java @@ -53,7 +53,7 @@ void setUp() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); executingService = DBOS.registerWorkflows( ExecutingService.class, executingServiceImpl = new ExecutingServiceImpl()); @@ -166,7 +166,7 @@ public void recoveryThreadTest() throws Exception { DBOS.shutdown(); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); // dbos = DBOS.getInstance(); // need to register again diff --git a/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java b/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java index 6e48853b..79e3a0ec 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java @@ -55,7 +55,7 @@ public static void onetimeBefore() { @BeforeEach void setUp() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/execution/SingleExecutionTest.java b/transact/src/test/java/dev/dbos/transact/execution/SingleExecutionTest.java index 323f7cea..fbbe985d 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/SingleExecutionTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/SingleExecutionTest.java @@ -280,7 +280,7 @@ static void onetimeShutdown() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); execImpl = new TryConcExec(); execIfc = DBOS.registerWorkflows(TryConcExecIfc.class, execImpl); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java index f044ec71..01b1419b 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.utils.DBUtils; @@ -41,7 +42,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var impl = new HawkServiceImpl(); proxy = DBOS.registerWorkflows(HawkService.class, impl); impl.setProxy(proxy); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java index 70f0078e..7d6bd191 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; import dev.dbos.transact.database.SystemDatabase; @@ -45,7 +46,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var impl = new HawkServiceImpl(); proxy = DBOS.registerWorkflows(HawkService.class, impl); impl.setProxy(proxy); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/InstanceTest.java b/transact/src/test/java/dev/dbos/transact/invocation/InstanceTest.java new file mode 100644 index 00000000..9bbb3941 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/invocation/InstanceTest.java @@ -0,0 +1,511 @@ +package dev.dbos.transact.invocation; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException; +import dev.dbos.transact.utils.DBUtils; +import dev.dbos.transact.workflow.Step; +import dev.dbos.transact.workflow.Timeout; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.UUID; + +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class HawkServiceInstanceImpl implements HawkService { + private final DBOS.Instance dbos; + private HawkService proxy; + + public HawkServiceInstanceImpl(DBOS.Instance dbos) { + this.dbos = dbos; + } + + public void setProxy(HawkService proxy) { + this.proxy = proxy; + } + + @Workflow + @Override + public String simpleWorkflow() { + return LocalDate.now().format(DateTimeFormatter.ISO_DATE); + } + + @Workflow + @Override + public String sleepWorkflow(long sleepSec) { + var duration = Duration.ofSeconds(sleepSec); + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return LocalDate.now().format(DateTimeFormatter.ISO_DATE); + } + + @Workflow + @Override + public String parentWorkflow() { + return proxy.simpleWorkflow(); + } + + @Workflow + @Override + public String parentStartWorkflow() { + var handle = dbos.startWorkflow(() -> proxy.simpleWorkflow()); + return handle.getResult(); + } + + @Workflow + @Override + public String parentSleepWorkflow(Long timeoutSec, long sleepSec) { + var duration = + timeoutSec == null + ? Timeout.inherit() + : timeoutSec == 0L ? Timeout.none() : Timeout.of(Duration.ofSeconds(timeoutSec)); + var options = new WorkflowOptions().withTimeout(duration); + try (var o = options.setContext()) { + return proxy.sleepWorkflow(sleepSec); + } + } + + @Step + @Override + public Instant nowStep() { + return Instant.now(); + } + + @Workflow + @Override + public Instant stepWorkflow() { + return proxy.nowStep(); + } + + @Step + @Override + public String illegalStep() { + return proxy.simpleWorkflow(); + } + + @Workflow + @Override + public String illegalWorkflow() { + return proxy.illegalStep(); + } +} + +@org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) +public class InstanceTest { + private static DBOSConfig dbosConfig; + private DBOS.Instance dbos; + private HawkService proxy; + private HikariDataSource dataSource; + private String localDate = LocalDate.now().format(DateTimeFormatter.ISO_DATE); + + @BeforeAll + static void onetimeSetup() throws Exception { + + dbosConfig = + DBOSConfig.defaultsFromEnv("systemdbtest") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys"); + } + + @BeforeEach + void beforeEachTest() throws SQLException { + DBUtils.recreateDB(dbosConfig); + + // Note, manually injecting the DBOS instance here is a poor developer experience + // Opened https://github.com/dbos-inc/dbos-transact-java/issues/296 to track improving this + + dbos = new DBOS.Instance(dbosConfig); + var impl = new HawkServiceInstanceImpl(dbos); + proxy = dbos.registerWorkflows(HawkService.class, impl); + impl.setProxy(proxy); + + dbos.launch(); + + dataSource = SystemDatabase.createDataSource(dbosConfig); + } + + @AfterEach + void afterEachTest() throws Exception { + dataSource.close(); + dbos.shutdown(); + } + + @Test + void directInvoke() throws Exception { + + var result = proxy.simpleWorkflow(); + assertEquals(localDate, result); + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertDoesNotThrow(() -> UUID.fromString((String) row.workflowId())); + assertEquals("SUCCESS", row.status()); + assertEquals("simpleWorkflow", row.name()); + assertEquals("dev.dbos.transact.invocation.HawkServiceInstanceImpl", row.className()); + assertNotNull(row.output()); + assertNull(row.error()); + assertNull(row.timeoutMs()); + assertNull(row.deadlineEpochMs()); + } + + @Test + void directInvokeSetWorkflowId() throws Exception { + + String workflowId = "directInvokeSetWorkflowId"; + try (var _o = new WorkflowOptions(workflowId).setContext()) { + var result = proxy.simpleWorkflow(); + assertEquals(localDate, result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals(workflowId, row.workflowId()); + } + + @Test + void directInvokeSetTimeout() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.sleepWorkflow(1); + assertEquals(localDate, result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals(10000L, row.timeoutMs()); + assertNotNull(row.deadlineEpochMs()); + } + + @Test + void directInvokeSetZeroTimeout() throws Exception { + + var options = new WorkflowOptions().withTimeout(Timeout.none()); + try (var _o = options.setContext()) { + var result = proxy.sleepWorkflow(1); + assertEquals(localDate, result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertNull(row.timeoutMs()); + assertNull(row.deadlineEpochMs()); + } + + @Test + void directInvokeSetWorkflowIdAndTimeout() throws Exception { + + String workflowId = "directInvokeSetWorkflowIdAndTimeout"; + var options = new WorkflowOptions(workflowId).withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.sleepWorkflow(1); + assertEquals(localDate, result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals(workflowId, row.workflowId()); + assertEquals(10000L, row.timeoutMs()); + assertNotNull(row.deadlineEpochMs()); + } + + @Test + void directInvokeTimeoutCancellation() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(1)); + try (var _o = options.setContext()) { + assertThrows(DBOSAwaitedWorkflowCancelledException.class, () -> proxy.sleepWorkflow(10L)); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals("CANCELLED", row.status()); + assertNull(row.output()); + assertNull(row.error()); + } + + @Test + void directInvokeTimeoutDeadline() throws Exception { + + var options = + new WorkflowOptions().withDeadline(Instant.ofEpochMilli(System.currentTimeMillis() + 1000)); + try (var _o = options.setContext()) { + assertThrows(DBOSAwaitedWorkflowCancelledException.class, () -> proxy.sleepWorkflow(10L)); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals("CANCELLED", row.status()); + assertNull(row.output()); + assertNull(row.error()); + } + + @Test + void directInvokeSetWorkflowIdTimeoutCancellation() throws Exception { + + var workflowId = "directInvokeSetWorkflowIdTimeoutCancellation"; + var options = new WorkflowOptions(workflowId).withTimeout(Duration.ofSeconds(1)); + try (var _o = options.setContext()) { + assertThrows(DBOSAwaitedWorkflowCancelledException.class, () -> proxy.sleepWorkflow(10L)); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertEquals(workflowId, row.workflowId()); + assertEquals("CANCELLED", row.status()); + assertNull(row.output()); + assertNull(row.error()); + } + + @Test + void directInvokeParent() throws Exception { + + var result = proxy.parentWorkflow(); + assertEquals(localDate, result); + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + assertDoesNotThrow(() -> UUID.fromString(row0.workflowId())); + assertEquals(row0.workflowId() + "-0", row1.workflowId()); + assertEquals("SUCCESS", row0.status()); + assertEquals("SUCCESS", row1.status()); + assertEquals("parentWorkflow", row0.name()); + assertEquals("simpleWorkflow", row1.name()); + assertEquals(row0.output(), row1.output()); + assertNull(row0.timeoutMs()); + assertNull(row1.timeoutMs()); + assertNull(row0.deadlineEpochMs()); + assertNull(row1.deadlineEpochMs()); + assertNull(row0.parentWorkflowId()); + assertEquals(row0.workflowId(), row1.parentWorkflowId()); + + var steps = DBUtils.getStepRows(dataSource, row0.workflowId()); + assertEquals(1, steps.size()); + var step = steps.get(0); + assertEquals(row0.workflowId(), step.workflowId()); + assertEquals(0, step.functionId()); + assertNull(step.output()); + assertNull(step.error()); + assertEquals("simpleWorkflow", step.functionName()); + assertEquals(row1.workflowId(), step.childWorkflowId()); + } + + @Test + void directInvokeParentStartWorkflow() throws Exception { + var result = proxy.parentStartWorkflow(); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + assertDoesNotThrow(() -> UUID.fromString(row0.workflowId())); + assertEquals(row0.workflowId() + "-0", row1.workflowId()); + assertEquals("SUCCESS", row0.status()); + assertEquals("SUCCESS", row1.status()); + assertEquals("parentStartWorkflow", row0.name()); + assertEquals("simpleWorkflow", row1.name()); + assertEquals(row0.output(), row1.output()); + assertNull(row0.timeoutMs()); + assertNull(row1.timeoutMs()); + assertNull(row0.deadlineEpochMs()); + assertNull(row1.deadlineEpochMs()); + + var steps = DBUtils.getStepRows(dataSource, row0.workflowId()); + assertEquals(2, steps.size()); + var step = steps.get(0); + var gr = steps.get(1); + assertEquals(row0.workflowId(), step.workflowId()); + assertEquals(0, step.functionId()); + assertNull(step.output()); + assertNull(step.error()); + assertEquals("simpleWorkflow", step.functionName()); + assertEquals(row1.workflowId(), step.childWorkflowId()); + assertEquals("DBOS.getResult", gr.functionName()); + } + + @Test + void directInvokeParentSetWorkflowId() throws Exception { + + String workflowId = "directInvokeParentSetWorkflowId"; + try (var _o = new WorkflowOptions(workflowId).setContext()) { + var result = proxy.parentWorkflow(); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + assertEquals(workflowId, row0.workflowId()); + assertEquals(workflowId + "-0", row1.workflowId()); + assertEquals(workflowId, row1.parentWorkflowId()); + } + + @Test + void directInvokeParentSetTimeout() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.parentSleepWorkflow(null, 1); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + assertEquals(10000L, row0.timeoutMs()); + assertEquals(10000L, row1.timeoutMs()); + assertNotNull(row0.deadlineEpochMs()); + assertNotNull(row1.deadlineEpochMs()); + assertEquals(row0.deadlineEpochMs(), row1.deadlineEpochMs()); + } + + @Test + void directInvokeParentSetTimeoutParent() throws Exception { + + var result = proxy.parentSleepWorkflow(5L, 1); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + assertNull(row0.timeoutMs()); + assertNull(row0.deadlineEpochMs()); + assertEquals(5000L, row1.timeoutMs()); + assertNotNull(row1.deadlineEpochMs()); + } + + @Test + void directInvokeParentSetTimeoutParent2() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.parentSleepWorkflow(5L, 1); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + + assertEquals(10000L, row0.timeoutMs()); + assertNotNull(row0.deadlineEpochMs()); + + assertEquals(5000L, row1.timeoutMs()); + assertNotNull(row1.deadlineEpochMs()); + } + + @Test + void directInvokeParentSetTimeoutParent3() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.parentSleepWorkflow(0L, 1); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, rows.size()); + var row0 = rows.get(0); + var row1 = rows.get(1); + + assertEquals(10000L, row0.timeoutMs()); + assertNotNull(row0.deadlineEpochMs()); + + assertNull(row1.timeoutMs()); + assertNull(row1.deadlineEpochMs()); + } + + @Test + void invokeWorkflowFromStepThrows() throws Exception { + var ise = assertThrows(IllegalStateException.class, () -> proxy.illegalWorkflow()); + assertEquals("cannot invoke a workflow from a step", ise.getMessage()); + + var wfs = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, wfs.size()); + var wf = wfs.get(0); + assertNotNull(wf.workflowId()); + + var steps = dbos.listWorkflowSteps(wf.workflowId()); + assertEquals(1, steps.size()); + var step = steps.get(0); + assertEquals(0, step.functionId()); + assertNull(step.output()); + assertEquals("cannot invoke a workflow from a step", step.error().message()); + assertEquals("cannot invoke a workflow from a step", step.error().throwable().getMessage()); + assertEquals("illegalStep", step.functionName()); + } + + @Test + void directInvokeStep() throws Exception { + var result = proxy.stepWorkflow(); + assertNotNull(result); + + var wfs = DBUtils.getWorkflowRows(dataSource); + assertEquals(1, wfs.size()); + var wf = wfs.get(0); + assertNotNull(wf.workflowId()); + + var steps = DBUtils.getStepRows(dataSource, wf.workflowId()); + assertEquals(1, steps.size()); + var step = steps.get(0); + assertEquals(0, step.functionId()); + assertNotNull(step.output()); + assertNull(step.error()); + assertEquals("nowStep", step.functionName()); + } + + @Test + void directInvokeParentSetParentTimeout() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.parentWorkflow(); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var table = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, table.size()); + var row0 = table.get(0); + var row1 = table.get(1); + assertEquals(10000L, row0.timeoutMs()); + assertEquals(10000L, row1.timeoutMs()); + assertNotNull(row0.deadlineEpochMs()); + assertNotNull(row1.deadlineEpochMs()); + assertEquals(row0.deadlineEpochMs(), row1.deadlineEpochMs()); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/invocation/MockDbosInstanceTest.java b/transact/src/test/java/dev/dbos/transact/invocation/MockDbosInstanceTest.java new file mode 100644 index 00000000..21bb5ff1 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/invocation/MockDbosInstanceTest.java @@ -0,0 +1,71 @@ +package dev.dbos.transact.invocation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.execution.ThrowingSupplier; +import dev.dbos.transact.workflow.Workflow; + +import java.time.Duration; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; + +interface MockTestService { + String testWorkflow(); +} + +class MockTestServiceImpl implements MockTestService { + private final DBOS.Instance dbos; + + public MockTestServiceImpl(DBOS.Instance instance) { + this.dbos = instance; + } + + @Workflow + @Override + public String testWorkflow() { + var today = dbos.runStep(() -> LocalDate.now(), "todaysDate"); + dbos.setEvent("greetEvent", today); + var name = dbos.recv("greetTopic", Duration.ofSeconds(30)); + return String.format("Hello %s, today is %s", name, today.format(DateTimeFormatter.ISO_DATE)); + } +} + +@org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) +public class MockDbosInstanceTest { + @Test + public void testMockInstance() throws Exception { + var mockDBOS = mock(DBOS.Instance.class); + var impl = new MockTestServiceImpl(mockDBOS); + + var date = LocalDate.of(2024, 1, 1); + + when(mockDBOS.runStep( + ArgumentMatchers.>any(), + ArgumentMatchers.eq("todaysDate"))) + .thenReturn(date); + when(mockDBOS.recv( + ArgumentMatchers.eq("greetTopic"), ArgumentMatchers.eq(Duration.ofSeconds(30)))) + .thenReturn("Alice"); + + // Call the workflow + String result = impl.testWorkflow(); + + // Verify output + assertEquals("Hello Alice, today is 2024-01-01", result); + + verify(mockDBOS) + .runStep( + ArgumentMatchers.>any(), + ArgumentMatchers.eq("todaysDate")); + verify(mockDBOS).setEvent(ArgumentMatchers.eq("greetEvent"), ArgumentMatchers.eq(date)); + verify(mockDBOS) + .recv(ArgumentMatchers.eq("greetTopic"), ArgumentMatchers.eq(Duration.ofSeconds(30))); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/invocation/MultiInstTest.java b/transact/src/test/java/dev/dbos/transact/invocation/MultiClassInstanceTest.java similarity index 98% rename from transact/src/test/java/dev/dbos/transact/invocation/MultiInstTest.java rename to transact/src/test/java/dev/dbos/transact/invocation/MultiClassInstanceTest.java index 0e639587..e8a1523f 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/MultiInstTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/MultiClassInstanceTest.java @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test; @org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) -public class MultiInstTest { +public class MultiClassInstanceTest { private static DBOSConfig dbosConfig; HawkServiceImpl himpl; BearServiceImpl bimpla; @@ -44,7 +44,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); himpl = new HawkServiceImpl(); bimpla = new BearServiceImpl(); bimpl1 = new BearServiceImpl(); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/MultiDbosInstanceTest.java b/transact/src/test/java/dev/dbos/transact/invocation/MultiDbosInstanceTest.java new file mode 100644 index 00000000..f1cdfacd --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/invocation/MultiDbosInstanceTest.java @@ -0,0 +1,151 @@ +package dev.dbos.transact.invocation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.utils.DBUtils; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.Workflow; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +interface TestService { + String testWorkflow(String name); +} + +class TestServiceImpl implements TestService { + private final DBOS.Instance dbos; + + public TestServiceImpl(DBOS.Instance instance) { + this.dbos = instance; + } + + @Workflow + public String testWorkflow(String name) { + var today = dbos.runStep(() -> LocalDate.now(), "todaysDate"); + return String.format("Hello %s, today is %s", name, today.format(DateTimeFormatter.ISO_DATE)); + } +} + +@org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) +public class MultiDbosInstanceTest { + + private static DBOSConfig dbosConfigA; + private DBOS.Instance dbosA; + private TestService proxyA; + private TestServiceImpl implA; + private Queue queueA; + + private static DBOSConfig dbosConfigB; + private DBOS.Instance dbosB; + private TestServiceImpl implB; + private TestService proxyB; + private Queue queueB; + + @BeforeAll + static void onetimeSetup() throws Exception { + dbosConfigA = + DBOSConfig.defaultsFromEnv("MultiDbosInstanceTestA") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_multi_a"); + dbosConfigB = + DBOSConfig.defaultsFromEnv("MultiDbosInstanceTestB") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_multi_b"); + } + + @BeforeEach + void beforeEachTest() throws Exception { + DBUtils.recreateDB(dbosConfigA); + dbosA = new DBOS.Instance(dbosConfigA); + implA = new TestServiceImpl(dbosA); + proxyA = dbosA.registerWorkflows(TestService.class, implA); + queueA = new Queue("queueA"); + dbosA.registerQueue(queueA); + + dbosA.launch(); + + DBUtils.recreateDB(dbosConfigB); + dbosB = new DBOS.Instance(dbosConfigB); + implB = new TestServiceImpl(dbosB); + proxyB = dbosB.registerWorkflows(TestService.class, implB); + queueB = new Queue("queueB"); + dbosB.registerQueue(queueB); + dbosB.launch(); + } + + @AfterEach + void afterEachTest() throws Exception { + dbosA.shutdown(); + dbosB.shutdown(); + } + + @Test + public void testDirectMultipleInstances() throws Exception { + var wfidA = UUID.randomUUID().toString(); + String resultA; + try (var o = new WorkflowOptions(wfidA).setContext()) { + resultA = proxyA.testWorkflow("hawk"); + } + + var wfidB = UUID.randomUUID().toString(); + String resultB; + try (var o = new WorkflowOptions(wfidB).setContext()) { + resultB = proxyB.testWorkflow("bear"); + } + + String formattedCurrentDate = LocalDate.now().format(DateTimeFormatter.ISO_DATE); + assertEquals("Hello hawk, today is " + formattedCurrentDate, resultA); + assertEquals("Hello bear, today is " + formattedCurrentDate, resultB); + + var rowsA = DBUtils.getWorkflowRows(dbosConfigA); + var rowsB = DBUtils.getWorkflowRows(dbosConfigB); + assertEquals(1, rowsA.size()); + assertEquals(1, rowsB.size()); + assertEquals(wfidA, rowsA.get(0).workflowId()); + assertEquals(wfidB, rowsB.get(0).workflowId()); + } + + @Test + public void testEnqueueMultipleInstances() throws Exception { + var handleA = + dbosA.startWorkflow(() -> proxyA.testWorkflow("hawk"), new StartWorkflowOptions(queueA)); + var handleB = + dbosB.startWorkflow(() -> proxyB.testWorkflow("bear"), new StartWorkflowOptions(queueB)); + + String formattedCurrentDate = LocalDate.now().format(DateTimeFormatter.ISO_DATE); + assertEquals("Hello hawk, today is " + formattedCurrentDate, handleA.getResult()); + assertEquals("Hello bear, today is " + formattedCurrentDate, handleB.getResult()); + + var rowsA = DBUtils.getWorkflowRows(dbosConfigA); + var rowsB = DBUtils.getWorkflowRows(dbosConfigB); + assertEquals(1, rowsA.size()); + assertEquals(1, rowsB.size()); + assertEquals(handleA.workflowId(), rowsA.get(0).workflowId()); + assertEquals(handleB.workflowId(), rowsB.get(0).workflowId()); + } + + @Test + public void cantStartOnWrongInstance() throws Exception { + assertThrows( + IllegalStateException.class, () -> dbosA.startWorkflow(() -> proxyB.testWorkflow("bear"))); + } + + @Test + public void cantEnqueueOnWrongQueueInstance() throws Exception { + assertThrows( + IllegalArgumentException.class, + () -> + dbosA.startWorkflow( + () -> proxyA.testWorkflow("hawk"), new StartWorkflowOptions(queueB))); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java index ec8c6c50..dabd470d 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java @@ -125,7 +125,7 @@ public void testPatch() throws Exception { try (var dataSource = SystemDatabase.createDataSource(dbosConfig)) { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy1 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplOne()); DBOS.launch(); @@ -140,7 +140,7 @@ public void testPatch() throws Exception { // Recreate DBOS with a new (patched) version of a workflow DBOS.shutdown(); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy2 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplTwo()); DBOS.launch(); @@ -165,7 +165,7 @@ public void testPatch() throws Exception { // Recreate DBOS with another new (patched) version of a workflow DBOS.shutdown(); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy3 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplThree()); DBOS.launch(); @@ -197,7 +197,7 @@ public void testPatch() throws Exception { // Now, let's deprecate the patch DBOS.shutdown(); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy4 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFour()); DBOS.launch(); @@ -228,7 +228,7 @@ public void testPatch() throws Exception { // Now, let's deprecate the patch DBOS.shutdown(); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy5 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFive()); DBOS.launch(); @@ -264,7 +264,7 @@ public void patchThrowsNotConfigured() throws Exception { .withAppVersion("test-version"); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy2 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplTwo()); DBOS.launch(); @@ -280,7 +280,7 @@ public void deprecatePatchThrowsNotConfigured() throws Exception { .withAppVersion("test-version"); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var proxy4 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFour()); DBOS.launch(); @@ -296,7 +296,7 @@ public void mulipleDefinitions() throws Exception { .withAppVersion("test-version"); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); @SuppressWarnings("unused") var proxy5 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFive()); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/StartWorkflowTest.java b/transact/src/test/java/dev/dbos/transact/invocation/StartWorkflowTest.java index 930acb44..6c79ec92 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/StartWorkflowTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/StartWorkflowTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.utils.DBUtils; @@ -39,7 +40,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); var impl = new HawkServiceImpl(); proxy = DBOS.registerWorkflows(HawkService.class, impl); impl.setProxy(proxy); diff --git a/transact/src/test/java/dev/dbos/transact/issues/Issue218.java b/transact/src/test/java/dev/dbos/transact/issues/Issue218.java index 72db377f..86b3a555 100644 --- a/transact/src/test/java/dev/dbos/transact/issues/Issue218.java +++ b/transact/src/test/java/dev/dbos/transact/issues/Issue218.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; @@ -92,7 +93,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); } diff --git a/transact/src/test/java/dev/dbos/transact/json/InteropTest.java b/transact/src/test/java/dev/dbos/transact/json/InteropTest.java index 5c12ce57..cad83203 100644 --- a/transact/src/test/java/dev/dbos/transact/json/InteropTest.java +++ b/transact/src/test/java/dev/dbos/transact/json/InteropTest.java @@ -4,6 +4,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSClient; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.utils.DBUtils; @@ -110,7 +111,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws Exception { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java b/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java index c0c1bd1d..cec13a17 100644 --- a/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java +++ b/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java @@ -4,6 +4,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSClient; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; @@ -56,7 +57,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach @@ -798,7 +799,7 @@ public void testCustomSerializer() throws Exception { // Reinitialize with custom serializer var customConfig = dbosConfig.withSerializer(new TestBase64Serializer()); - DBOS.reinitialize(customConfig); + DBOSTestAccess.reinitialize(customConfig); Queue testQueue = new Queue("testq"); DBOS.registerQueue(testQueue); @@ -871,7 +872,7 @@ public void testCustomSerializerInterop() throws Exception { // Phase 2: Relaunch with custom serializer var customConfig = dbosConfig.withSerializer(new TestBase64Serializer()); - DBOS.reinitialize(customConfig); + DBOSTestAccess.reinitialize(customConfig); DBOS.registerQueue(testQueue); DBOS.registerWorkflows(EventSetterService.class, new EventSetterServiceImpl()); DBOS.launch(); @@ -901,7 +902,7 @@ public void testCustomSerializerInterop() throws Exception { DBOS.shutdown(); // Phase 3: Relaunch with custom serializer again, verify Phase 2 data still readable - DBOS.reinitialize(customConfig); + DBOSTestAccess.reinitialize(customConfig); DBOS.registerQueue(testQueue); DBOS.registerWorkflows(EventSetterService.class, new EventSetterServiceImpl()); DBOS.launch(); @@ -923,7 +924,7 @@ public void testCustomSerializerRemoved() throws Exception { // Launch with custom serializer var customConfig = dbosConfig.withSerializer(new TestBase64Serializer()); - DBOS.reinitialize(customConfig); + DBOSTestAccess.reinitialize(customConfig); Queue testQueue = new Queue("testq"); DBOS.registerQueue(testQueue); @@ -947,7 +948,7 @@ public void testCustomSerializerRemoved() throws Exception { DBOS.shutdown(); // Relaunch WITHOUT custom serializer - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); DBOS.registerQueue(testQueue); DBOS.registerWorkflows(EventSetterService.class, new EventSetterServiceImpl()); DBOS.launch(); diff --git a/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java b/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java index 1acc24f3..e0743932 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java @@ -179,7 +179,7 @@ static void oneTimeShutdown() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); impl = new EventsServiceImpl(); proxy = DBOS.registerWorkflows(EventsService.class, impl); DBOS.launch(); diff --git a/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java b/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java index 74c241d8..ef00f5d1 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; @@ -152,7 +153,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/queue/PartitionedQueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/PartitionedQueuesTest.java index b2029de5..0e19af77 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/PartitionedQueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/PartitionedQueuesTest.java @@ -7,6 +7,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSClient; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; @@ -97,7 +98,7 @@ void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java index 080568eb..309c2262 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java @@ -52,7 +52,7 @@ void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach @@ -657,7 +657,7 @@ public void testQueueConcurrencyUnderRecovery() throws Exception { @Test public void testListenQueue() throws Exception { var config = dbosConfig.withListenQueue("queueOne"); - DBOS.reinitialize(config); + DBOSTestAccess.reinitialize(config); Queue queueOne = new Queue("queueOne"); Queue queueTwo = new Queue("queueTwo"); diff --git a/transact/src/test/java/dev/dbos/transact/scheduled/SchedulerServiceTest.java b/transact/src/test/java/dev/dbos/transact/scheduled/SchedulerServiceTest.java index 0577c9fa..22575a96 100644 --- a/transact/src/test/java/dev/dbos/transact/scheduled/SchedulerServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/scheduled/SchedulerServiceTest.java @@ -33,7 +33,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach @@ -97,10 +97,12 @@ public void simpleScheduledWorkflow() throws Exception { assertTrue(q2workflows.size() >= 1); assertEquals("q2", q2workflows.get(0).queueName()); + DBOS.shutdown(); + // See about makeup work (ignore missed) var timeToSleep = 5000 - (System.currentTimeMillis() - timeAsOfShutdown); Thread.sleep(timeToSleep < 0 ? 0 : timeToSleep); - schedulerService.dbosLaunched(); + DBOS.launch(); Thread.sleep(2000); int count1imb = impl.everySecondCounterIgnoreMissed; diff --git a/transact/src/test/java/dev/dbos/transact/step/StepsTest.java b/transact/src/test/java/dev/dbos/transact/step/StepsTest.java index dd772514..347de35a 100644 --- a/transact/src/test/java/dev/dbos/transact/step/StepsTest.java +++ b/transact/src/test/java/dev/dbos/transact/step/StepsTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; import dev.dbos.transact.utils.DBUtils; @@ -30,7 +31,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index 33aca87a..f527dc27 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -208,6 +208,12 @@ public static Connection getConnection(DBOSConfig config) throws SQLException { return DriverManager.getConnection(config.databaseUrl(), config.dbUser(), config.dbPassword()); } + public static List getWorkflowRows(DBOSConfig config) throws SQLException { + try (var ds = SystemDatabase.createDataSource(config)) { + return getWorkflowRows(ds); + } + } + public static List getWorkflowRows(DataSource ds) throws SQLException { return getWorkflowRows(ds, null); } diff --git a/transact/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java b/transact/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java index 6ac9df2d..eb45713b 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; @@ -33,7 +34,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/workflow/ForkTest.java b/transact/src/test/java/dev/dbos/transact/workflow/ForkTest.java index 6235b555..a25d198d 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/ForkTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/ForkTest.java @@ -188,7 +188,7 @@ void beforeEachTest() throws SQLException { dataSource = SystemDatabase.createDataSource(dbosConfig); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); impl = new ForkTestServiceImpl(); proxy = DBOS.registerWorkflows(ForkTestService.class, impl); diff --git a/transact/src/test/java/dev/dbos/transact/workflow/GarbageCollectionTest.java b/transact/src/test/java/dev/dbos/transact/workflow/GarbageCollectionTest.java index 0da5056a..cdaca90c 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/GarbageCollectionTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/GarbageCollectionTest.java @@ -73,7 +73,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); impl = new GCTestServiceImpl(); proxy = DBOS.registerWorkflows(GCTestService.class, impl); diff --git a/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java b/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java index a9696957..61af0f88 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.utils.DBUtils; @@ -57,7 +58,7 @@ static void onetimeSetup() throws Exception { DBOSConfig.defaultsFromEnv("systemdbtest") .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys"); DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); DBOS.launch(); baseTime = System.currentTimeMillis(); populateWorkflowsStatic(); diff --git a/transact/src/test/java/dev/dbos/transact/workflow/QueueChildWorkflowTest.java b/transact/src/test/java/dev/dbos/transact/workflow/QueueChildWorkflowTest.java index 8dfa1b57..88dcc212 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/QueueChildWorkflowTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/QueueChildWorkflowTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.utils.DBUtils; @@ -32,7 +33,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/workflow/SyncWorkflowTest.java b/transact/src/test/java/dev/dbos/transact/workflow/SyncWorkflowTest.java index d4e7a57b..dc267257 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/SyncWorkflowTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/SyncWorkflowTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; import dev.dbos.transact.utils.DBUtils; @@ -30,7 +31,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java b/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java index 9963a446..24fcfea6 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java @@ -45,7 +45,7 @@ void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); dataSource = SystemDatabase.createDataSource(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/workflow/UnifiedProxyTest.java b/transact/src/test/java/dev/dbos/transact/workflow/UnifiedProxyTest.java index 7a254564..a7d5c6f4 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/UnifiedProxyTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/UnifiedProxyTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; @@ -33,7 +34,7 @@ static void onetimeSetup() throws Exception { void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); } @AfterEach diff --git a/transact/src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java b/transact/src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java index 790e365f..300382e7 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; @@ -91,7 +92,7 @@ static void onetimeSetup() throws Exception { @BeforeEach void beforeEachTest() throws SQLException { DBUtils.recreateDB(dbosConfig); - DBOS.reinitialize(dbosConfig); + DBOSTestAccess.reinitialize(dbosConfig); impl = new MgmtServiceImpl(); proxy = DBOS.registerWorkflows(MgmtService.class, impl);