Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def poll_loop
end

def poll_for_task
require 'grpc/errors'
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
rescue ::GRPC::Cancelled
# We're shutting down and we've already reported that in the logs
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
require 'temporal/execution_options'
require 'temporal/connection'
require 'temporal/concerns/payloads'
require 'temporal/connection/errors'
require 'temporal/connection/serializer'
require 'temporal/connection/serializer/failure'
require 'temporal/connection/serializer/workflow_id_reuse_policy'
require 'temporal/activity'
require 'temporal/activity/async_token'
require 'temporal/workflow'
Expand Down
8 changes: 4 additions & 4 deletions lib/temporal/connection.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
require 'temporal/connection/grpc'

module Temporal
module Connection
autoload :GRPC, 'temporal/connection/grpc'

CLIENT_TYPES_MAP = {
grpc: Temporal::Connection::GRPC
grpc: :GRPC
}.freeze

def self.generate(configuration)
connection_class = CLIENT_TYPES_MAP[configuration.type]
connection_class = const_get(CLIENT_TYPES_MAP.fetch(configuration.type))
host = configuration.host
port = configuration.port
credentials = configuration.credentials
Expand Down
3 changes: 1 addition & 2 deletions lib/temporal/connection/retryer.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
require 'grpc/errors'

module Temporal
module Connection
module Retryer
Expand All @@ -12,6 +10,7 @@ module Retryer
# https://github.com/temporalio/sdk-java/blob/ad8831d4a4d9d257baf3482ab49f1aa681895c0e/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java#L32
# No amount of retrying will help in these cases.
def self.do_not_retry_errors
require 'grpc/errors'
[
::GRPC::AlreadyExists,
::GRPC::Cancelled,
Expand Down
2 changes: 2 additions & 0 deletions lib/temporal/workflow/errors.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require 'temporal/errors'
require 'temporal/concerns/payloads'
require 'gen/temporal/api/enums/v1/failed_cause_pb'

module Temporal
class Workflow
Expand Down
1 change: 1 addition & 0 deletions lib/temporal/workflow/history/event.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/concerns/input_deserializer'
require 'temporal/concerns/payloads'

module Temporal
class Workflow
Expand Down
73 changes: 73 additions & 0 deletions lib/temporal/workflow/history/event_target.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,82 @@ def hash
[id, type].hash
end

# Value-aware comparison for attributes that handles plain Ruby objects
# without custom == (e.g., workflow/activity Request classes).
#
# During replay, Temporal deserializes inputs from history and compares
# them to the current execution. Ruby's default Object#== uses object
# identity, so two Request objects with identical data compare as unequal,
# causing false NonDeterministicWorkflowError. This method falls back to
# instance-variable comparison for objects that don't define their own ==.
def attributes_equal?(other_attributes)
return true if attributes == other_attributes
return false if attributes.nil? || other_attributes.nil?

normalized_attributes = self.class.normalize_hash_for_compare(attributes)
normalized_other = self.class.normalize_hash_for_compare(other_attributes)
return false if normalized_attributes.nil? || normalized_other.nil?
return false unless normalized_attributes.size == normalized_other.size

normalized_attributes.all? do |key, value|
normalized_other.key?(key) && self.class.deep_value_equal?(value, normalized_other[key])
end
end

def self.deep_value_equal?(a, b)
# Fast path: identical object
return true if a.equal?(b)

# Standard types that define meaningful ==
return a == b if a.is_a?(String) || a.is_a?(Numeric) || a.is_a?(Symbol) ||
a.is_a?(TrueClass) || a.is_a?(FalseClass) || a.nil?

# Arrays: compare element-by-element
if a.is_a?(Array) && b.is_a?(Array)
return false if a.length != b.length
return a.zip(b).all? { |x, y| deep_value_equal?(x, y) }
end

# Hashes: compare key-by-key (normalize symbol keys to strings)
if a.is_a?(Hash) && b.is_a?(Hash)
normalized_a = normalize_hash_for_compare(a)
normalized_b = normalize_hash_for_compare(b)
return false if normalized_a.nil? || normalized_b.nil?
return false unless normalized_a.size == normalized_b.size

return normalized_a.all? do |key, value|
normalized_b.key?(key) && deep_value_equal?(value, normalized_b[key])
end
end

# If the class defines its own == (not inherited from BasicObject/Object)
eq_owner = a.class.instance_method(:==).owner
return a == b if eq_owner != ::BasicObject && eq_owner != ::Object

# Fallback: compare by class + instance variables for plain objects
return false unless a.class == b.class
return true if a.instance_variables.empty? && b.instance_variables.empty?

a.instance_variables.all? do |var|
deep_value_equal?(a.instance_variable_get(var), b.instance_variable_get(var))
end
end

def to_s
"#{type}: #{id} (#{attributes})"
end

def self.normalize_hash_for_compare(hash)
normalized = {}
hash.each do |key, value|
normalized_key = key.is_a?(Symbol) ? key.to_s : key
return nil if normalized.key?(normalized_key)

normalized[normalized_key] = value
end

normalized
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require 'grpc/errors'
require 'temporal/connection'
require 'temporal/thread_pool'
require 'temporal/middleware/chain'
Expand Down Expand Up @@ -92,6 +91,7 @@ def poll_loop
end

def poll_for_task
require 'grpc/errors'
connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue, binary_checksum: binary_checksum)
rescue ::GRPC::Cancelled
# We're shutting down and we've already reported that in the logs
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def discard_command(history_target)

replay_target = History::EventTarget.from_command(replay_command_id, replay_command)

if history_target != replay_target || history_target.attributes != replay_target.attributes
if history_target != replay_target || !replay_target.attributes_equal?(history_target.attributes)
raise NonDeterministicWorkflowError,
"Unexpected command. The replaying code is issuing: #{replay_target}, "\
"but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION
Expand Down
1 change: 1 addition & 0 deletions lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'temporal/workflow/history'
require 'temporal/workflow/stack_trace_tracker'
require 'temporal/metric_keys'
require 'gen/temporal/api/enums/v1/failed_cause_pb'

module Temporal
class Workflow
Expand Down
61 changes: 61 additions & 0 deletions spec/unit/lib/temporal/load_order_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require 'open3'
require 'rbconfig'
require 'temporal'

describe 'Temporal load order' do
def run_clean_ruby(script)
root = File.expand_path('../../../..', __dir__)
lib_path = File.join(root, 'lib')
gemfile = File.join(root, 'Gemfile')
env = { 'BUNDLE_GEMFILE' => gemfile }

Open3.capture3(
env,
RbConfig.ruby,
'-rbundler/setup',
'-I', lib_path,
'-e', script,
chdir: root
)
end

it 'loads payload concerns without gRPC' do
expect(Temporal::Concerns::Payloads).to be_a(Module)
end

it 'loads serializer constants without gRPC' do
expect(Temporal::Connection::Serializer::Failure).to be_a(Class)
end

it 'loads workflow error enums at boot' do
expect(Temporal::Workflow::Errors::WORKFLOW_ALREADY_EXISTS_SYM).not_to be_nil
end

it 'loads workflow history event class at boot' do
expect(Temporal::Workflow::History::Event).to be_a(Class)
end

it 'does not load gRPC at require time' do
script = <<~RUBY
require 'temporal'
puts(defined?(::GRPC::Core::Channel) ? 'loaded' : 'not_loaded')
RUBY

stdout, stderr, status = run_clean_ruby(script)
expect(status.success?).to be(true), "stderr: #{stderr}"
expect(stdout.strip).to eq('not_loaded')
end

it 'autoloads gRPC when the connection class is referenced' do
script = <<~RUBY
require 'temporal'
puts(defined?(::GRPC::Core::Channel) ? 'loaded_before' : 'not_loaded_before')
Temporal::Connection::GRPC
puts(defined?(::GRPC::Core::Channel) ? 'loaded_after' : 'not_loaded_after')
RUBY

stdout, stderr, status = run_clean_ruby(script)
expect(status.success?).to be(true), "stderr: #{stderr}"
expect(stdout.split("\n")).to eq(%w[not_loaded_before loaded_after])
end
end
Loading