Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion temporalio/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ group :development do
gem 'rbs', '~> 3.10'
gem 'rb_sys', '~> 0.9'
gem 'rdoc'
gem 'rubocop'
# TODO: Unpin when https://github.com/rubocop/rubocop/issues/14837 is fixed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impressive turnaround

Suggested change
# TODO: Unpin when https://github.com/rubocop/rubocop/issues/14837 is fixed
# TODO: Unpin when https://github.com/rubocop/rubocop/pull/14838 is released

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It broke everyone, they had to, heh

gem 'rubocop', '1.84.0'
gem 'sqlite3'
gem 'steep', '~> 1.10'
gem 'yard'
Expand Down
21 changes: 21 additions & 0 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def _apply_context_to_headers(headers, context: ::OpenTelemetry::Context.current
headers[@header_key] = carrier unless carrier.empty?
end

# @!visibility private
def _propagator
@propagator
end

# @!visibility private
def _attach_context(headers)
context = _context_from_headers(headers)
Expand Down Expand Up @@ -402,6 +407,22 @@ def start_child_workflow(input)
super
end

# @!visibility private
def start_nexus_operation(input)
# Nexus headers are string-to-string maps (not payload-based like activity/workflow headers)
# so we inject the tracing context directly into the headers instead of nesting under a key
span = Workflow.completed_span("StartNexusOperation:#{input.service}/#{input.operation}", kind: :client)
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
if span
@root._propagator.inject(
input.headers,
context: ::OpenTelemetry::Trace.context_with_span(span)
)
end
end
super
end

# @!visibility private
def _apply_span_to_headers(headers, span)
# See WorkflowInbound#_attach_context comments for why we have to disable scheduler even for these simple
Expand Down
30 changes: 30 additions & 0 deletions temporalio/lib/temporalio/converters/failure_converter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ def to_failure(error, converter)
started_event_id: error.started_event_id,
retry_state: error.retry_state
)
when Error::NexusOperationError
failure.nexus_operation_execution_failure_info = Api::Failure::V1::NexusOperationFailureInfo.new(
endpoint: error.endpoint,
service: error.service,
operation: error.operation,
operation_token: error.operation_token || ''
)
when Error::NexusHandlerError
failure.nexus_handler_failure_info = Api::Failure::V1::NexusHandlerFailureInfo.new(
type: error.error_type.to_s,
retry_behavior: error.retry_behavior
)
else
failure.application_failure_info = Api::Failure::V1::ApplicationFailureInfo.new(
type: error.class.name.to_s.split('::').last
Expand Down Expand Up @@ -190,6 +202,24 @@ def from_failure(failure, converter)
zero_means_nil: true
)
)
elsif failure.nexus_operation_execution_failure_info
token = failure.nexus_operation_execution_failure_info.operation_token
Error::NexusOperationError.new(
Internal::ProtoUtils.string_or(failure.message, 'Nexus operation error'),
endpoint: failure.nexus_operation_execution_failure_info.endpoint,
service: failure.nexus_operation_execution_failure_info.service,
operation: failure.nexus_operation_execution_failure_info.operation,
operation_token: token.empty? ? nil : token
)
elsif failure.nexus_handler_failure_info
Error::NexusHandlerError.new(
Internal::ProtoUtils.string_or(failure.message, 'Nexus handler error'),
error_type: failure.nexus_handler_failure_info.type,
retry_behavior: Internal::ProtoUtils.enum_to_int(
Api::Enums::V1::NexusHandlerErrorRetryBehavior,
failure.nexus_handler_failure_info.retry_behavior
)
)
else
Error::Failure.new(Internal::ProtoUtils.string_or(failure.message, 'Failure error'))
end
Expand Down
66 changes: 66 additions & 0 deletions temporalio/lib/temporalio/error/failure.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,71 @@ def initialize(
@retry_state = retry_state
end
end

# Error raised on Nexus operation failure.
#
# WARNING: Nexus support is experimental.
class NexusOperationError < Failure
# @return [String] Nexus endpoint.
attr_reader :endpoint
# @return [String] Nexus service.
attr_reader :service
# @return [String] Nexus operation.
attr_reader :operation
# @return [String, nil] Operation token for async operations.
attr_reader :operation_token

# @!visibility private
def initialize(
message,
endpoint:,
service:,
operation:,
operation_token:
)
super(message)
@endpoint = endpoint
@service = service
@operation = operation
@operation_token = operation_token
end
end

# Error raised from a Nexus handler.
#
# WARNING: Nexus support is experimental.
class NexusHandlerError < Failure
# @return [Symbol] Error type from the handler.
attr_reader :error_type

# @return [RetryBehavior] Retry behavior for the error.
attr_reader :retry_behavior

# @!visibility private
def initialize(
message,
error_type:,
retry_behavior:
)
super(message)
@error_type = error_type.to_sym
@retry_behavior = retry_behavior
end

# Nexus handler error retry behavior.
module RetryBehavior
# Unspecified retry behavior.
UNSPECIFIED =
Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED

# Retryable error.
RETRYABLE =
Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE

# Non-retryable error.
NON_RETRYABLE =
Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE
end
end
end
end
11 changes: 11 additions & 0 deletions temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa

attr_reader :context, :logger, :info, :scheduler, :disable_eager_activity_execution, :pending_activities,
:pending_timers, :pending_child_workflow_starts, :pending_child_workflows,
:pending_nexus_operation_starts, :pending_nexus_operations,
:pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter,
:failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version,
:current_history_length, :current_history_size, :replaying, :random,
Expand All @@ -79,6 +80,8 @@ def initialize(details)
@pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result
@pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result
@pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result
@pending_nexus_operation_starts = {} # Keyed by sequence, value is fiber to resume with proto result
@pending_nexus_operations = {} # Keyed by sequence, value is NexusOperationHandle to resolve with proto result
@pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result
@pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result
@buffered_signals = {} # Keyed by signal name, value is array of signal jobs
Expand Down Expand Up @@ -369,6 +372,14 @@ def apply(job)
pending_child_workflows[job.resolve_child_workflow_execution.seq]&._resolve(
job.resolve_child_workflow_execution.result
)
when :resolve_nexus_operation_start
pending_nexus_operation_starts[job.resolve_nexus_operation_start.seq]&.resume(
job.resolve_nexus_operation_start
)
when :resolve_nexus_operation
pending_nexus_operations[job.resolve_nexus_operation.seq]&._resolve(
job.resolve_nexus_operation.result
)
when :resolve_signal_external_workflow
pending_external_signals[job.resolve_signal_external_workflow.seq]&.resume(
job.resolve_signal_external_workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'temporalio/internal/proto_utils'
require 'temporalio/internal/worker/workflow_instance'
require 'temporalio/internal/worker/workflow_instance/external_workflow_handle'
require 'temporalio/internal/worker/workflow_instance/nexus_client'
require 'temporalio/worker/interceptor'
require 'temporalio/workflow'

Expand All @@ -32,6 +33,10 @@ def continue_as_new_suggested
@instance.continue_as_new_suggested
end

def create_nexus_client(endpoint:, service:)
NexusClient.new(endpoint:, service:, outbound: @outbound)
end

def current_details
@instance.current_details || ''
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

require 'temporalio/workflow'
require 'temporalio/workflow/nexus_client'

module Temporalio
module Internal
module Worker
class WorkflowInstance
# Implementation of the Nexus client.
class NexusClient < Workflow::NexusClient
attr_reader :endpoint, :service

def initialize(endpoint:, service:, outbound:) # rubocop:disable Lint/MissingSuper
@endpoint = endpoint.to_s
@service = service.to_s
@outbound = outbound
end

def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation_type: nil, summary: nil,
cancellation: Workflow.cancellation, arg_hint: nil, result_hint: nil)
@outbound.start_nexus_operation(
Temporalio::Worker::Interceptor::Workflow::StartNexusOperationInput.new(
endpoint: @endpoint,
service: @service,
operation: operation.to_s,
arg:,
schedule_to_close_timeout:,
cancellation_type:,
summary:,
cancellation:,
arg_hint:,
result_hint:,
headers: {}
)
)
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

require 'temporalio/cancellation'
require 'temporalio/workflow'
require 'temporalio/workflow/nexus_operation_handle'

module Temporalio
module Internal
module Worker
class WorkflowInstance
# Implementation of the Nexus operation handle.
class NexusOperationHandle < Workflow::NexusOperationHandle
attr_reader :operation_token, :result_hint

def initialize(operation_token:, instance:, cancellation:, cancel_callback_key:, # rubocop:disable Lint/MissingSuper
result_hint:)
@operation_token = operation_token
@instance = instance
@cancellation = cancellation
@cancel_callback_key = cancel_callback_key
@result_hint = result_hint
@resolution = nil
end

def result(result_hint: nil)
# Use detached cancellation like child workflow to avoid interrupting result wait
Workflow.wait_condition(cancellation: Cancellation.new) { @resolution }

case @resolution.status
when :completed
@instance.payload_converter.from_payload(@resolution.completed, hint: result_hint || @result_hint)
when :failed
raise @instance.failure_converter.from_failure(@resolution.failed, @instance.payload_converter)
when :cancelled
raise @instance.failure_converter.from_failure(@resolution.cancelled, @instance.payload_converter)
when :timed_out
raise @instance.failure_converter.from_failure(@resolution.timed_out, @instance.payload_converter)
else
raise "Unrecognized Nexus operation result status: #{@resolution.status}"
end
end

def _resolve(resolution)
@cancellation.remove_cancel_callback(@cancel_callback_key)
@resolution = resolution
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'temporalio/internal/bridge/api'
require 'temporalio/internal/proto_utils'
require 'temporalio/internal/worker/workflow_instance'
require 'temporalio/internal/worker/workflow_instance/nexus_operation_handle'
require 'temporalio/worker/interceptor'
require 'temporalio/workflow'
require 'temporalio/workflow/child_workflow_handle'
Expand All @@ -22,6 +23,7 @@ def initialize(instance)
@activity_counter = 0
@timer_counter = 0
@child_counter = 0
@nexus_operation_counter = 0
@external_signal_counter = 0
@external_cancel_counter = 0
end
Expand Down Expand Up @@ -429,6 +431,72 @@ def start_child_workflow(input)
raise "Unknown resolution status: #{resolution.status}"
end
end

def start_nexus_operation(input)
raise Error::CanceledError, 'Nexus operation canceled before scheduled' if input.cancellation.canceled?

# Add the command
seq = (@nexus_operation_counter += 1)
@instance.add_command(
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
schedule_nexus_operation: Bridge::Api::WorkflowCommands::ScheduleNexusOperation.new(
seq:,
endpoint: input.endpoint,
service: input.service,
operation: input.operation,
input: @instance.payload_converter.to_payload(input.arg, hint: input.arg_hint),
schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout),
nexus_header: input.headers,
cancellation_type: input.cancellation_type
),
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
)
)

# Set as pending start
@instance.pending_nexus_operation_starts[seq] = Fiber.current

# Register cancel callback
cancel_callback_key = input.cancellation.add_cancel_callback do
# Send cancel if in start or pending
if @instance.pending_nexus_operation_starts.include?(seq) ||
@instance.pending_nexus_operations.include?(seq)
@instance.add_command(
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
request_cancel_nexus_operation: Bridge::Api::WorkflowCommands::RequestCancelNexusOperation.new(
seq:
)
)
)
end
end

# Wait for start resolution
resolution = begin
Fiber.yield
ensure
# Remove pending start
@instance.pending_nexus_operation_starts.delete(seq)
end

# Handle start failure
if resolution.failed
input.cancellation.remove_cancel_callback(cancel_callback_key)
raise @instance.failure_converter.from_failure(resolution.failed, @instance.payload_converter)
end

# Create handle and add to pending operations (result will come via resolve_nexus_operation)
handle = NexusOperationHandle.new(
operation_token: resolution.operation_token,
instance: @instance,
cancellation: input.cancellation,
cancel_callback_key:,
result_hint: input.result_hint
)
@instance.pending_nexus_operations[seq] = handle

handle
end
end
end
end
Expand Down
Loading
Loading