From 4808ce4d793109bfa40c368a12dd0b44015f535c Mon Sep 17 00:00:00 2001 From: Rustam Ibragimov Date: Mon, 2 Sep 2024 14:14:42 +0300 Subject: [PATCH 1/3] add table-sync-custom-version support for the table_sync version 6.1.0 --- lib/table_sync/publishing/batch.rb | 2 +- lib/table_sync/publishing/data/objects.rb | 11 ++++++----- lib/table_sync/publishing/data/raw.rb | 7 ++++--- lib/table_sync/publishing/message/base.rb | 5 ++++- lib/table_sync/publishing/message/raw.rb | 3 ++- lib/table_sync/publishing/raw.rb | 2 +- lib/table_sync/publishing/single.rb | 2 +- 7 files changed, 19 insertions(+), 13 deletions(-) diff --git a/lib/table_sync/publishing/batch.rb b/lib/table_sync/publishing/batch.rb index e67ee1c..49cba52 100644 --- a/lib/table_sync/publishing/batch.rb +++ b/lib/table_sync/publishing/batch.rb @@ -5,7 +5,7 @@ class TableSync::Publishing::Batch attribute :object_class attribute :original_attributes - + attribute :custom_version attribute :routing_key attribute :headers diff --git a/lib/table_sync/publishing/data/objects.rb b/lib/table_sync/publishing/data/objects.rb index f1ab5e3..058638a 100644 --- a/lib/table_sync/publishing/data/objects.rb +++ b/lib/table_sync/publishing/data/objects.rb @@ -2,18 +2,19 @@ module TableSync::Publishing::Data class Objects - attr_reader :objects, :event + attr_reader :objects, :event, :custom_version - def initialize(objects:, event:) - @objects = objects - @event = TableSync::Event.new(event) + def initialize(objects:, event:, custom_version:) + @objects = objects + @event = TableSync::Event.new(event) + @custom_version = custom_version end def construct { model: model, attributes: attributes_for_sync, - version: version, + version: custom_version || version, event: event.resolve, metadata: event.metadata, } diff --git a/lib/table_sync/publishing/data/raw.rb b/lib/table_sync/publishing/data/raw.rb index f5fffdd..c5d70d4 100644 --- a/lib/table_sync/publishing/data/raw.rb +++ b/lib/table_sync/publishing/data/raw.rb @@ -2,19 +2,20 @@ module TableSync::Publishing::Data class Raw - attr_reader :model_name, :attributes_for_sync, :event + attr_reader :model_name, :attributes_for_sync, :event, :custom_version - def initialize(model_name:, attributes_for_sync:, event:) + def initialize(model_name:, attributes_for_sync:, event:, custom_version:) @model_name = model_name @attributes_for_sync = attributes_for_sync @event = TableSync::Event.new(event) + @custom_version = custom_version end def construct { model: model_name, attributes: wrapped_attributes_for_sync, - version: version, + version: custom_version || version, event: event.resolve, metadata: event.metadata, } diff --git a/lib/table_sync/publishing/message/base.rb b/lib/table_sync/publishing/message/base.rb index a2af8bc..94305b5 100644 --- a/lib/table_sync/publishing/message/base.rb +++ b/lib/table_sync/publishing/message/base.rb @@ -6,6 +6,7 @@ class Base attr_reader :objects + attribute :custom_version attribute :object_class attribute :original_attributes attribute :event @@ -44,7 +45,9 @@ def message_params def data TableSync::Publishing::Data::Objects.new( - objects: objects, event: event, + objects: objects, + event: event, + custom_version: custom_version, ).construct end diff --git a/lib/table_sync/publishing/message/raw.rb b/lib/table_sync/publishing/message/raw.rb index 661d624..244148b 100644 --- a/lib/table_sync/publishing/message/raw.rb +++ b/lib/table_sync/publishing/message/raw.rb @@ -11,7 +11,7 @@ class Raw attribute :routing_key attribute :headers - + attribute :custom_version attribute :event def publish @@ -41,6 +41,7 @@ def message_params def data TableSync::Publishing::Data::Raw.new( model_name: model_name, attributes_for_sync: original_attributes, event: event, + custom_version: custom_version, ).construct end diff --git a/lib/table_sync/publishing/raw.rb b/lib/table_sync/publishing/raw.rb index 5aa09cd..ed678a1 100644 --- a/lib/table_sync/publishing/raw.rb +++ b/lib/table_sync/publishing/raw.rb @@ -7,7 +7,7 @@ class TableSync::Publishing::Raw attribute :table_name attribute :schema_name attribute :original_attributes - + attribute :custom_version attribute :routing_key attribute :headers diff --git a/lib/table_sync/publishing/single.rb b/lib/table_sync/publishing/single.rb index 063b6fa..b46ba57 100644 --- a/lib/table_sync/publishing/single.rb +++ b/lib/table_sync/publishing/single.rb @@ -7,7 +7,7 @@ class TableSync::Publishing::Single attribute :object_class attribute :original_attributes attribute :debounce_time - + attribute :custom_version attribute :event, Symbol, default: :update # expect job to have perform_at method From 2aecb8142c710f6733f1e5f16e152a47f329665e Mon Sep 17 00:00:00 2001 From: Sergey Sta Date: Wed, 25 Sep 2024 16:02:19 +0300 Subject: [PATCH 2/3] return back model_naming method with small update and use it as fallback in case of missing table_name & schema_name --- lib/table_sync/publishing/message/raw.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/table_sync/publishing/message/raw.rb b/lib/table_sync/publishing/message/raw.rb index 244148b..74eac5a 100644 --- a/lib/table_sync/publishing/message/raw.rb +++ b/lib/table_sync/publishing/message/raw.rb @@ -24,14 +24,18 @@ def publish def notify! TableSync::Instrument.notify( - table: table_name, - schema: schema_name, + table: table_name || model_naming.table, + schema: schema_name || model_naming.schema, event: event, count: original_attributes.count, direction: :publish, ) end + def model_naming + TableSync.publishing_adapter.model_naming(model_name.constantize) + end + # MESSAGE PARAMS def message_params From b51eb77559f03b5dfc68b3812756fe490b42429e Mon Sep 17 00:00:00 2001 From: Vladislav Shved Date: Fri, 29 Nov 2024 14:14:26 +0300 Subject: [PATCH 3/3] Fix sort in receiving --- lib/table_sync/receiving/handler.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/table_sync/receiving/handler.rb b/lib/table_sync/receiving/handler.rb index e536085..29d5c0b 100644 --- a/lib/table_sync/receiving/handler.rb +++ b/lib/table_sync/receiving/handler.rb @@ -23,7 +23,7 @@ def call validate_data(data, target_keys: target_keys) - data.sort_by! { |row| row.values_at(*target_keys).to_s } + data.sort_by! { |row| row.values_at(*target_keys).map { |value| sort_key(value) } } params = { data: data, target_keys: target_keys, version_key: version_key } @@ -149,4 +149,8 @@ def perform(config, params) end end end + + def sort_key(value) + value.is_a?(Comparable) ? value : value.to_s + end end