Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
926 changes: 509 additions & 417 deletions api/protos/history_events.pb.go

Large diffs are not rendered by default.

592 changes: 402 additions & 190 deletions api/protos/orchestrator_actions.pb.go

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions backend/runtimestate/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,87 @@ func (a *Applier) Actions(s *protos.WorkflowRuntimeState, customStatus *wrappers
msg.PropagatedHistory = ph
}
s.PendingMessages = append(s.PendingMessages, msg)
} else if createDetached := action.GetCreateDetachedWorkflow(); createDetached != nil {
Comment thread
acroca marked this conversation as resolved.
// Detached workflows are fully decoupled from the caller: the
// spawned ExecutionStartedEvent carries no ParentInstanceInfo,
// so completion and failure do not flow back to the caller's
// history. Recursive purge / terminate enumerate only
// ChildWorkflowInstanceCreated events, so detached spawns are
// correctly excluded from those traversals.
if createDetached.InstanceId == "" {
return result, fmt.Errorf("CreateDetachedWorkflowAction requires an instance ID")
}

_ = AddEvent(s, &protos.HistoryEvent{
EventId: action.Id,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_DetachedWorkflowInstanceCreated{
DetachedWorkflowInstanceCreated: &protos.DetachedWorkflowInstanceCreatedEvent{
InstanceId: createDetached.InstanceId,
},
},
Router: action.Router,
})

// Mint an execution ID if the action did not supply one,
// matching the client ScheduleNewWorkflow behavior so the new
// instance always has a stable executionId on its start event.
executionID := createDetached.GetExecutionId()
if executionID == nil {
executionID = wrapperspb.String(uuid.New().String())
}

// Prefer the trace context the workflow author attached to
// the action; fall back to the caller's current trace context
// so a detached workflow scheduled without explicit tracing
// still inherits a parent span.
traceContext := createDetached.GetParentTraceContext()
if traceContext == nil {
traceContext = currentTraceContext
}

// Build a target-only router for the spawned StartEvent so the
// new instance carries no back-reference to the caller. The
// applier loop stamps action.Router.SourceAppID = a.appID, which
// is what CallChildWorkflow propagates to its child so completion
// can flow back. Detached workflows do not flow completion back,
// so dropping SourceAppID gives the spawned instance the same
// router shape a client-scheduled top-level workflow would have.
var spawnedRouter *protos.TaskRouter
if r := action.Router; r != nil && r.GetTargetAppID() != "" {
spawnedRouter = &protos.TaskRouter{
TargetAppID: ptr.Of(r.GetTargetAppID()),
}
if ns := r.GetTargetAppNamespace(); ns != "" {
spawnedRouter.TargetAppNamespace = ptr.Of(ns)
}
}

startEvent := &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: createDetached.Name,
Input: createDetached.Input,
ScheduledStartTimestamp: createDetached.ScheduledStartTimestamp,
Tags: createDetached.Tags,
WorkflowInstance: &protos.WorkflowInstance{
InstanceId: createDetached.InstanceId,
ExecutionId: executionID,
},
ParentTraceContext: traceContext,
// ParentInstance is intentionally left nil: the
// detached workflow has no parent linkage.
},
},
Router: spawnedRouter,
}

s.PendingMessages = append(s.PendingMessages, &protos.WorkflowRuntimeStateMessage{
HistoryEvent: startEvent,
TargetInstanceId: createDetached.InstanceId,
})
} else if sendEvent := action.GetSendEvent(); sendEvent != nil {
e := &protos.HistoryEvent{
EventId: action.Id,
Expand Down
232 changes: 232 additions & 0 deletions task/detached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package task

import (
"fmt"
"strconv"
"time"

"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
)

// scheduleNewWorkflowOptions is a struct that holds the options for the
// ScheduleNewWorkflow workflow method. It mirrors the fields of
// CreateInstanceRequest so that a workflow author can spawn a fully
// decoupled instance with the same surface area as the client API.
type scheduleNewWorkflowOptions struct {
instanceID *string
rawInput *wrapperspb.StringValue
version *wrapperspb.StringValue
executionID *wrapperspb.StringValue
scheduledStartTimestamp *timestamppb.Timestamp
tags map[string]string
parentTraceContext *protos.TraceContext
targetAppID *string
targetAppNamespace *string
}

// DetachedWorkflowOptions is the interface for options passed to
// ScheduleNewWorkflow. Unlike CallChildWorkflow, the spawned instance is
// fire-and-forget: the caller receives the instance ID synchronously but
// does not wait on (and is not notified of) the spawned workflow's
// completion.
type DetachedWorkflowOptions interface {
applyDetachedWorkflowOption(*scheduleNewWorkflowOptions) error
}

// DetachedWorkflowOptionsFunc adapts a function to the
// DetachedWorkflowOptions interface.
type DetachedWorkflowOptionsFunc func(*scheduleNewWorkflowOptions) error

func (f DetachedWorkflowOptionsFunc) applyDetachedWorkflowOption(opts *scheduleNewWorkflowOptions) error {
return f(opts)
}

// WithDetachedWorkflowInstanceID sets the instance ID of the detached
// workflow. When omitted, ScheduleNewWorkflow generates a deterministic
// ID of the form "<callerInstanceID>-<n>" where n increments per
// default-ID spawn within the execution. The '-' separator keeps the
// generated ID safe for consumers (e.g. dapr) that propagate the
// instance id into RFC 1123 subdomain identifiers. Passing an empty
// string is rejected as an error: callers either set a non-empty ID or
// omit the option entirely to opt into the default.
func WithDetachedWorkflowInstanceID(instanceID string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.instanceID = &instanceID
return nil
}
}

// WithDetachedWorkflowInput sets the input for the detached workflow,
// marshalling it to JSON.
func WithDetachedWorkflowInput(input any) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
bytes, err := marshalData(input)
if err != nil {
return fmt.Errorf("failed to marshal input to JSON: %w", err)
}
opts.rawInput = wrapperspb.String(string(bytes))
return nil
}
}

// WithRawDetachedWorkflowInput sets a pre-marshalled raw input on the
// detached workflow.
func WithRawDetachedWorkflowInput(input *wrapperspb.StringValue) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.rawInput = input
return nil
}
}

// WithDetachedWorkflowVersion sets a version label on the detached
// workflow. The semantics mirror CreateInstanceRequest.version.
func WithDetachedWorkflowVersion(version string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.version = wrapperspb.String(version)
return nil
}
}

// WithDetachedWorkflowExecutionID sets an explicit execution ID on the
// detached workflow's first execution. When unset, the runtime mints a
// fresh UUID, matching the client ScheduleNewWorkflow behavior. Passing
// an empty string is rejected as an error: callers either set a
// non-empty execution ID or omit the option to opt into UUID minting.
func WithDetachedWorkflowExecutionID(executionID string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
if executionID == "" {
return fmt.Errorf("WithDetachedWorkflowExecutionID was passed an empty string; omit the option to let the runtime mint a UUID")
}
opts.executionID = wrapperspb.String(executionID)
return nil
}
}

// WithDetachedWorkflowStartTime defers the start of the detached workflow
// until the given time. Mirrors api.WithStartTime on the client API.
func WithDetachedWorkflowStartTime(startTime time.Time) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.scheduledStartTimestamp = timestamppb.New(startTime)
return nil
}
}

// WithDetachedWorkflowTags sets the tag map on the detached workflow.
func WithDetachedWorkflowTags(tags map[string]string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.tags = tags
return nil
}
}

// WithDetachedWorkflowAppID specifies the app ID hosting the detached
// workflow. When set, the action carries a routing envelope so the
// dispatcher delivers the new instance to the target app.
func WithDetachedWorkflowAppID(appID string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.targetAppID = &appID
return nil
}
}

// WithDetachedWorkflowAppNamespace specifies the Dapr namespace hosting
// the detached workflow. Must be combined with WithDetachedWorkflowAppID;
// see WithChildWorkflowAppNamespace for full semantics.
func WithDetachedWorkflowAppNamespace(namespace string) DetachedWorkflowOptionsFunc {
return func(opts *scheduleNewWorkflowOptions) error {
opts.targetAppNamespace = &namespace
return nil
}
}

// ScheduleNewWorkflow schedules a new, fully decoupled workflow instance
// from the calling workflow. Unlike CallChildWorkflow, the spawned
// workflow has no parent linkage: its history's ExecutionStartedEvent
// carries no ParentInstanceInfo, completion and failure do not flow back
// to the caller, and the caller receives the new instance ID
// synchronously rather than an awaitable Task.
//
// If WithDetachedWorkflowInstanceID is not provided, a deterministic ID
// of the form "<callerInstanceID>-<n>" is generated, where n increments
// only for default-ID spawns within this execution (so the suffix
// reflects the order of those calls and is stable across replays). The
// '-' separator keeps the generated ID safe for consumers that
// propagate the instance id into RFC 1123 subdomain identifiers.
//
// If the workflow author needs the spawned workflow's result, they should
// model the dependency through external events (RaiseEvent / WaitForEvent)
// or shared state — there is no built-in completion channel.
func (ctx *WorkflowContext) ScheduleNewWorkflow(workflow any, opts ...DetachedWorkflowOptions) (api.InstanceID, error) {
options := new(scheduleNewWorkflowOptions)
for _, configure := range opts {
if err := configure.applyDetachedWorkflowOption(options); err != nil {
return api.EmptyInstanceID, err
}
}

if options.targetAppNamespace != nil && options.targetAppID == nil {
return api.EmptyInstanceID, fmt.Errorf("WithDetachedWorkflowAppNamespace requires WithDetachedWorkflowAppID to also be set")
}

var instanceID string
if options.instanceID == nil {
instanceID = string(ctx.ID) + "-" + strconv.FormatInt(int64(ctx.defaultDetachedWorkflowCounter), 10)
ctx.defaultDetachedWorkflowCounter++
} else if *options.instanceID == "" {
return api.EmptyInstanceID, fmt.Errorf("WithDetachedWorkflowInstanceID was passed an empty string; omit the option to opt into the default ID")
} else {
instanceID = *options.instanceID
}

workflowName := helpers.GetTaskFunctionName(workflow)

action := &protos.WorkflowAction{
Id: ctx.getNextSequenceNumber(),
WorkflowActionType: &protos.WorkflowAction_CreateDetachedWorkflow{
CreateDetachedWorkflow: &protos.CreateDetachedWorkflowAction{
InstanceId: instanceID,
Name: workflowName,
Version: options.version,
Input: options.rawInput,
ScheduledStartTimestamp: options.scheduledStartTimestamp,
ExecutionId: options.executionID,
Tags: options.tags,
ParentTraceContext: options.parentTraceContext,
},
},
}

if r := taskRouterFromTarget(options.targetAppID, options.targetAppNamespace); r != nil {
action.Router = r
}

ctx.pendingActions[action.Id] = action
return api.InstanceID(instanceID), nil
}

// onDetachedWorkflowCreated retires the pending CreateDetachedWorkflowAction
// at this sequence position. Detached workflows are fire-and-forget, so
// there is no associated pending Task to complete: the call returned the
// instance ID synchronously when the action was emitted.
func (ctx *WorkflowContext) onDetachedWorkflowCreated(taskID int32, dw *protos.DetachedWorkflowInstanceCreatedEvent) error {
a, ok := ctx.pendingActions[taskID]
if !ok || a.GetCreateDetachedWorkflow() == nil {
if ctx.dropOptionalExternalEventTimerAt(taskID) {
a, ok = ctx.pendingActions[taskID]
}
}
if !ok || a.GetCreateDetachedWorkflow() == nil {
return fmt.Errorf(
"a previous execution called ScheduleNewWorkflow for instance ID '%s' and sequence number %d at this point in the workflow logic, but the current execution doesn't have this action with this sequence number",
dw.InstanceId,
taskID,
)
}
delete(ctx.pendingActions, taskID)
return nil
}
Loading
Loading