diff --git a/CHANGELOG.md b/CHANGELOG.md index a3995c5..b6bc618 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [6.11.0] - 2026-03-25 +### Added +- Ability to specify option for compression when publish data + ## [6.10.0] - 2025-11-21 ### Added - Add on_first_sync callback diff --git a/Gemfile b/Gemfile index 7f4b456..1ef87d6 100644 --- a/Gemfile +++ b/Gemfile @@ -19,3 +19,5 @@ gem "bundler" gem "ostruct" gem "pry" gem "rake" + +gem "rabbit_messaging", git: "https://github.com/umbrellio/rabbit_messaging.git", branch: "compression-for-publisher-and-reciver" diff --git a/Gemfile.lock b/Gemfile.lock index daafac9..5cdd3e2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,9 +1,20 @@ +GIT + remote: https://github.com/umbrellio/rabbit_messaging.git + revision: 8d95a63bf23923ba779df004cab5c6c09ceec1c0 + branch: compression-for-publisher-and-reciver + specs: + rabbit_messaging (1.8.0) + bunny (~> 2.0) + kicks + msgpack + zlib + PATH remote: . specs: - table_sync (6.10.0) + table_sync (6.11.0) memery - rabbit_messaging (>= 1.7.0) + rabbit_messaging (>= 1.8.0) rails self_data @@ -81,9 +92,9 @@ GEM minitest (>= 5.1) securerandom (>= 0.3) tzinfo (~> 2.0, >= 2.0.5) - amq-protocol (2.3.4) + amq-protocol (2.7.0) ast (2.4.3) - base64 (0.2.0) + base64 (0.3.0) benchmark (0.4.0) bigdecimal (3.1.9) builder (3.3.0) @@ -92,7 +103,7 @@ GEM sorted_set (~> 1, >= 1.0.2) cgi (0.5.0) coderay (1.1.3) - concurrent-ruby (1.3.5) + concurrent-ruby (1.3.6) connection_pool (2.5.3) crass (1.0.6) date (3.5.0) @@ -112,8 +123,8 @@ GEM rdoc (>= 4.0.0) reline (>= 0.4.2) json (2.12.2) - kicks (3.2.0) - bunny (~> 2.19) + kicks (3.3.0) + bunny (~> 2.24) concurrent-ruby (~> 1.0) rake (>= 12.3, < 14.0) serverengine (~> 2.1) @@ -136,6 +147,7 @@ GEM mini_mime (1.1.5) mini_portile2 (2.8.9) minitest (5.25.5) + msgpack (1.8.0) net-imap (0.5.12) date net-protocol @@ -181,9 +193,6 @@ GEM psych (5.2.6) date stringio - rabbit_messaging (1.7.0) - bunny (~> 2.0) - kicks racc (1.8.1) rack (3.1.15) rack-session (2.1.1) @@ -223,7 +232,7 @@ GEM thor (~> 1.0, >= 1.2.2) zeitwerk (~> 2.6) rainbow (3.1.1) - rake (13.2.1) + rake (13.3.1) rbtree (0.4.6) rdoc (6.15.1) erb @@ -298,7 +307,6 @@ GEM base64 (~> 0.1) logger (~> 1.4) sigdump (~> 0.2.2) - set (1.1.2) sigdump (0.2.5) simplecov (0.22.0) docile (~> 1.1) @@ -307,11 +315,10 @@ GEM simplecov-html (0.13.1) simplecov-lcov (0.8.0) simplecov_json_formatter (0.1.4) - sorted_set (1.0.3) + sorted_set (1.1.0) rbtree - set (~> 1.0) stringio (3.1.8) - thor (1.4.0) + thor (1.5.0) timecop (0.9.10) timeout (0.4.3) tsort (0.2.0) @@ -326,6 +333,7 @@ GEM websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) zeitwerk (2.6.18) + zlib (3.2.3) PLATFORMS aarch64-linux-gnu @@ -345,6 +353,7 @@ DEPENDENCIES ostruct pg pry + rabbit_messaging! rake rspec rubocop-config-umbrellio diff --git a/lib/table_sync.rb b/lib/table_sync.rb index 0ac8aec..52441d1 100644 --- a/lib/table_sync.rb +++ b/lib/table_sync.rb @@ -52,6 +52,7 @@ def sync(object_class, **options) if_condition: options[:if], unless_condition: options[:unless], debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).register_callbacks end diff --git a/lib/table_sync/instrument_adapter/active_support.rb b/lib/table_sync/instrument_adapter/active_support.rb index 8d60a34..5287e21 100644 --- a/lib/table_sync/instrument_adapter/active_support.rb +++ b/lib/table_sync/instrument_adapter/active_support.rb @@ -4,13 +4,21 @@ module TableSync::InstrumentAdapter module ActiveSupport module_function - def notify(table:, schema:, event:, direction:, count: 1) + def notify( # rubocop:disable Metrics/ParameterLists + table:, + schema:, + event:, + direction:, + count: 1, + compress: false + ) ::ActiveSupport::Notifications.instrument "tablesync.#{direction}.#{event}", count:, table: table.to_s, schema: schema.to_s, event:, - direction: + direction:, + compress: end end end diff --git a/lib/table_sync/publishing/batch.rb b/lib/table_sync/publishing/batch.rb index 19801d6..5272c0c 100644 --- a/lib/table_sync/publishing/batch.rb +++ b/lib/table_sync/publishing/batch.rb @@ -6,7 +6,8 @@ class TableSync::Publishing::Batch :custom_version, :routing_key, :headers, - :event + :event, + :compress def initialize(attrs = {}) attrs = attrs.with_indifferent_access @@ -15,8 +16,9 @@ def initialize(attrs = {}) self.original_attributes = attrs[:original_attributes] self.custom_version = attrs[:custom_version] self.routing_key = attrs[:routing_key] - self.headers = attrs[:headers] + self.headers = attrs[:headers] || {} self.event = attrs.fetch(:event, :update).to_sym + self.compress = attrs.fetch(:compress, false) validate_required_attributes! end @@ -53,7 +55,7 @@ def attributes original_attributes: original_attributes, custom_version: custom_version, routing_key: routing_key, - headers: headers, + headers: headers.merge(compress: compress), event: event, } end diff --git a/lib/table_sync/publishing/message/base.rb b/lib/table_sync/publishing/message/base.rb index e0fb5e6..52c86ae 100644 --- a/lib/table_sync/publishing/message/base.rb +++ b/lib/table_sync/publishing/message/base.rb @@ -5,7 +5,8 @@ class Base attr_accessor :custom_version, :object_class, :original_attributes, - :event + :event, + :compress attr_reader :objects @@ -14,6 +15,7 @@ def initialize(params = {}) self.object_class = params[:object_class] self.original_attributes = params[:original_attributes] self.event = params[:event].to_sym + self.compress = params.dig(:headers, :compress) || false @objects = find_or_init_objects @@ -65,6 +67,7 @@ def notify! table: model_naming.table, schema: model_naming.schema, event:, + compress:, direction: :publish, count: objects.count, ) diff --git a/lib/table_sync/publishing/message/raw.rb b/lib/table_sync/publishing/message/raw.rb index 6704312..bfa0581 100644 --- a/lib/table_sync/publishing/message/raw.rb +++ b/lib/table_sync/publishing/message/raw.rb @@ -9,7 +9,8 @@ class Raw :routing_key, :headers, :custom_version, - :event + :event, + :compress def initialize(params = {}) self.model_name = params[:model_name] @@ -20,6 +21,7 @@ def initialize(params = {}) self.headers = params[:headers] self.custom_version = params[:custom_version] self.event = params[:event] + self.compress = params.dig(:headers, :compress) || false end def publish @@ -35,6 +37,7 @@ def notify! table: table_name, schema: schema_name, event:, + compress: compress, count: original_attributes.count, direction: :publish, ) diff --git a/lib/table_sync/publishing/message/single.rb b/lib/table_sync/publishing/message/single.rb index 40ae8ab..f166e1b 100644 --- a/lib/table_sync/publishing/message/single.rb +++ b/lib/table_sync/publishing/message/single.rb @@ -2,12 +2,20 @@ module TableSync::Publishing::Message class Single < Base + attr_accessor :headers + + def initialize(params = {}) + super + + self.headers = params[:headers] + end + def object objects.first end def params - TableSync::Publishing::Params::Single.new(object:).construct + TableSync::Publishing::Params::Single.new(object:, headers:).construct end end end diff --git a/lib/table_sync/publishing/params/single.rb b/lib/table_sync/publishing/params/single.rb index 3c5e733..a88e57f 100644 --- a/lib/table_sync/publishing/params/single.rb +++ b/lib/table_sync/publishing/params/single.rb @@ -4,10 +4,10 @@ module TableSync::Publishing::Params class Single < Base attr_reader :object, :routing_key, :headers - def initialize(object:) + def initialize(object:, headers: {}) @object = object @routing_key = calculated_routing_key - @headers = calculated_headers + @headers = headers.merge(calculated_headers || {}) end private diff --git a/lib/table_sync/publishing/raw.rb b/lib/table_sync/publishing/raw.rb index 25cdbd8..97dbfef 100644 --- a/lib/table_sync/publishing/raw.rb +++ b/lib/table_sync/publishing/raw.rb @@ -10,7 +10,8 @@ class TableSync::Publishing::Raw :custom_version, :routing_key, :headers, - :event + :event, + :compress def initialize(attributes = {}) attributes = attributes.with_indifferent_access @@ -21,8 +22,9 @@ def initialize(attributes = {}) self.original_attributes = attributes[:original_attributes] self.custom_version = attributes[:custom_version] self.routing_key = attributes[:routing_key] - self.headers = attributes[:headers] + self.headers = attributes[:headers] || {} self.event = attributes.fetch(:event, :update).to_sym + self.compress = attributes.fetch(:compress, false) end require_attributes :model_name, :original_attributes @@ -45,7 +47,7 @@ def attributes original_attributes: original_attributes, custom_version: custom_version, routing_key: routing_key, - headers: headers, + headers: headers.merge(compress: compress), event: event, } end diff --git a/lib/table_sync/publishing/single.rb b/lib/table_sync/publishing/single.rb index a55ca6b..5c6a0d6 100644 --- a/lib/table_sync/publishing/single.rb +++ b/lib/table_sync/publishing/single.rb @@ -7,7 +7,8 @@ class TableSync::Publishing::Single :original_attributes, :debounce_time, :custom_version, - :event + :event, + :compress def initialize(attrs = {}) attrs = attrs.with_indifferent_access @@ -15,6 +16,7 @@ def initialize(attrs = {}) self.object_class = attrs[:object_class] self.original_attributes = attrs[:original_attributes] self.debounce_time = attrs[:debounce_time] + self.compress = attrs.fetch(:compress, false) self.custom_version = attrs[:custom_version] self.event = attrs.fetch(:event, :update) end @@ -55,6 +57,7 @@ def attributes original_attributes: original_attributes, debounce_time: debounce_time, custom_version: custom_version, + headers: { compress: compress }, event: event, } end diff --git a/lib/table_sync/setup/active_record.rb b/lib/table_sync/setup/active_record.rb index 531f3b7..c8c493b 100644 --- a/lib/table_sync/setup/active_record.rb +++ b/lib/table_sync/setup/active_record.rb @@ -16,6 +16,7 @@ def define_after_commit(event) original_attributes: attributes, event:, debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).publish_later end end diff --git a/lib/table_sync/setup/base.rb b/lib/table_sync/setup/base.rb index 7e0f445..e9fa170 100644 --- a/lib/table_sync/setup/base.rb +++ b/lib/table_sync/setup/base.rb @@ -6,7 +6,7 @@ class Base INVALID_EVENT = Class.new(StandardError) INVALID_CONDITION = Class.new(StandardError) - attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition + attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition, :compress def initialize(attrs = {}) attrs.each do |key, value| @@ -57,6 +57,7 @@ def options_exposed_for_block if: if_condition, unless: unless_condition, debounce_time:, + compress: compress, } end end diff --git a/lib/table_sync/setup/sequel.rb b/lib/table_sync/setup/sequel.rb index c3e61cf..e908a8d 100644 --- a/lib/table_sync/setup/sequel.rb +++ b/lib/table_sync/setup/sequel.rb @@ -15,6 +15,7 @@ def define_after_commit(event) original_attributes: values, event:, debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).publish_later end end diff --git a/lib/table_sync/version.rb b/lib/table_sync/version.rb index bee1ab1..685f4f1 100644 --- a/lib/table_sync/version.rb +++ b/lib/table_sync/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module TableSync - VERSION = "6.10.0" + VERSION = "6.11.0" end diff --git a/spec/publishing/batch_spec.rb b/spec/publishing/batch_spec.rb index de14a8f..f0ed8b7 100644 --- a/spec/publishing/batch_spec.rb +++ b/spec/publishing/batch_spec.rb @@ -10,7 +10,8 @@ let(:object_class) { "ARecordUser" } let(:routing_key) { object_class.tableize } let(:expected_routing_key) { "a_record_users" } - let(:headers) { { klass: object_class } } + let(:headers) { { compress: compress, klass: object_class } } + let(:compress) { false } let(:attributes) do { @@ -22,6 +23,9 @@ custom_version: nil, } end + let(:attributes_for_instance) do + attributes.merge(compress: compress) + end it_behaves_like "publisher#publish_now with stubbed message", TableSync::Publishing::Message::Batch @@ -29,6 +33,16 @@ TableSync::Publishing::Batch, %i[object_class original_attributes] + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Batch + it_behaves_like "publisher#new without expected fields", + TableSync::Publishing::Batch, + %i[object_class original_attributes] + end + context "real user" do context "sequel" do let(:object_class) { "SequelUser" } diff --git a/spec/publishing/message/batch_spec.rb b/spec/publishing/message/batch_spec.rb index a4906e6..3eae81b 100644 --- a/spec/publishing/message/batch_spec.rb +++ b/spec/publishing/message/batch_spec.rb @@ -9,10 +9,11 @@ object_class:, original_attributes: [{ id: 1 }], routing_key: "users", - headers: { kek: 1 }, + headers:, event:, } end + let(:headers) { { kek: 1 } } let(:object_class) { "ARecordUser" } @@ -66,6 +67,20 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:headers) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/message/raw_spec.rb b/spec/publishing/message/raw_spec.rb index 2bcbd36..86a3daa 100644 --- a/spec/publishing/message/raw_spec.rb +++ b/spec/publishing/message/raw_spec.rb @@ -7,11 +7,12 @@ model_name: "User", original_attributes: [{ id: 1 }], routing_key: "users", - headers: { kek: 1 }, + headers:, event: :update, custom_version: nil, } end + let(:headers) { { kek: 1 } } context "with stubbed data and params" do let(:data_class) { TableSync::Publishing::Data::Raw } @@ -54,6 +55,20 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:headers) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/message/single_spec.rb b/spec/publishing/message/single_spec.rb index 58b87d1..30be50b 100644 --- a/spec/publishing/message/single_spec.rb +++ b/spec/publishing/message/single_spec.rb @@ -11,11 +11,12 @@ object_class:, original_attributes: [{ id: 1 }], routing_key: "users", - headers: { kek: 1 }, + headers:, event:, custom_version: nil, } end + let(:headers) { { kek: 1 } } let(:object_class) { "ARecordUser" } @@ -41,6 +42,7 @@ let(:params_attributes) do { object:, + headers:, } end @@ -65,6 +67,20 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:headers) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/params/single_spec.rb b/spec/publishing/params/single_spec.rb index 8a69070..203af2c 100644 --- a/spec/publishing/params/single_spec.rb +++ b/spec/publishing/params/single_spec.rb @@ -3,7 +3,7 @@ describe TableSync::Publishing::Params::Single do let(:object_class) { "ARecordUser" } let(:attributes) { default_attributes } - let(:default_attributes) { { object: } } + let(:default_attributes) { { object:, headers: } } let(:service) { described_class.new(**attributes) } let(:object) do @@ -17,9 +17,13 @@ confirm_select: true, realtime: true, event: :table_sync, + headers: headers, } end + let(:headers) { { compress: compress } } + let(:compress) { false } + shared_examples "constructs with expected values" do specify do expect(service.construct).to include(expected_values) @@ -40,6 +44,7 @@ describe "#construct" do context "default params" do + let(:headers) { super().merge(klass: "ARecordUser") } let(:expected_values) { default_expected_values } it_behaves_like "constructs with expected values" @@ -48,7 +53,7 @@ context "headers" do context "calculated" do let(:expected_values) do - default_expected_values.merge(headers: { object_class: }) + default_expected_values.merge(headers: { object_class:, compress: }) end before do @@ -56,6 +61,12 @@ end it_behaves_like "constructs with expected values" + + context "when message has been compressed" do + let(:compress) { true } + + it_behaves_like "constructs with expected values" + end end context "without headers callable" do @@ -92,6 +103,7 @@ let(:expected_values) do default_expected_values.merge(routing_key: object_class) end + let(:headers) { super().merge(klass: "ARecordUser") } before do TableSync.routing_key_callable = -> (object_class, _atrs) { object_class } @@ -135,6 +147,7 @@ context "by default" do let(:exchange_name) { "some.project.table_sync" } let(:expected_values) { default_expected_values.merge(exchange_name:) } + let(:headers) { super().merge(klass: "ARecordUser") } before { TableSync.exchange_name = exchange_name } diff --git a/spec/publishing/raw_spec.rb b/spec/publishing/raw_spec.rb index c52b0f3..99d68a3 100644 --- a/spec/publishing/raw_spec.rb +++ b/spec/publishing/raw_spec.rb @@ -6,10 +6,11 @@ let(:event) { :update } let(:routing_key) { "custom_routing_key" } let(:expected_routing_key) { "custom_routing_key" } - let(:headers) { { some_key: "123" } } + let(:headers) { { compress: compress, some_key: "123" } } let(:original_attributes) { [{ id: 1, name: "purum" }] } let(:table_name) { "sequel_users" } let(:schema_name) { "public" } + let(:compress) { false } let(:attributes) do { @@ -23,6 +24,9 @@ custom_version: nil, } end + let(:attributes_for_instance) do + attributes.merge(compress: compress) + end let(:expected_object_data) { original_attributes } @@ -32,6 +36,16 @@ it_behaves_like "publisher#publish_now without stubbed message", TableSync::Publishing::Message::Raw + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Raw + + it_behaves_like "publisher#publish_now without stubbed message", + TableSync::Publishing::Message::Raw + end + context "when routing_key is nil" do let(:routing_key) { nil } let(:expected_routing_key) { "sequel_users" } diff --git a/spec/publishing/single_spec.rb b/spec/publishing/single_spec.rb index d674979..16bdb62 100644 --- a/spec/publishing/single_spec.rb +++ b/spec/publishing/single_spec.rb @@ -9,20 +9,32 @@ let(:object_class) { "ARecordUser" } let(:routing_key) { object_class.tableize } let(:expected_routing_key) { "a_record_users" } - let(:headers) { { klass: object_class } } + let(:headers) { { compress: compress, klass: object_class } } let(:debounce_time) { 30 } + let(:compress) { false } let(:attributes) do { object_class:, original_attributes:, event:, + headers: { compress: compress }, debounce_time:, custom_version: nil, } end + let(:attributes_for_instance) do + attributes.merge(compress: compress) + end describe "#publish_now" do + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Single + end + it_behaves_like "publisher#publish_now with stubbed message", TableSync::Publishing::Message::Single diff --git a/spec/support/shared/publishers.rb b/spec/support/shared/publishers.rb index c2b2d93..8b4e8d9 100644 --- a/spec/support/shared/publishers.rb +++ b/spec/support/shared/publishers.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true # needs let(:attributes) +# needs let(:attributes_for_instance) shared_examples "publisher#publish_now with stubbed message" do |message_class| describe "#publish_now" do context "with stubbed message" do @@ -17,13 +18,14 @@ expect(message_class).to receive(:new).with(attributes) expect(message_double).to receive(:publish) - described_class.new(attributes).publish_now + described_class.new(attributes_for_instance).publish_now end end end end # needs let(:attributes) +# needs let(:attributes_for_instance) # needs let(:object_class) - String # needs let(:expected_object_data) # needs let(:headers) @@ -48,7 +50,7 @@ it "calls Rabbit#publish with attributes" do expect(Rabbit).to receive(:publish).with(rabbit_params) - described_class.new(attributes).publish_now + described_class.new(attributes_for_instance).publish_now end end end diff --git a/table_sync.gemspec b/table_sync.gemspec index dc01905..6f1a8c7 100644 --- a/table_sync.gemspec +++ b/table_sync.gemspec @@ -27,7 +27,7 @@ Gem::Specification.new do |spec| end spec.add_dependency "memery" - spec.add_dependency "rabbit_messaging", ">= 1.7.0" + spec.add_dependency "rabbit_messaging", ">= 1.8.0" spec.add_dependency "rails" spec.add_dependency "self_data" end