Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *Workflo
CreatedTimestamp: metadata.CreatedAt,
LastUpdatedTimestamp: metadata.LastUpdatedAt,
Version: metadata.Version,
StartedAt: metadata.StartedAt,
}

if metadata.ParentInstanceId != "" {
Expand Down
44 changes: 43 additions & 1 deletion backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,11 +561,19 @@ func (be *postgresBackend) CreateWorkflowInstance(ctx context.Context, e *backen
return err
}

// Honour ScheduledStartTimestamp by deferring the start event's
// visibility. NULL VisibleTime means immediately visible.
var visibleTime any
if ts := e.GetExecutionStarted().GetScheduledStartTimestamp(); ts != nil {
visibleTime = ts.AsTime()
}

_, err = tx.Exec(
ctx,
`INSERT INTO NewEvents (InstanceID, EventPayload) VALUES ($1, $2)`,
`INSERT INTO NewEvents (InstanceID, EventPayload, VisibleTime) VALUES ($1, $2, $3)`,
instanceID,
eventPayload,
visibleTime,
)

if err != nil {
Expand Down Expand Up @@ -787,6 +795,11 @@ func (be *postgresBackend) GetWorkflowMetadata(ctx context.Context, iid api.Inst
versionw = wrapperspb.String(*version)
}

startedAt, err := be.getStartedAt(ctx, iid)
if err != nil {
return nil, err
}

startEvent, err := be.getStartEvent(ctx, iid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -814,6 +827,7 @@ func (be *postgresBackend) GetWorkflowMetadata(ctx context.Context, iid api.Inst
Version: versionw,
ParentInstanceId: parentInstanceID,
ParentAppId: parentAppIDw,
StartedAt: startedAt,
}, nil
}

Expand Down Expand Up @@ -843,6 +857,34 @@ func (be *postgresBackend) getStartEvent(ctx context.Context, iid api.InstanceID
return e.GetExecutionStarted(), nil
}

// getStartedAt returns the timestamp of the first history event for the
// instance, or nil if the workflow has not yet been picked up by a worker
// (History is empty).
//
// In History, row 0 is the WorkflowStartedEvent injected by the engine in
// workflowProcessor.applyWorkItem; its Timestamp is the moment the worker
// first picked the workflow up — distinct from the ExecutionStartedEvent's
// creation timestamp.
func (be *postgresBackend) getStartedAt(ctx context.Context, iid api.InstanceID) (*timestamppb.Timestamp, error) {
var payload []byte
err := be.db.QueryRow(
ctx,
"SELECT EventPayload FROM History WHERE InstanceID = $1 ORDER BY SequenceNumber ASC LIMIT 1",
iid,
).Scan(&payload)
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to query first history event: %w", err)
}
e, err := backend.UnmarshalHistoryEvent(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal first history event: %w", err)
}
return e.GetTimestamp(), nil
}

// GetWorkflowRuntimeState implements backend.Backend
func (be *postgresBackend) GetWorkflowRuntimeState(ctx context.Context, wi *backend.WorkflowWorkItem) (*backend.WorkflowRuntimeState, error) {
if err := be.ensureDB(); err != nil {
Expand Down
66 changes: 66 additions & 0 deletions backend/runtimestate/runtimestate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runtimestate

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -414,3 +415,68 @@ func TestAddEvent_NewOrchestrationRuntimeStateDropsHistoryDuplicates(t *testing.
assert.True(t, dedup.IsPresent(s.OldEvents, dedup.KindTask, 1))
assert.True(t, dedup.IsPresent(s.OldEvents, dedup.KindTask, 2))
}

func TestGetStartedTime(t *testing.T) {
t.Parallel()

// Mirrors what the engine produces in applyWorkItem: a WorkflowStartedEvent
// is appended first (timestamp = first-execution time), then the work
// item's own events (the original ExecutionStartedEvent at creation time).
firstRun := timestamppb.New(timestamppb.Now().AsTime())
creation := timestamppb.New(firstRun.AsTime().Add(-time.Second))
history := []*protos.HistoryEvent{
Comment thread
pablochacin marked this conversation as resolved.
{
EventId: -1,
Timestamp: firstRun,
EventType: &protos.HistoryEvent_WorkflowStarted{
WorkflowStarted: &protos.WorkflowStartedEvent{},
},
},
{
EventId: -1,
Timestamp: creation,
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{Name: "wf"},
},
},
}
s := NewWorkflowRuntimeState("abc", nil, history)

got := GetStartedTime(s)
require.False(t, got.IsZero())
assert.Equal(t, firstRun.AsTime(), got, "must return first event's timestamp, not the ExecutionStarted's")
}

func TestGetStartedTime_EmptyState(t *testing.T) {
t.Parallel()

// Workflow created but not yet executed -> no events in either OldEvents
// or NewEvents. The dapr orchestrator's `if !t.IsZero()` guard depends on
// this returning a zero time so StartedAt stays nil.
s := NewWorkflowRuntimeState("abc", nil, nil)

got := GetStartedTime(s)
assert.True(t, got.IsZero(), "empty rstate must return zero time")
}

func TestGetStartedTime_NewEventsOnly(t *testing.T) {
t.Parallel()

// Mid-processing: events have been added via AddEvent but not yet moved
// to OldEvents (which happens when the runtime state is reloaded after a
// save). GetStartedTime must still return the first event's timestamp.
ts := timestamppb.Now()
s := NewWorkflowRuntimeState("abc", nil, nil)
require.NoError(t, AddEvent(s, &protos.HistoryEvent{
EventId: -1,
Timestamp: ts,
EventType: &protos.HistoryEvent_WorkflowStarted{
WorkflowStarted: &protos.WorkflowStartedEvent{},
},
}))
require.Empty(t, s.OldEvents)
require.Len(t, s.NewEvents, 1)

got := GetStartedTime(s)
assert.Equal(t, ts.AsTime(), got)
}
45 changes: 44 additions & 1 deletion backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,19 @@ func (be *sqliteBackend) CreateWorkflowInstance(ctx context.Context, e *backend.
return err
}

// Honour ScheduledStartTimestamp by deferring the start event's
// visibility. NULL VisibleTime means immediately visible.
var visibleTime any
if ts := e.GetExecutionStarted().GetScheduledStartTimestamp(); ts != nil {
visibleTime = ts.AsTime()
}

_, err = tx.ExecContext(
ctx,
`INSERT INTO NewEvents ([InstanceID], [EventPayload]) VALUES (?, ?)`,
`INSERT INTO NewEvents ([InstanceID], [EventPayload], [VisibleTime]) VALUES (?, ?, ?)`,
instanceID,
eventPayload,
visibleTime,
)

if err != nil {
Expand Down Expand Up @@ -695,6 +703,11 @@ func (be *sqliteBackend) GetWorkflowMetadata(ctx context.Context, iid api.Instan
}
}

startedAt, err := be.getStartedAt(ctx, iid)
if err != nil {
return nil, err
}


startEvent, err := be.getStartEvent(ctx, iid)
if err != nil {
Expand Down Expand Up @@ -723,8 +736,10 @@ func (be *sqliteBackend) GetWorkflowMetadata(ctx context.Context, iid api.Instan
Version: versionw,
ParentInstanceId: parentInstanceID,
ParentAppId: parentAppIDw,
StartedAt: startedAt,
}, nil
}

// getStartEvent loads the ExecutionStarted event for an instance, or nil
// if the workflow has not yet been picked up by a worker.
//
Expand All @@ -751,6 +766,34 @@ func (be *sqliteBackend) getStartEvent(ctx context.Context, iid api.InstanceID)
return e.GetExecutionStarted(), nil
}

// getStartedAt returns the timestamp of the first history event for the
// instance, or nil if the workflow has not yet been picked up by a worker
// (History is empty).
//
// In History, row 0 is the WorkflowStartedEvent injected by the engine in
// workflowProcessor.applyWorkItem; its Timestamp is the moment the worker
// first picked the workflow up — distinct from the ExecutionStartedEvent's
// creation timestamp.
func (be *sqliteBackend) getStartedAt(ctx context.Context, iid api.InstanceID) (*timestamppb.Timestamp, error) {
var payload []byte
err := be.db.QueryRowContext(
ctx,
"SELECT [EventPayload] FROM History WHERE [InstanceID] = ? ORDER BY [SequenceNumber] ASC LIMIT 1",
iid,
).Scan(&payload)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to query first history event: %w", err)
}
e, err := backend.UnmarshalHistoryEvent(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal first history event: %w", err)
}
return e.GetTimestamp(), nil
}

// GetWorkflowRuntimeState implements backend.Backend
func (be *sqliteBackend) GetWorkflowRuntimeState(ctx context.Context, wi *backend.WorkflowWorkItem) (*backend.WorkflowRuntimeState, error) {
if err := be.ensureDB(); err != nil {
Expand Down
21 changes: 11 additions & 10 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,17 @@ func makeWorkflowMetadata(resp *protos.GetInstanceResponse) (*backend.WorkflowMe
return nil, fmt.Errorf("workflow state is nil")
}
metadata := &backend.WorkflowMetadata{
InstanceId: resp.WorkflowState.InstanceId,
Name: resp.WorkflowState.Name,
RuntimeStatus: resp.WorkflowState.WorkflowStatus,
Input: resp.WorkflowState.Input,
CustomStatus: resp.WorkflowState.CustomStatus,
Output: resp.WorkflowState.Output,
CreatedAt: resp.WorkflowState.CreatedTimestamp,
LastUpdatedAt: resp.WorkflowState.LastUpdatedTimestamp,
FailureDetails: resp.WorkflowState.FailureDetails,
Version: resp.WorkflowState.Version,
InstanceId: resp.GetWorkflowState().GetInstanceId(),
Name: resp.GetWorkflowState().GetName(),
RuntimeStatus: resp.GetWorkflowState().GetWorkflowStatus(),
Input: resp.GetWorkflowState().GetInput(),
CustomStatus: resp.GetWorkflowState().GetCustomStatus(),
Output: resp.GetWorkflowState().GetOutput(),
CreatedAt: resp.GetWorkflowState().GetCreatedTimestamp(),
LastUpdatedAt: resp.GetWorkflowState().GetLastUpdatedTimestamp(),
FailureDetails: resp.GetWorkflowState().GetFailureDetails(),
Version: resp.GetWorkflowState().GetVersion(),
StartedAt: resp.GetWorkflowState().GetStartedAt(),
}
return metadata, nil
}
83 changes: 73 additions & 10 deletions tests/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,66 @@ func Test_GetWorkflowMetadata_NoParent(t *testing.T) {
}
}


func Test_GetWorkflowMetadata_StartedAt(t *testing.T) {
for i, be := range backends {
initTest(t, be, i, true)

const iid = "startedat-instance"

// ExecutionStartedEvent's timestamp — captured before CreateWorkflowInstance
// so we can later prove StartedAt is strictly later than this value
// (row-ordering check; see assertions below).
startTS := time.Now().UTC().Truncate(time.Microsecond)
e := &protos.HistoryEvent{
Timestamp: timestamppb.New(startTS),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: defaultName,
WorkflowInstance: &protos.WorkflowInstance{InstanceId: iid},
Input: wrapperspb.String(defaultInput),
},
},
}
if !assert.NoError(t, be.CreateWorkflowInstance(ctx, e)) {
continue
}

// Pre-execution: instance row exists but History is empty.
// getStartedAt must hit the no-rows branch and StartedAt stays nil.
md, err := be.GetWorkflowMetadata(ctx, api.InstanceID(iid))
if assert.NoError(t, err) {
assert.Nil(t, md.StartedAt, "StartedAt should be nil before the first work item is processed")
}

// Drive the work item using the shared harness, which mirrors
// workflowProcessor.applyWorkItem by prepending a WorkflowStartedEvent
// to NewEvents. After this, History row 0 = WorkflowStarted (timestamp
// captured inside the helper), row 1 = ExecutionStarted (startTS).
beforeProcess := time.Now().UTC()
if !processFirstWorkItem(t, be, iid) {
continue
}
afterProcess := time.Now().UTC()


// after processing the work item startAt should return a non-nil value not earlier than the time the
// work item was processed and not earlier than the start time
md, err = be.GetWorkflowMetadata(ctx, api.InstanceID(iid))
if assert.NoError(t, err) {
if assert.NotNil(t, md.StartedAt, "StartedAt must be populated once History has a row") {
started := md.StartedAt.AsTime()
assert.False(t, started.Before(beforeProcess),
"StartedAt %v should be >= %v (start of work-item processing)", started, beforeProcess)
assert.False(t, started.After(afterProcess),
"StartedAt %v should be <= %v (end of work-item processing)", started, afterProcess)
assert.False(t, started.Before(startTS),
"StartedAt %v should be >= %v (ExecutionStarted timestamp)", started, startTS)
}
}
}
}

func Test_PurgeWorkflowState(t *testing.T) {
for i, be := range backends {
initTest(t, be, i, true)
Expand Down Expand Up @@ -586,19 +646,12 @@ func getWorkflowRuntimeState(t assert.TestingT, be backend.Backend, wi *backend.
return nil, false
}

func getWorkflowMetadata(t assert.TestingT, be backend.Backend, iid api.InstanceID) (*backend.WorkflowMetadata, bool) {
metadata, err := be.GetWorkflowMetadata(ctx, iid)
if assert.NoError(t, err) && assert.NotNil(t, metadata) {
return metadata, assert.Equal(t, iid, api.InstanceID(metadata.InstanceId))
}

return nil, false
}

// processFirstWorkItem fetches the first work item for the given instance,
// prepends a WorkflowStartedEvent to NewEvents before the work item's own events,
// applies its NewEvents to the runtime state, and completes the work item
// without producing any additional workflow actions.
// without producing any additional workflow actions. This mirrors what
// workflowProcessor.applyWorkItem does in production, so the persisted
// History layout matches: row 0 = WorkflowStarted, row 1 = ExecutionStarted.
func processFirstWorkItem(t assert.TestingT, be backend.Backend, instanceID string) bool {
wi, ok := getWorkflowWorkItem(t, be, instanceID)
if !ok {
Expand All @@ -621,3 +674,13 @@ func processFirstWorkItem(t assert.TestingT, be backend.Backend, instanceID stri
wi.State = state
return assert.NoError(t, be.CompleteWorkflowWorkItem(ctx, wi))
}

func getWorkflowMetadata(t assert.TestingT, be backend.Backend, iid api.InstanceID) (*backend.WorkflowMetadata, bool) {
metadata, err := be.GetWorkflowMetadata(ctx, iid)
if assert.NoError(t, err) && assert.NotNil(t, metadata) {
return metadata, assert.Equal(t, iid, api.InstanceID(metadata.InstanceId))
}

return nil, false
}

Loading
Loading