From a765f1b37fdbfeb0c6c2c23c2fe4b957761cfe67 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 30 Jan 2026 11:09:00 -0600 Subject: [PATCH] Nexus support --- temporalio/Gemfile | 3 +- .../lib/temporalio/contrib/open_telemetry.rb | 21 + .../converters/failure_converter.rb | 30 ++ temporalio/lib/temporalio/error/failure.rb | 66 +++ .../internal/worker/workflow_instance.rb | 11 + .../worker/workflow_instance/context.rb | 5 + .../worker/workflow_instance/nexus_client.rb | 42 ++ .../nexus_operation_handle.rb | 51 ++ .../outbound_implementation.rb | 68 +++ .../testing/workflow_environment.rb | 40 ++ .../lib/temporalio/worker/interceptor.rb | 27 + temporalio/lib/temporalio/workflow.rb | 14 + .../lib/temporalio/workflow/nexus_client.rb | 81 +++ .../nexus_operation_cancellation_type.rb | 21 + .../workflow/nexus_operation_handle.rb | 37 ++ temporalio/sig/temporalio/error/failure.rbs | 34 ++ .../internal/worker/workflow_instance.rbs | 2 + .../worker/workflow_instance/context.rbs | 2 + .../worker/workflow_instance/nexus_client.rbs | 25 + .../nexus_operation_handle.rbs | 19 + .../testing/workflow_environment.rbs | 4 + .../sig/temporalio/worker/interceptor.rbs | 30 ++ temporalio/sig/temporalio/workflow.rbs | 2 + .../sig/temporalio/workflow/nexus_client.rbs | 30 ++ .../nexus_operation_cancellation_type.rbs | 12 + .../workflow/nexus_operation_handle.rbs | 10 + temporalio/test/golangworker/main.go | 64 +++ temporalio/test/sig/test.rbs | 3 +- temporalio/test/sig/workflow_utils.rbs | 2 +- temporalio/test/test.rb | 10 +- temporalio/test/worker_workflow_nexus_test.rb | 476 ++++++++++++++++++ 31 files changed, 1238 insertions(+), 4 deletions(-) create mode 100644 temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb create mode 100644 temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rb create mode 100644 temporalio/lib/temporalio/workflow/nexus_client.rb create mode 100644 temporalio/lib/temporalio/workflow/nexus_operation_cancellation_type.rb create mode 100644 temporalio/lib/temporalio/workflow/nexus_operation_handle.rb create mode 100644 temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_client.rbs create mode 100644 temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rbs create mode 100644 temporalio/sig/temporalio/workflow/nexus_client.rbs create mode 100644 temporalio/sig/temporalio/workflow/nexus_operation_cancellation_type.rbs create mode 100644 temporalio/sig/temporalio/workflow/nexus_operation_handle.rbs create mode 100644 temporalio/test/worker_workflow_nexus_test.rb diff --git a/temporalio/Gemfile b/temporalio/Gemfile index aac001d8..d7edc4d9 100644 --- a/temporalio/Gemfile +++ b/temporalio/Gemfile @@ -25,7 +25,8 @@ group :development do gem 'rbs', '~> 3.10' gem 'rb_sys', '~> 0.9' gem 'rdoc' - gem 'rubocop' + # TODO: Unpin when https://github.com/rubocop/rubocop/issues/14837 is fixed + gem 'rubocop', '1.84.0' gem 'sqlite3' gem 'steep', '~> 1.10' gem 'yard' diff --git a/temporalio/lib/temporalio/contrib/open_telemetry.rb b/temporalio/lib/temporalio/contrib/open_telemetry.rb index 77b03439..89dde93c 100644 --- a/temporalio/lib/temporalio/contrib/open_telemetry.rb +++ b/temporalio/lib/temporalio/contrib/open_telemetry.rb @@ -67,6 +67,11 @@ def _apply_context_to_headers(headers, context: ::OpenTelemetry::Context.current headers[@header_key] = carrier unless carrier.empty? end + # @!visibility private + def _propagator + @propagator + end + # @!visibility private def _attach_context(headers) context = _context_from_headers(headers) @@ -402,6 +407,22 @@ def start_child_workflow(input) super end + # @!visibility private + def start_nexus_operation(input) + # Nexus headers are string-to-string maps (not payload-based like activity/workflow headers) + # so we inject the tracing context directly into the headers instead of nesting under a key + span = Workflow.completed_span("StartNexusOperation:#{input.service}/#{input.operation}", kind: :client) + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + if span + @root._propagator.inject( + input.headers, + context: ::OpenTelemetry::Trace.context_with_span(span) + ) + end + end + super + end + # @!visibility private def _apply_span_to_headers(headers, span) # See WorkflowInbound#_attach_context comments for why we have to disable scheduler even for these simple diff --git a/temporalio/lib/temporalio/converters/failure_converter.rb b/temporalio/lib/temporalio/converters/failure_converter.rb index 7d24b694..3332f217 100644 --- a/temporalio/lib/temporalio/converters/failure_converter.rb +++ b/temporalio/lib/temporalio/converters/failure_converter.rb @@ -84,6 +84,18 @@ def to_failure(error, converter) started_event_id: error.started_event_id, retry_state: error.retry_state ) + when Error::NexusOperationError + failure.nexus_operation_execution_failure_info = Api::Failure::V1::NexusOperationFailureInfo.new( + endpoint: error.endpoint, + service: error.service, + operation: error.operation, + operation_token: error.operation_token || '' + ) + when Error::NexusHandlerError + failure.nexus_handler_failure_info = Api::Failure::V1::NexusHandlerFailureInfo.new( + type: error.error_type.to_s, + retry_behavior: error.retry_behavior + ) else failure.application_failure_info = Api::Failure::V1::ApplicationFailureInfo.new( type: error.class.name.to_s.split('::').last @@ -190,6 +202,24 @@ def from_failure(failure, converter) zero_means_nil: true ) ) + elsif failure.nexus_operation_execution_failure_info + token = failure.nexus_operation_execution_failure_info.operation_token + Error::NexusOperationError.new( + Internal::ProtoUtils.string_or(failure.message, 'Nexus operation error'), + endpoint: failure.nexus_operation_execution_failure_info.endpoint, + service: failure.nexus_operation_execution_failure_info.service, + operation: failure.nexus_operation_execution_failure_info.operation, + operation_token: token.empty? ? nil : token + ) + elsif failure.nexus_handler_failure_info + Error::NexusHandlerError.new( + Internal::ProtoUtils.string_or(failure.message, 'Nexus handler error'), + error_type: failure.nexus_handler_failure_info.type, + retry_behavior: Internal::ProtoUtils.enum_to_int( + Api::Enums::V1::NexusHandlerErrorRetryBehavior, + failure.nexus_handler_failure_info.retry_behavior + ) + ) else Error::Failure.new(Internal::ProtoUtils.string_or(failure.message, 'Failure error')) end diff --git a/temporalio/lib/temporalio/error/failure.rb b/temporalio/lib/temporalio/error/failure.rb index fc69bd74..6edc5cd5 100644 --- a/temporalio/lib/temporalio/error/failure.rb +++ b/temporalio/lib/temporalio/error/failure.rb @@ -233,5 +233,71 @@ def initialize( @retry_state = retry_state end end + + # Error raised on Nexus operation failure. + # + # WARNING: Nexus support is experimental. + class NexusOperationError < Failure + # @return [String] Nexus endpoint. + attr_reader :endpoint + # @return [String] Nexus service. + attr_reader :service + # @return [String] Nexus operation. + attr_reader :operation + # @return [String, nil] Operation token for async operations. + attr_reader :operation_token + + # @!visibility private + def initialize( + message, + endpoint:, + service:, + operation:, + operation_token: + ) + super(message) + @endpoint = endpoint + @service = service + @operation = operation + @operation_token = operation_token + end + end + + # Error raised from a Nexus handler. + # + # WARNING: Nexus support is experimental. + class NexusHandlerError < Failure + # @return [Symbol] Error type from the handler. + attr_reader :error_type + + # @return [RetryBehavior] Retry behavior for the error. + attr_reader :retry_behavior + + # @!visibility private + def initialize( + message, + error_type:, + retry_behavior: + ) + super(message) + @error_type = error_type.to_sym + @retry_behavior = retry_behavior + end + + # Nexus handler error retry behavior. + module RetryBehavior + # Unspecified retry behavior. + UNSPECIFIED = + Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED + + # Retryable error. + RETRYABLE = + Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE + + # Non-retryable error. + NON_RETRYABLE = + Api::Enums::V1::NexusHandlerErrorRetryBehavior::NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE + end + end end end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index 192c8d49..adb1a637 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -54,6 +54,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa attr_reader :context, :logger, :info, :scheduler, :disable_eager_activity_execution, :pending_activities, :pending_timers, :pending_child_workflow_starts, :pending_child_workflows, + :pending_nexus_operation_starts, :pending_nexus_operations, :pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter, :failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version, :current_history_length, :current_history_size, :replaying, :random, @@ -79,6 +80,8 @@ def initialize(details) @pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result @pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result @pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result + @pending_nexus_operation_starts = {} # Keyed by sequence, value is fiber to resume with proto result + @pending_nexus_operations = {} # Keyed by sequence, value is NexusOperationHandle to resolve with proto result @pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result @pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result @buffered_signals = {} # Keyed by signal name, value is array of signal jobs @@ -369,6 +372,14 @@ def apply(job) pending_child_workflows[job.resolve_child_workflow_execution.seq]&._resolve( job.resolve_child_workflow_execution.result ) + when :resolve_nexus_operation_start + pending_nexus_operation_starts[job.resolve_nexus_operation_start.seq]&.resume( + job.resolve_nexus_operation_start + ) + when :resolve_nexus_operation + pending_nexus_operations[job.resolve_nexus_operation.seq]&._resolve( + job.resolve_nexus_operation.result + ) when :resolve_signal_external_workflow pending_external_signals[job.resolve_signal_external_workflow.seq]&.resume( job.resolve_signal_external_workflow diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb index e0b391dd..b08dafb1 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb @@ -6,6 +6,7 @@ require 'temporalio/internal/proto_utils' require 'temporalio/internal/worker/workflow_instance' require 'temporalio/internal/worker/workflow_instance/external_workflow_handle' +require 'temporalio/internal/worker/workflow_instance/nexus_client' require 'temporalio/worker/interceptor' require 'temporalio/workflow' @@ -32,6 +33,10 @@ def continue_as_new_suggested @instance.continue_as_new_suggested end + def create_nexus_client(endpoint:, service:) + NexusClient.new(endpoint:, service:, outbound: @outbound) + end + def current_details @instance.current_details || '' end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb new file mode 100644 index 00000000..873dded9 --- /dev/null +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require 'temporalio/workflow/nexus_client' + +module Temporalio + module Internal + module Worker + class WorkflowInstance + # Implementation of the Nexus client. + class NexusClient < Workflow::NexusClient + attr_reader :endpoint, :service + + def initialize(endpoint:, service:, outbound:) # rubocop:disable Lint/MissingSuper + @endpoint = endpoint.to_s + @service = service.to_s + @outbound = outbound + end + + def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation_type: nil, summary: nil, + cancellation: Workflow.cancellation, arg_hint: nil, result_hint: nil) + @outbound.start_nexus_operation( + Temporalio::Worker::Interceptor::Workflow::StartNexusOperationInput.new( + endpoint: @endpoint, + service: @service, + operation: operation.to_s, + arg:, + schedule_to_close_timeout:, + cancellation_type:, + summary:, + cancellation:, + arg_hint:, + result_hint:, + headers: {} + ) + ) + end + end + end + end + end +end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rb new file mode 100644 index 00000000..84750aa0 --- /dev/null +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require 'temporalio/cancellation' +require 'temporalio/workflow' +require 'temporalio/workflow/nexus_operation_handle' + +module Temporalio + module Internal + module Worker + class WorkflowInstance + # Implementation of the Nexus operation handle. + class NexusOperationHandle < Workflow::NexusOperationHandle + attr_reader :operation_token, :result_hint + + def initialize(operation_token:, instance:, cancellation:, cancel_callback_key:, # rubocop:disable Lint/MissingSuper + result_hint:) + @operation_token = operation_token + @instance = instance + @cancellation = cancellation + @cancel_callback_key = cancel_callback_key + @result_hint = result_hint + @resolution = nil + end + + def result(result_hint: nil) + # Use detached cancellation like child workflow to avoid interrupting result wait + Workflow.wait_condition(cancellation: Cancellation.new) { @resolution } + + case @resolution.status + when :completed + @instance.payload_converter.from_payload(@resolution.completed, hint: result_hint || @result_hint) + when :failed + raise @instance.failure_converter.from_failure(@resolution.failed, @instance.payload_converter) + when :cancelled + raise @instance.failure_converter.from_failure(@resolution.cancelled, @instance.payload_converter) + when :timed_out + raise @instance.failure_converter.from_failure(@resolution.timed_out, @instance.payload_converter) + else + raise "Unrecognized Nexus operation result status: #{@resolution.status}" + end + end + + def _resolve(resolution) + @cancellation.remove_cancel_callback(@cancel_callback_key) + @resolution = resolution + end + end + end + end + end +end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index cbae4b6e..db5c5c55 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -6,6 +6,7 @@ require 'temporalio/internal/bridge/api' require 'temporalio/internal/proto_utils' require 'temporalio/internal/worker/workflow_instance' +require 'temporalio/internal/worker/workflow_instance/nexus_operation_handle' require 'temporalio/worker/interceptor' require 'temporalio/workflow' require 'temporalio/workflow/child_workflow_handle' @@ -22,6 +23,7 @@ def initialize(instance) @activity_counter = 0 @timer_counter = 0 @child_counter = 0 + @nexus_operation_counter = 0 @external_signal_counter = 0 @external_cancel_counter = 0 end @@ -429,6 +431,72 @@ def start_child_workflow(input) raise "Unknown resolution status: #{resolution.status}" end end + + def start_nexus_operation(input) + raise Error::CanceledError, 'Nexus operation canceled before scheduled' if input.cancellation.canceled? + + # Add the command + seq = (@nexus_operation_counter += 1) + @instance.add_command( + Bridge::Api::WorkflowCommands::WorkflowCommand.new( + schedule_nexus_operation: Bridge::Api::WorkflowCommands::ScheduleNexusOperation.new( + seq:, + endpoint: input.endpoint, + service: input.service, + operation: input.operation, + input: @instance.payload_converter.to_payload(input.arg, hint: input.arg_hint), + schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), + nexus_header: input.headers, + cancellation_type: input.cancellation_type + ), + user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter) + ) + ) + + # Set as pending start + @instance.pending_nexus_operation_starts[seq] = Fiber.current + + # Register cancel callback + cancel_callback_key = input.cancellation.add_cancel_callback do + # Send cancel if in start or pending + if @instance.pending_nexus_operation_starts.include?(seq) || + @instance.pending_nexus_operations.include?(seq) + @instance.add_command( + Bridge::Api::WorkflowCommands::WorkflowCommand.new( + request_cancel_nexus_operation: Bridge::Api::WorkflowCommands::RequestCancelNexusOperation.new( + seq: + ) + ) + ) + end + end + + # Wait for start resolution + resolution = begin + Fiber.yield + ensure + # Remove pending start + @instance.pending_nexus_operation_starts.delete(seq) + end + + # Handle start failure + if resolution.failed + input.cancellation.remove_cancel_callback(cancel_callback_key) + raise @instance.failure_converter.from_failure(resolution.failed, @instance.payload_converter) + end + + # Create handle and add to pending operations (result will come via resolve_nexus_operation) + handle = NexusOperationHandle.new( + operation_token: resolution.operation_token, + instance: @instance, + cancellation: input.cancellation, + cancel_callback_key:, + result_hint: input.result_hint + ) + @instance.pending_nexus_operations[seq] = handle + + handle + end end end end diff --git a/temporalio/lib/temporalio/testing/workflow_environment.rb b/temporalio/lib/temporalio/testing/workflow_environment.rb index d5e4d23c..7e03452a 100644 --- a/temporalio/lib/temporalio/testing/workflow_environment.rb +++ b/temporalio/lib/temporalio/testing/workflow_environment.rb @@ -2,6 +2,7 @@ require 'delegate' require 'temporalio/api' +require 'temporalio/api/operatorservice/v1/request_response' require 'temporalio/api/testservice/v1/request_response' require 'temporalio/client' require 'temporalio/client/connection/test_service' @@ -264,6 +265,45 @@ def current_time Time.now end + # Create Nexus endpoint on this test environment. + # + # WARNING: Nexus support is experimental. + # + # @param name [String] Endpoint name. + # @param task_queue [String] Task queue for the endpoint. + # @return [Temporalio::Api::Nexus::V1::Endpoint] Created endpoint. + def create_nexus_endpoint(name:, task_queue:) + resp = client.connection.operator_service.create_nexus_endpoint( + Temporalio::Api::OperatorService::V1::CreateNexusEndpointRequest.new( + spec: Temporalio::Api::Nexus::V1::EndpointSpec.new( + name:, + target: Temporalio::Api::Nexus::V1::EndpointTarget.new( + worker: Temporalio::Api::Nexus::V1::EndpointTarget::Worker.new( + namespace: client.namespace, + task_queue: + ) + ) + ) + ) + ) + resp.endpoint + end + + # Delete Nexus endpoint on this test environment. + # + # WARNING: Nexus support is experimental. + # + # @param endpoint [Temporalio::Api::Nexus::V1::Endpoint] Endpoint to delete. + def delete_nexus_endpoint(endpoint) + client.connection.operator_service.delete_nexus_endpoint( + Temporalio::Api::OperatorService::V1::DeleteNexusEndpointRequest.new( + id: endpoint.id, + version: endpoint.version + ) + ) + nil + end + # Run a block with automatic time skipping disabled. This just runs the block for environments that don't support # time skipping. # diff --git a/temporalio/lib/temporalio/worker/interceptor.rb b/temporalio/lib/temporalio/worker/interceptor.rb index 241ea740..5ca385f5 100644 --- a/temporalio/lib/temporalio/worker/interceptor.rb +++ b/temporalio/lib/temporalio/worker/interceptor.rb @@ -300,6 +300,23 @@ def handle_update(input) :headers ) + # Input for {Outbound.start_nexus_operation}. + # + # WARNING: Nexus support is experimental. + StartNexusOperationInput = Data.define( + :endpoint, + :service, + :operation, + :arg, + :schedule_to_close_timeout, + :cancellation_type, + :summary, + :cancellation, + :arg_hint, + :result_hint, + :headers + ) + # Outbound interceptor for intercepting outbound workflow calls. This should be extended by users needing to # intercept workflow calls. class Outbound @@ -371,6 +388,16 @@ def sleep(input) def start_child_workflow(input) @next_interceptor.start_child_workflow(input) end + + # Start Nexus operation. + # + # WARNING: Nexus support is experimental. + # + # @param input [StartNexusOperationInput] Input. + # @return [Workflow::NexusOperationHandle] Nexus operation handle. + def start_nexus_operation(input) + @next_interceptor.start_nexus_operation(input) + end end end end diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index b57093c2..39f1fd68 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -12,6 +12,9 @@ require 'temporalio/workflow/future' require 'temporalio/workflow/handler_unfinished_policy' require 'temporalio/workflow/info' +require 'temporalio/workflow/nexus_client' +require 'temporalio/workflow/nexus_operation_cancellation_type' +require 'temporalio/workflow/nexus_operation_handle' require 'temporalio/workflow/parent_close_policy' require 'temporalio/workflow/update_info' require 'timeout' @@ -40,6 +43,17 @@ def self.continue_as_new_suggested _current.continue_as_new_suggested end + # Create a Nexus client for executing operations. + # + # WARNING: Nexus support is experimental. + # + # @param endpoint [Symbol, String] Endpoint name. + # @param service [Symbol, String] Service name. + # @return [NexusClient] Client for executing Nexus operations. + def self.create_nexus_client(endpoint:, service:) + _current.create_nexus_client(endpoint:, service:) + end + # Get current details for this workflow that may appear in UI/CLI. Unlike static details set at start, this value # can be updated throughout the life of the workflow. This can be in Temporal markdown format and can span multiple # lines. This is currently experimental. diff --git a/temporalio/lib/temporalio/workflow/nexus_client.rb b/temporalio/lib/temporalio/workflow/nexus_client.rb new file mode 100644 index 00000000..6c946326 --- /dev/null +++ b/temporalio/lib/temporalio/workflow/nexus_client.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +module Temporalio + module Workflow + # Client for executing Nexus operations from workflows. + # + # This is created via {Workflow.create_nexus_client}, it is never instantiated directly. + # + # WARNING: Nexus support is experimental. + class NexusClient + # @!visibility private + def initialize + raise NotImplementedError, 'Cannot instantiate a Nexus client directly' + end + + # @return [String] Endpoint name for this client. + def endpoint + raise NotImplementedError + end + + # @return [String] Service name for this client. + def service + raise NotImplementedError + end + + # Start a Nexus operation and return a handle. + # + # @param operation [Symbol, String] Operation name. + # @param arg [Object] Argument for the operation. + # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation. + # @param summary [String, nil] Optional summary for the operation (appears in UI/CLI). + # @param cancellation [Cancellation] Cancellation for the operation. + # @param arg_hint [Object, nil] Converter hint for the argument. + # @param result_hint [Object, nil] Converter hint for the result. + # @return [NexusOperationHandle] Handle to the started operation. + def start_operation( + operation, + arg, + schedule_to_close_timeout: nil, + cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED, + summary: nil, + cancellation: Workflow.cancellation, + arg_hint: nil, + result_hint: nil + ) + raise NotImplementedError + end + + # Execute a Nexus operation and wait for the result. + # + # This is a convenience method that calls {#start_operation} and immediately waits for the result. + # + # @param operation [Symbol, String] Operation name. + # @param arg [Object] Argument for the operation. + # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation. + # @param summary [String, nil] Optional summary for the operation (appears in UI/CLI). + # @param cancellation [Cancellation] Cancellation for the operation. + # @param arg_hint [Object, nil] Converter hint for the argument. + # @param result_hint [Object, nil] Converter hint for the result. + # @return [Object] Result of the operation. + # @raise [Error::NexusOperationError] Operation failed. + def execute_operation( + operation, + arg, + schedule_to_close_timeout: nil, + cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED, + summary: nil, + cancellation: Workflow.cancellation, + arg_hint: nil, + result_hint: nil + ) + start_operation( + operation, arg, schedule_to_close_timeout:, cancellation_type:, summary:, cancellation:, + arg_hint:, result_hint: + ).result + end + end + end +end diff --git a/temporalio/lib/temporalio/workflow/nexus_operation_cancellation_type.rb b/temporalio/lib/temporalio/workflow/nexus_operation_cancellation_type.rb new file mode 100644 index 00000000..8fe30c3e --- /dev/null +++ b/temporalio/lib/temporalio/workflow/nexus_operation_cancellation_type.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require 'temporalio/internal/bridge/api' + +module Temporalio + module Workflow + # How a Nexus operation should handle cancellation. + # + # WARNING: Nexus support is experimental. + module NexusOperationCancellationType + # Wait for cancellation to complete (default). + WAIT_CANCELLATION_COMPLETED = Internal::Bridge::Api::Nexus::NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED + # Abandon the operation without sending a cancellation request. + ABANDON = Internal::Bridge::Api::Nexus::NexusOperationCancellationType::ABANDON + # Send a cancellation request but do not wait for confirmation. + TRY_CANCEL = Internal::Bridge::Api::Nexus::NexusOperationCancellationType::TRY_CANCEL + # Wait for the server to confirm the cancellation request was delivered. + WAIT_CANCELLATION_REQUESTED = Internal::Bridge::Api::Nexus::NexusOperationCancellationType::WAIT_CANCELLATION_REQUESTED + end + end +end diff --git a/temporalio/lib/temporalio/workflow/nexus_operation_handle.rb b/temporalio/lib/temporalio/workflow/nexus_operation_handle.rb new file mode 100644 index 00000000..3de22930 --- /dev/null +++ b/temporalio/lib/temporalio/workflow/nexus_operation_handle.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module Temporalio + module Workflow + # Handle for interacting with a Nexus operation. + # + # This is created via {NexusClient#start_operation}, it is never instantiated directly. + # + # WARNING: Nexus support is experimental. + class NexusOperationHandle + # @!visibility private + def initialize + raise NotImplementedError, 'Cannot instantiate a Nexus operation handle directly' + end + + # @return [String, nil] Operation token for async operations, nil for sync operations. + def operation_token + raise NotImplementedError + end + + # @return [Object, nil] Hint for the result if any. + def result_hint + raise NotImplementedError + end + + # Wait for the result. + # + # @param result_hint [Object, nil] Override the result hint, or if nil uses the one on the handle. + # @return [Object] Result of the Nexus operation. + # + # @raise [Error::NexusOperationError] Operation failed with +cause+ as the cause. + def result(result_hint: nil) + raise NotImplementedError + end + end + end +end diff --git a/temporalio/sig/temporalio/error/failure.rbs b/temporalio/sig/temporalio/error/failure.rbs index cc59607c..738f3b4a 100644 --- a/temporalio/sig/temporalio/error/failure.rbs +++ b/temporalio/sig/temporalio/error/failure.rbs @@ -124,5 +124,39 @@ module Temporalio retry_state: RetryState::enum? ) -> void end + + class NexusOperationError < Failure + attr_reader endpoint: String + attr_reader service: String + attr_reader operation: String + attr_reader operation_token: String? + + def initialize: ( + String message, + endpoint: String, + service: String, + operation: String, + operation_token: String? + ) -> void + end + + class NexusHandlerError < Failure + attr_reader error_type: Symbol + attr_reader retry_behavior: RetryBehavior::enum + + def initialize: ( + String message, + error_type: Symbol | String, + retry_behavior: RetryBehavior::enum + ) -> void + + module RetryBehavior + type enum = Integer + + UNSPECIFIED: enum + RETRYABLE: enum + NON_RETRYABLE: enum + end + end end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs index 07967eee..a2eba2da 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs @@ -18,6 +18,8 @@ module Temporalio attr_reader pending_timers: Hash[Integer, Fiber] attr_reader pending_child_workflow_starts: Hash[Integer, Fiber] attr_reader pending_child_workflows: Hash[Integer, ChildWorkflowHandle] + attr_reader pending_nexus_operation_starts: Hash[Integer, Fiber] + attr_reader pending_nexus_operations: Hash[Integer, NexusOperationHandle] attr_reader pending_external_signals: Hash[Integer, Fiber] attr_reader pending_external_cancels: Hash[Integer, Fiber] attr_reader in_progress_handlers: Array[HandlerExecution] diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs index 40265eac..c1387f88 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs @@ -11,6 +11,8 @@ module Temporalio def continue_as_new_suggested: -> bool + def create_nexus_client: (endpoint: Symbol | String, service: Symbol | String) -> Workflow::NexusClient + def current_details: -> String def current_details=: (String? details) -> void diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_client.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_client.rbs new file mode 100644 index 00000000..315176fe --- /dev/null +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_client.rbs @@ -0,0 +1,25 @@ +module Temporalio + module Internal + module Worker + class WorkflowInstance + class NexusClient < Workflow::NexusClient + attr_reader endpoint: String + attr_reader service: String + + def initialize: (endpoint: Symbol | String, service: Symbol | String, outbound: untyped) -> void + + def start_operation: ( + Symbol | String operation, + Object? arg, + ?schedule_to_close_timeout: duration?, + ?cancellation_type: Workflow::NexusOperationCancellationType::enum, + ?summary: String?, + ?cancellation: Cancellation, + ?arg_hint: Object?, + ?result_hint: Object? + ) -> Workflow::NexusOperationHandle + end + end + end + end +end diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rbs new file mode 100644 index 00000000..da60cb20 --- /dev/null +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/nexus_operation_handle.rbs @@ -0,0 +1,19 @@ +module Temporalio + module Internal + module Worker + class WorkflowInstance + class NexusOperationHandle < Workflow::NexusOperationHandle + def initialize: ( + operation_token: String?, + instance: WorkflowInstance, + cancellation: Cancellation, + cancel_callback_key: Object, + result_hint: Object? + ) -> void + + def _resolve: (untyped resolution) -> void + end + end + end + end +end diff --git a/temporalio/sig/temporalio/testing/workflow_environment.rbs b/temporalio/sig/temporalio/testing/workflow_environment.rbs index 58ac5cef..437dbfef 100644 --- a/temporalio/sig/temporalio/testing/workflow_environment.rbs +++ b/temporalio/sig/temporalio/testing/workflow_environment.rbs @@ -104,6 +104,10 @@ module Temporalio def current_time: -> Time + def create_nexus_endpoint: (name: String, task_queue: String) -> untyped + + def delete_nexus_endpoint: (untyped endpoint) -> nil + def auto_time_skipping_disabled: [T] { -> T } -> T class Ephemeral < WorkflowEnvironment diff --git a/temporalio/sig/temporalio/worker/interceptor.rbs b/temporalio/sig/temporalio/worker/interceptor.rbs index b103d657..214b5597 100644 --- a/temporalio/sig/temporalio/worker/interceptor.rbs +++ b/temporalio/sig/temporalio/worker/interceptor.rbs @@ -312,6 +312,34 @@ module Temporalio ) -> void end + class StartNexusOperationInput + attr_reader endpoint: String + attr_reader service: String + attr_reader operation: String + attr_reader arg: Object? + attr_reader schedule_to_close_timeout: duration? + attr_reader cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum + attr_reader summary: String? + attr_reader cancellation: Cancellation + attr_reader arg_hint: Object? + attr_reader result_hint: Object? + attr_reader headers: Hash[String, String] + + def initialize: ( + endpoint: String, + service: String, + operation: String, + arg: Object?, + schedule_to_close_timeout: duration?, + cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum, + summary: String?, + cancellation: Cancellation, + arg_hint: Object?, + result_hint: Object?, + headers: Hash[String, String] + ) -> void + end + class Outbound attr_reader next_interceptor: Outbound @@ -332,6 +360,8 @@ module Temporalio def sleep: (SleepInput input) -> void def start_child_workflow: (StartChildWorkflowInput input) -> Temporalio::Workflow::ChildWorkflowHandle + + def start_nexus_operation: (StartNexusOperationInput input) -> Temporalio::Workflow::NexusOperationHandle end end end diff --git a/temporalio/sig/temporalio/workflow.rbs b/temporalio/sig/temporalio/workflow.rbs index 0343d30e..df80b29b 100644 --- a/temporalio/sig/temporalio/workflow.rbs +++ b/temporalio/sig/temporalio/workflow.rbs @@ -6,6 +6,8 @@ module Temporalio def self.continue_as_new_suggested: -> bool + def self.create_nexus_client: (endpoint: Symbol | String, service: Symbol | String) -> NexusClient + def self.current_details: -> String def self.current_details=: (String? details) -> void diff --git a/temporalio/sig/temporalio/workflow/nexus_client.rbs b/temporalio/sig/temporalio/workflow/nexus_client.rbs new file mode 100644 index 00000000..94d86121 --- /dev/null +++ b/temporalio/sig/temporalio/workflow/nexus_client.rbs @@ -0,0 +1,30 @@ +module Temporalio + module Workflow + class NexusClient + def endpoint: -> String + def service: -> String + + def start_operation: ( + Symbol | String operation, + Object? arg, + ?schedule_to_close_timeout: duration?, + ?cancellation_type: NexusOperationCancellationType::enum, + ?summary: String?, + ?cancellation: Cancellation, + ?arg_hint: Object?, + ?result_hint: Object? + ) -> NexusOperationHandle + + def execute_operation: ( + Symbol | String operation, + Object? arg, + ?schedule_to_close_timeout: duration?, + ?cancellation_type: NexusOperationCancellationType::enum, + ?summary: String?, + ?cancellation: Cancellation, + ?arg_hint: Object?, + ?result_hint: Object? + ) -> Object? + end + end +end diff --git a/temporalio/sig/temporalio/workflow/nexus_operation_cancellation_type.rbs b/temporalio/sig/temporalio/workflow/nexus_operation_cancellation_type.rbs new file mode 100644 index 00000000..26b3f352 --- /dev/null +++ b/temporalio/sig/temporalio/workflow/nexus_operation_cancellation_type.rbs @@ -0,0 +1,12 @@ +module Temporalio + module Workflow + module NexusOperationCancellationType + type enum = Integer + + WAIT_CANCELLATION_COMPLETED: enum + ABANDON: enum + TRY_CANCEL: enum + WAIT_CANCELLATION_REQUESTED: enum + end + end +end diff --git a/temporalio/sig/temporalio/workflow/nexus_operation_handle.rbs b/temporalio/sig/temporalio/workflow/nexus_operation_handle.rbs new file mode 100644 index 00000000..c864ef0a --- /dev/null +++ b/temporalio/sig/temporalio/workflow/nexus_operation_handle.rbs @@ -0,0 +1,10 @@ +module Temporalio + module Workflow + class NexusOperationHandle + def operation_token: -> String? + def result_hint: -> Object? + + def result: (?result_hint: Object?) -> Object? + end + end +end diff --git a/temporalio/test/golangworker/main.go b/temporalio/test/golangworker/main.go index a971cbe8..53233ecf 100644 --- a/temporalio/test/golangworker/main.go +++ b/temporalio/test/golangworker/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "errors" "fmt" @@ -9,9 +10,11 @@ import ( "os" "time" + "github.com/nexus-rpc/sdk-go/nexus" "go.temporal.io/sdk/client" sdklog "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/temporalnexus" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -20,6 +23,55 @@ func init() { slog.SetLogLoggerLevel(slog.LevelWarn) } +// Nexus operation definitions +var syncOp = nexus.NewSyncOperation("echo", func(ctx context.Context, input string, options nexus.StartOperationOptions) (string, error) { + switch input { + case "success": + return input, nil + case "fail": + return "", nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "operation failed") + case "operation-error": + return "", fmt.Errorf("operation error") + default: + return input, nil + } +}) + +type NexusHandlerInput struct { + Action string `json:"action"` +} + +type NexusHandlerOutput struct { + Result string `json:"result"` +} + +var asyncOp = temporalnexus.NewWorkflowRunOperation( + "workflow-operation", + NexusHandlerWorkflow, + func(ctx context.Context, input NexusHandlerInput, options nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + return client.StartWorkflowOptions{ + ID: fmt.Sprintf("nexus-handler-%d", time.Now().UnixNano()), + }, nil + }, +) + +func NexusHandlerWorkflow(ctx workflow.Context, input NexusHandlerInput) (NexusHandlerOutput, error) { + switch input.Action { + case "success": + return NexusHandlerOutput{Result: "success"}, nil + case "fail": + return NexusHandlerOutput{}, temporal.NewApplicationError("workflow failed", "TestError") + case "wait-for-cancel": + // Block forever, waiting for cancellation + if err := workflow.Await(ctx, func() bool { return false }); err != nil { + return NexusHandlerOutput{}, err + } + return NexusHandlerOutput{Result: "cancelled"}, nil + default: + return NexusHandlerOutput{Result: input.Action}, nil + } +} + func main() { if len(os.Args) != 4 { log.Fatalf("expected endpoint, namespace, and task queue arg, found %v args", len(os.Args)-1) @@ -44,6 +96,18 @@ func run(endpoint, namespace, taskQueue string) error { slog.Info("Creating worker") w := worker.New(cl, taskQueue, worker.Options{}) w.RegisterWorkflowWithOptions(KitchenSinkWorkflow, workflow.RegisterOptions{Name: "kitchen_sink"}) + w.RegisterWorkflowWithOptions(NexusHandlerWorkflow, workflow.RegisterOptions{Name: "nexus_handler"}) + + // Register Nexus service + nexusService := nexus.NewService("test-service") + if err := nexusService.Register(syncOp); err != nil { + return fmt.Errorf("failed to register sync operation: %w", err) + } + if err := nexusService.Register(asyncOp); err != nil { + return fmt.Errorf("failed to register async operation: %w", err) + } + w.RegisterNexusService(nexusService) + defer slog.Info("Stopping worker") return w.Run(worker.InterruptCh()) } diff --git a/temporalio/test/sig/test.rbs b/temporalio/test/sig/test.rbs index 244f1e4a..68116073 100644 --- a/temporalio/test/sig/test.rbs +++ b/temporalio/test/sig/test.rbs @@ -37,7 +37,8 @@ class Test < Minitest::Test def with_kitchen_sink_worker: [T] ( ?Temporalio::Client worker_client, - ?task_queue: String + ?task_queue: String, + ?nexus: bool ) { (String task_queue) -> T } -> T def kitchen_sink_exe: -> String diff --git a/temporalio/test/sig/workflow_utils.rbs b/temporalio/test/sig/workflow_utils.rbs index 830b737d..38a0009b 100644 --- a/temporalio/test/sig/workflow_utils.rbs +++ b/temporalio/test/sig/workflow_utils.rbs @@ -23,7 +23,7 @@ module WorkflowUtils ?priority: Temporalio::Priority, ?start_workflow_client: Temporalio::Client, ?tuner: Temporalio::Worker::Tuner - ) -> Object? | + ) -> untyped | [T] ( singleton(Temporalio::Workflow::Definition) workflow, *Object? args, diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index db467637..14df676b 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -175,7 +175,14 @@ def client @server.client end - def with_kitchen_sink_worker(worker_client = client, task_queue: "tq-#{SecureRandom.uuid}") + def with_kitchen_sink_worker(worker_client = client, task_queue: "tq-#{SecureRandom.uuid}", nexus: false) + # Create Nexus endpoint for the task queue if requested + endpoint = nil + if nexus + endpoint_name = "nexus-endpoint-#{task_queue}" + endpoint = @server.create_nexus_endpoint(name: endpoint_name, task_queue: task_queue) + end + # Run the golangworker pid = spawn( kitchen_sink_exe, @@ -187,6 +194,7 @@ def with_kitchen_sink_worker(worker_client = client, task_queue: "tq-#{SecureRan ensure Process.kill('KILL', pid) Timeout.timeout(5) { Process.wait(pid) } + @server.delete_nexus_endpoint(endpoint) if endpoint end end diff --git a/temporalio/test/worker_workflow_nexus_test.rb b/temporalio/test/worker_workflow_nexus_test.rb new file mode 100644 index 00000000..bedcf592 --- /dev/null +++ b/temporalio/test/worker_workflow_nexus_test.rb @@ -0,0 +1,476 @@ +# frozen_string_literal: true + +require 'opentelemetry/sdk' +require 'temporalio/client' +require 'temporalio/contrib/open_telemetry' +require 'temporalio/error' +require 'temporalio/testing' +require 'temporalio/worker' +require 'temporalio/workflow' +require 'test' + +class WorkerWorkflowNexusTest < Test + # Test basic sync operation success + class NexusSyncOperationSuccessWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation('echo', 'success') + end + end + + def test_nexus_sync_operation_success + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + result = execute_workflow(NexusSyncOperationSuccessWorkflow, endpoint) + assert_equal 'success', result + end + end + + # Test sync operation with handler error + class NexusSyncOperationHandlerErrorWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation('echo', 'fail') + end + end + + def test_nexus_sync_operation_handler_error + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + execute_workflow(NexusSyncOperationHandlerErrorWorkflow, endpoint) + end + assert_instance_of Temporalio::Error::NexusOperationError, err.cause + assert_includes err.cause.message, 'nexus operation completed unsuccessfully' + end + end + + # Test async operation success + class NexusAsyncOperationSuccessWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + handle = client.start_operation('workflow-operation', { 'action' => 'success' }) + + # For async operations, operation_token should be present + Temporalio::Workflow.logger.info("Operation started with token: #{handle.operation_token}") + + handle.result + end + end + + def test_nexus_async_operation_success + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + result = execute_workflow(NexusAsyncOperationSuccessWorkflow, endpoint) + assert_equal 'success', result['result'] + end + end + + # Test async operation failure + class NexusAsyncOperationFailureWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + handle = client.start_operation('workflow-operation', { 'action' => 'fail' }) + + # For async operations, operation_token should be present + Temporalio::Workflow.logger.info("Operation started with token: #{handle.operation_token}") + + handle.result + end + end + + def test_nexus_async_operation_failure + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + execute_workflow(NexusAsyncOperationFailureWorkflow, endpoint) + end + assert_instance_of Temporalio::Error::NexusOperationError, err.cause + assert_includes err.cause.message, 'nexus operation completed unsuccessfully' + end + end + + # Test operation handle with token (reused for both async and sync) + class NexusOperationHandleWorkflow < Temporalio::Workflow::Definition + workflow_query_attr_reader :operation_token + workflow_query_attr_reader :has_token + + def execute(endpoint, operation, input) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + handle = client.start_operation(operation, input) + + @operation_token = handle.operation_token + @has_token = !handle.operation_token.nil? + + handle.result + end + end + + def test_nexus_operation_handle_async_has_token + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + execute_workflow( + NexusOperationHandleWorkflow, + endpoint, + 'workflow-operation', + { 'action' => 'success' } + ) do |handle| + # Wait for operation to start + assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) } + + token = handle.query(NexusOperationHandleWorkflow.operation_token) + refute_nil token + assert token.length.positive? # steep:ignore + + result = handle.result + assert_equal 'success', result['result'] # steep:ignore + end + end + end + + def test_nexus_operation_handle_sync_has_token + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + execute_workflow(NexusOperationHandleWorkflow, endpoint, 'echo', 'success') do |handle| + # Even sync operations have tokens when using start_operation + assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) } + + token = handle.query(NexusOperationHandleWorkflow.operation_token) + refute_nil token + + result = handle.result + assert_equal 'success', result + end + end + end + + # Test operation cancellation (reused for different cancellation types) + class NexusCancellationWorkflow < Temporalio::Workflow::Definition + workflow_query_attr_reader :started + workflow_query_attr_reader :cancelled + + def execute(endpoint, cancellation_type) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + + cancellation, cancel_proc = Temporalio::Cancellation.new + handle = client.start_operation( + 'workflow-operation', + { 'action' => 'wait-for-cancel' }, + cancellation_type:, + cancellation: + ) + + @started = true + + # Wait for the operation to actually start before cancelling + # In time-skipping mode, we need to yield to let the operation start + Temporalio::Workflow.sleep(0.01) + + # Cancel the operation + cancel_proc.call + @cancelled = true + + # Yield to allow the cancel command to be processed + Temporalio::Workflow.sleep(0.01) + + # For async operations that are cancelled, result should raise NexusOperationError + begin + handle.result + rescue Temporalio::Error::NexusOperationError => e + # Check if the cause is a CanceledError + return { 'cancelled' => true, 'error' => e.class.name } if e.cause.is_a?(Temporalio::Error::CanceledError) + + raise + end + + { 'cancelled' => false, 'result' => 'unexpected' } + end + end + + def test_nexus_operation_cancellation_try_cancel + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + execute_workflow( + NexusCancellationWorkflow, + endpoint, + Temporalio::Workflow::NexusOperationCancellationType::TRY_CANCEL + ) do |handle| + assert_eventually { assert handle.query(NexusCancellationWorkflow.started) } + assert_eventually { assert handle.query(NexusCancellationWorkflow.cancelled) } + + result = handle.result + assert result['cancelled'] # steep:ignore + end + end + end + + def test_nexus_operation_cancellation_abandon + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + execute_workflow( + NexusCancellationWorkflow, + endpoint, + Temporalio::Workflow::NexusOperationCancellationType::ABANDON + ) do |handle| + assert_eventually { assert handle.query(NexusCancellationWorkflow.started) } + assert_eventually { assert handle.query(NexusCancellationWorkflow.cancelled) } + + result = handle.result + assert result['cancelled'] # steep:ignore + end + end + end + + # Test multiple operations in sequence + class NexusMultipleOperationsWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + + # Execute sync operation + sync_result = client.execute_operation('echo', 'success') + + # Execute async operation + async_result = client.execute_operation('workflow-operation', { 'action' => 'success' }) + + { + 'sync' => sync_result, + 'async' => async_result['result'] # steep:ignore + } + end + end + + def test_nexus_multiple_operations_in_sequence + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + result = execute_workflow(NexusMultipleOperationsWorkflow, endpoint) + + assert_equal 'success', result['sync'] + assert_equal 'success', result['async'] + end + end + + # Test concurrent operations + class NexusConcurrentOperationsWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + + # Start multiple operations concurrently + handles = 3.times.map do |i| + client.start_operation('echo', "op-#{i}") + end + + # Wait for all to complete + handles.map(&:result) + end + end + + def test_nexus_concurrent_operations + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + results = execute_workflow(NexusConcurrentOperationsWorkflow, endpoint) + + assert_equal 3, results.length + assert_equal 'op-0', results[0] + assert_equal 'op-1', results[1] + assert_equal 'op-2', results[2] + end + end + + # Test operation with schedule_to_close_timeout (success case) + class NexusOperationWithTimeoutSuccessWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation('echo', 'success', schedule_to_close_timeout: 60) + end + end + + def test_nexus_operation_with_timeout + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + result = execute_workflow(NexusOperationWithTimeoutSuccessWorkflow, endpoint) + + assert_equal 'success', result + end + end + + # Test that Nexus operations can timeout + class NexusOperationTimeoutWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation( + 'workflow-operation', + { 'action' => 'wait-for-cancel' }, + schedule_to_close_timeout: 0.1 + ) + end + end + + def test_nexus_operation_timeout + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + execute_workflow(NexusOperationTimeoutWorkflow, endpoint) + end + + # Should be a NexusOperationError wrapping a TimeoutError + assert_instance_of Temporalio::Error::NexusOperationError, err.cause + assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause + assert_includes err.cause.message, 'nexus operation completed unsuccessfully' + end + end + + # Test that NexusOperationError has correct attributes + class NexusOperationErrorCheckWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + begin + client.execute_operation('echo', 'fail') + rescue Temporalio::Error::NexusOperationError => e + { + 'endpoint' => e.endpoint, + 'service' => e.service, + 'operation' => e.operation, + 'has_token' => !e.operation_token.nil?, + 'message' => e.message + } + end + end + end + + def test_nexus_operation_error_attributes + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + result = execute_workflow(NexusOperationErrorCheckWorkflow, endpoint) + + assert_equal endpoint, result['endpoint'] + assert_equal 'test-service', result['service'] + assert_equal 'echo', result['operation'] + assert_equal false, result['has_token'] # Sync operation that fails immediately has no token + assert_includes result['message'], 'nexus operation completed unsuccessfully' + end + end + + # Test using symbols for endpoint and service names + class NexusSymbolNamesWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint: endpoint.to_sym, service: :'test-service') + client.execute_operation(:echo, 'success') + end + end + + def test_nexus_operation_with_symbol_names + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + result = execute_workflow(NexusSymbolNamesWorkflow, endpoint) + + assert_equal 'success', result + end + end + + # Test handler error returned from Go side + class NexusHandlerErrorWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + begin + client.execute_operation('echo', 'fail') + rescue Temporalio::Error::NexusOperationError => e + # The handler error should be wrapped in the cause + raise unless e.cause.is_a?(Temporalio::Error::NexusHandlerError) + + # steep:ignore:start + { + handler_error: true, + error_type: e.cause.error_type, + retry_behavior: e.cause.retry_behavior, + message: e.cause.message + } + # steep:ignore:end + end + end + end + + def test_nexus_handler_error + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + result = execute_workflow(NexusHandlerErrorWorkflow, endpoint) + + assert result['handler_error'] + assert_equal 'BAD_REQUEST', result['error_type'] + assert_equal Temporalio::Error::NexusHandlerError::RetryBehavior::UNSPECIFIED, result['retry_behavior'] + assert_includes result['message'], 'operation failed' + end + end + + # Test that summary appears in workflow history + class NexusOperationSummaryWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation('echo', 'success', summary: 'custom operation summary') + end + end + + def test_nexus_operation_summary_in_history + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + execute_workflow(NexusOperationSummaryWorkflow, endpoint) do |handle| + assert_equal 'success', handle.result + + history_events = handle.fetch_history_events.to_a + scheduled_event = history_events.find(&:nexus_operation_scheduled_event_attributes) + summary_payload = scheduled_event.user_metadata.summary + actual_summary = env.client.data_converter.from_payload(summary_payload) + assert_equal 'custom operation summary', actual_summary + end + end + end + + class NexusOperationTracingWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation('echo', 'success') + end + end + + def test_nexus_operation_open_telemetry_tracing + # Set up in-memory span exporter + exporter = OpenTelemetry::SDK::Trace::Export::InMemorySpanExporter.new + tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new + tracer_provider.add_span_processor(OpenTelemetry::SDK::Trace::Export::SimpleSpanProcessor.new(exporter)) + tracer = tracer_provider.tracer('nexus-test-tracer') + tracing_interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(tracer) + + # Create client with tracing interceptor + new_options = env.client.options.with(interceptors: [tracing_interceptor]) + traced_client = Temporalio::Client.new(**new_options.to_h) # steep:ignore + + env.with_kitchen_sink_worker(traced_client, nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + # Execute workflow + result = execute_workflow( + NexusOperationTracingWorkflow, + endpoint, + client: traced_client + ) + assert_equal 'success', result + + # Verify StartNexusOperation span was created with correct name + finished_spans = exporter.finished_spans + span_names = finished_spans.map(&:name) + assert_includes span_names, 'StartNexusOperation:test-service/echo', + "Expected StartNexusOperation span, got: #{span_names.inspect}" + end + end +end