From 020b3da90a7a2a93ee1d9cefd8c1d3940d72dc6b Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 13 May 2026 17:22:36 +0200 Subject: [PATCH 01/22] Fix unit for configUpdatedAt is seconds --- lib/aikido/zen/api_client.rb | 2 +- lib/aikido/zen/runtime_settings.rb | 2 +- test/aikido/zen/agent_test.rb | 4 ++-- test/aikido/zen/api_client_test.rb | 6 +++--- test/aikido/zen/runtime_settings_test.rb | 10 +++++----- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/aikido/zen/api_client.rb b/lib/aikido/zen/api_client.rb index a2418ea3..b6f258a3 100644 --- a/lib/aikido/zen/api_client.rb +++ b/lib/aikido/zen/api_client.rb @@ -41,7 +41,7 @@ def should_fetch_settings?(last_updated_at = Aikido::Zen.runtime_settings.update base_url: @config.realtime_endpoint ) - new_updated_at = Time.at(response["configUpdatedAt"].to_i / 1000) + new_updated_at = Time.at(response["configUpdatedAt"].to_i) new_updated_at > last_updated_at end diff --git a/lib/aikido/zen/runtime_settings.rb b/lib/aikido/zen/runtime_settings.rb index 12d9d093..cf21cbb5 100644 --- a/lib/aikido/zen/runtime_settings.rb +++ b/lib/aikido/zen/runtime_settings.rb @@ -84,7 +84,7 @@ def initialize(*) def update_from_runtime_config_json(data) last_updated_at = updated_at - self.updated_at = Time.at(data["configUpdatedAt"].to_i / 1000) + self.updated_at = Time.at(data["configUpdatedAt"].to_i) self.heartbeat_interval = data["heartbeatIntervalInMS"].to_i / 1000 self.endpoints = RuntimeSettings::Endpoints.from_json(data["endpoints"]) self.blocked_user_ids = data["blockedUserIds"] diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 7580c626..7eee8461 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -112,7 +112,7 @@ def report(event) test "#start! takes the response of the STARTED event as runtime settings" do @api_client.expect :report, - {"configUpdatedAt" => 1234567890000}, + {"configUpdatedAt" => 1234567890}, [Aikido::Zen::Events::Started] assert_changes -> { Aikido::Zen.runtime_settings.updated_at }, to: Time.at(1234567890) do @@ -152,7 +152,7 @@ def @api_client.report(event) test "#start! updates the runtime settings after polling if needed" do @api_client.expect :should_fetch_settings?, true - @api_client.expect :fetch_runtime_config, {"configUpdatedAt" => 1234567890000} + @api_client.expect :fetch_runtime_config, {"configUpdatedAt" => 1234567890} assert_changes -> { Aikido::Zen.runtime_settings.updated_at }, to: Time.at(1234567890) do @agent.start! diff --git a/test/aikido/zen/api_client_test.rb b/test/aikido/zen/api_client_test.rb index 202a7184..fd64f69e 100644 --- a/test/aikido/zen/api_client_test.rb +++ b/test/aikido/zen/api_client_test.rb @@ -83,7 +83,7 @@ class CheckIfStaleConfigTest < ActiveSupport::TestCase test "returns false if the updated_at from the server is the same or older than the one we have" do stub_request(:get, "https://runtime.aikido.dev/config") - .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890000)) + .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890)) Aikido::Zen.runtime_settings.updated_at = Time.at(1234567890) assert_not @client.should_fetch_settings? @@ -94,7 +94,7 @@ class CheckIfStaleConfigTest < ActiveSupport::TestCase test "returns true if the updated_at from the server is newer than the one we have" do stub_request(:get, "https://runtime.aikido.dev/config") - .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890000)) + .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890)) Aikido::Zen.runtime_settings.updated_at = Time.at(1234567890 - 1) assert @client.should_fetch_settings? @@ -102,7 +102,7 @@ class CheckIfStaleConfigTest < ActiveSupport::TestCase test "sets the User-Agent on the request" do stub_request(:get, "https://runtime.aikido.dev/config") - .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890000)) + .to_return(status: 200, body: JSON.dump(configUpdatedAt: 1234567890)) @client.should_fetch_settings? diff --git a/test/aikido/zen/runtime_settings_test.rb b/test/aikido/zen/runtime_settings_test.rb index 8b39ec1a..b127487d 100644 --- a/test/aikido/zen/runtime_settings_test.rb +++ b/test/aikido/zen/runtime_settings_test.rb @@ -11,7 +11,7 @@ class Aikido::Zen::RuntimeSettingsTest < ActiveSupport::TestCase assert @settings.update_from_runtime_config_json({ "success" => true, "serviceId" => 1234, - "configUpdatedAt" => 1717171717000, + "configUpdatedAt" => 1717171717, "heartbeatIntervalInMS" => 60000, "endpoints" => [], "blockedUserIds" => [], @@ -60,7 +60,7 @@ class Aikido::Zen::RuntimeSettingsTest < ActiveSupport::TestCase assert @settings.update_from_runtime_config_json({ "success" => true, "serviceId" => 1234, - "configUpdatedAt" => 1717171717000, + "configUpdatedAt" => 1717171717, "heartbeatIntervalInMS" => 60000, "endpoints" => [], "blockedUserIds" => [], @@ -81,7 +81,7 @@ class Aikido::Zen::RuntimeSettingsTest < ActiveSupport::TestCase payload = { "success" => true, "serviceId" => 1234, - "configUpdatedAt" => 1717171717000, + "configUpdatedAt" => 1717171717, "heartbeatIntervalInMS" => 60000, "endpoints" => [], "blockedUserIds" => [], @@ -197,7 +197,7 @@ class Aikido::Zen::RuntimeSettingsTest < ActiveSupport::TestCase assert @settings.update_from_runtime_config_json({ "success" => true, "serviceId" => 1234, - "configUpdatedAt" => 1717171717000, + "configUpdatedAt" => 1717171717, "heartbeatIntervalInMS" => 60000, "endpoints" => [ { @@ -265,7 +265,7 @@ class Aikido::Zen::RuntimeSettingsTest < ActiveSupport::TestCase assert @settings.update_from_runtime_config_json({ "success" => true, "serviceId" => 1234, - "configUpdatedAt" => 1717171717000, + "configUpdatedAt" => 1717171717, "heartbeatIntervalInMS" => 60000, "endpoints" => [], "blockedUserIds" => [], From dca80c6dd9cd54fe2a26bfe446dd25c63b589be0 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 12 May 2026 16:52:57 +0200 Subject: [PATCH 02/22] Add POSTGRES_PORT environment variable override for port in PostgreSQL tests --- test/aikido/zen/sinks/pg_test.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/aikido/zen/sinks/pg_test.rb b/test/aikido/zen/sinks/pg_test.rb index aa2d65a1..dc8e2188 100644 --- a/test/aikido/zen/sinks/pg_test.rb +++ b/test/aikido/zen/sinks/pg_test.rb @@ -9,6 +9,7 @@ class Aikido::Zen::Sinks::PGTest < ActiveSupport::TestCase setup do @db = PG.connect( host: ENV.fetch("POSTGRES_HOST", "127.0.0.1"), + port: ENV.fetch("POSTGRES_PORT", "5432"), user: ENV.fetch("POSTGRES_USERNAME", "postgres"), password: ENV.fetch("POSTGRES_PASSWORD", "password"), dbname: ENV.fetch("POSTGRES_DATABASE", "postgres") @@ -237,6 +238,7 @@ def with_mocked_protector(params = nil) setup do @db = PG.connect( host: ENV.fetch("POSTGRES_HOST", "127.0.0.1"), + port: ENV.fetch("POSTGRES_PORT", "5432"), user: ENV.fetch("POSTGRES_USERNAME", "postgres"), password: ENV.fetch("POSTGRES_PASSWORD", "password"), dbname: ENV.fetch("POSTGRES_DATABASE", "postgres") From ee99854c66f14db6813d94763c33642769f7ee98 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 12 May 2026 17:00:24 +0200 Subject: [PATCH 03/22] Add MYSQL_PORT environment variable override for port in MySQL tests --- test/aikido/zen/sinks/mysql2_test.rb | 2 ++ test/aikido/zen/sinks/trilogy_test.rb | 2 ++ 2 files changed, 4 insertions(+) diff --git a/test/aikido/zen/sinks/mysql2_test.rb b/test/aikido/zen/sinks/mysql2_test.rb index 9776325d..b4ff437a 100644 --- a/test/aikido/zen/sinks/mysql2_test.rb +++ b/test/aikido/zen/sinks/mysql2_test.rb @@ -9,6 +9,7 @@ class Aikido::Zen::Sinks::Mysql2Test < ActiveSupport::TestCase setup do @db = Mysql2::Client.new( host: ENV.fetch("MYSQL_HOST", "127.0.0.1"), + port: ENV.fetch("MYSQL_PORT", "3306"), username: ENV.fetch("MYSQL_USERNAME", "root"), password: ENV.fetch("MYSQL_PASSWORD", "") ) @@ -75,6 +76,7 @@ def with_mocked_protector(params = nil) setup do @db = Mysql2::Client.new( host: ENV.fetch("MYSQL_HOST", "127.0.0.1"), + port: ENV.fetch("MYSQL_PORT", "3306"), username: ENV.fetch("MYSQL_USERNAME", "root"), password: ENV.fetch("MYSQL_PASSWORD", "") ) diff --git a/test/aikido/zen/sinks/trilogy_test.rb b/test/aikido/zen/sinks/trilogy_test.rb index e8cb32f3..99d29205 100644 --- a/test/aikido/zen/sinks/trilogy_test.rb +++ b/test/aikido/zen/sinks/trilogy_test.rb @@ -9,6 +9,7 @@ class Aikido::Zen::Sinks::TrilogyTest < ActiveSupport::TestCase setup do @db = Trilogy.new( host: ENV.fetch("MYSQL_HOST", "127.0.0.1"), + port: ENV.fetch("MYSQL_PORT", "3306"), username: ENV.fetch("MYSQL_USERNAME", "root"), password: ENV.fetch("MYSQL_PASSWORD", "") ) @@ -75,6 +76,7 @@ def with_mocked_protector(params = nil) setup do @db = Trilogy.new( host: ENV.fetch("MYSQL_HOST", "127.0.0.1"), + port: ENV.fetch("MYSQL_PORT", "3306"), username: ENV.fetch("MYSQL_USERNAME", "root"), password: ENV.fetch("MYSQL_PASSWORD", "") ) From 9e497c85bb0f0f99755f64b1a58a9210e6e22e7e Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 13 May 2026 14:54:04 +0200 Subject: [PATCH 04/22] Return Boolean from update settings methods To detect whether the update method completed. --- lib/aikido/zen/runtime_settings.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/aikido/zen/runtime_settings.rb b/lib/aikido/zen/runtime_settings.rb index cf21cbb5..8c1a480d 100644 --- a/lib/aikido/zen/runtime_settings.rb +++ b/lib/aikido/zen/runtime_settings.rb @@ -80,7 +80,7 @@ def initialize(*) # # @param data [Hash] the decoded JSON payload from the /api/runtime/config # API endpoint. - # @return [bool] + # @return [Boolean] def update_from_runtime_config_json(data) last_updated_at = updated_at @@ -105,7 +105,7 @@ def update_from_runtime_config_json(data) # # @param data [Hash] the decoded JSON payload from the /api/runtime/firewall/lists # API endpoint. - # @return [void] + # @return [Boolean] def update_from_runtime_firewall_lists_json(data) self.blocked_user_agent_regexp = pattern(data["blockedUserAgents"]) @@ -142,6 +142,8 @@ def update_from_runtime_firewall_lists_json(data) data["monitoredIPAddresses"]&.each do |ip_list| monitored_ip_lists << RuntimeSettings::IPList.from_json(ip_list) end + + true end # Construct a regular expression from the non-nil and non-empty string, From 63348d19dc178eef67f086f35d1639452ebc07ae Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 13 May 2026 14:10:28 +0200 Subject: [PATCH 05/22] Add updater to update settings An updater method returns early if the updater is already updating. --- lib/aikido/zen/agent.rb | 58 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index 484d9ff1..88730ad6 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -59,7 +59,7 @@ def start! at_exit { stop! if started? } report(Events::Started.new(time: @started_at)) do |response| - if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response) + if update_settings_from_runtime_config!(response) updated_settings! @config.logger.info("Updated runtime settings") end @@ -68,7 +68,7 @@ def start! end begin - Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists) + update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists) @config.logger.info("Updated runtime firewall list") rescue => err @config.logger.error(err.message) @@ -157,11 +157,11 @@ def send_heartbeat(at: Time.now.utc) heartbeat = @collector.flush report(heartbeat) do |response| - if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response) + if update_settings_from_runtime_config!(response) updated_settings! @config.logger.info("Updated runtime settings after heartbeat") - Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists) + update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists) @config.logger.info("Updated runtime firewall list after heartbeat") end end @@ -177,23 +177,67 @@ def send_heartbeat(at: Time.now.utc) def poll_for_setting_updates @worker.every(@config.polling_interval) do if @api_client.should_fetch_settings? - if Aikido::Zen.runtime_settings.update_from_runtime_config_json(@api_client.fetch_runtime_config) + if update_settings_from_runtime_config!(@api_client.fetch_runtime_config) updated_settings! @config.logger.info("Updated runtime settings after polling") end - Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists) + update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists) @config.logger.info("Updated runtime firewall list after polling") end end end - private def heartbeats + private + + def heartbeats @heartbeats ||= Aikido::Zen::Agent::HeartbeatsManager.new( config: @config, worker: @worker ) end + + module Updater + # Define a method `method_name` that returns early if the method is running. + # + # @param method_name [Symbol, String] the name of the method to define + # @yield the block to execute + # @yieldparam args [Array] the positional arguments passed to the method + # @yieldparam blk [Proc] the block passed to the method + # @yieldparam kwargs [Hash] the keyword arguments passed to the method + # @yieldreturn [Object] the return value of the method + # @return [void] + def updater(method_name, &block) + raise ArgumentError, "block required" unless block + + instance_variable = :"@__updater_#{block.object_id}" + + define_method(method_name) do |*args, **kwargs| + updating = instance_variable_get(instance_variable) || + instance_variable_set(instance_variable, Concurrent::AtomicBoolean.new) + + return unless updating.make_true + begin + instance_exec(*args, **kwargs, &block) + ensure + updating.make_false + end + end + end + end + extend Updater + + # @param data [Hash] + # @return [Boolean, nil] + updater :update_settings_from_runtime_config! do |data| + Aikido::Zen.runtime_settings.update_from_runtime_config_json(data) + end + + # @param data [Hash] + # @return [Boolean, nil] + updater :update_settings_from_runtime_firewall_lists! do |data| + Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(data) + end end end From a8c5bc543529348ec9740aa8d70e2bc8b2bdc69d Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 13 May 2026 17:22:02 +0200 Subject: [PATCH 06/22] Normalize instance variable order --- lib/aikido/zen/agent.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index 88730ad6..606ecb3e 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -23,13 +23,13 @@ def initialize( worker: Aikido::Zen::Worker.new(config: config), api_client: Aikido::Zen::APIClient.new(config: config) ) - @started_at = nil - @config = config - @worker = worker - @api_client = api_client @collector = collector @detached_agent = detached_agent + @worker = worker + @api_client = api_client + + @started_at = nil end def started? From 7a8e9f7b407ebdd3c1dac1bd19c19dcc9abdd51e Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 13 May 2026 17:24:11 +0200 Subject: [PATCH 07/22] Add realtime settings updates --- lib/aikido/zen.rb | 1 + lib/aikido/zen/agent.rb | 30 +++++- lib/aikido/zen/api_stream.rb | 172 +++++++++++++++++++++++++++++++++++ 3 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 lib/aikido/zen/api_stream.rb diff --git a/lib/aikido/zen.rb b/lib/aikido/zen.rb index 6ac3e649..491c4b25 100644 --- a/lib/aikido/zen.rb +++ b/lib/aikido/zen.rb @@ -10,6 +10,7 @@ require_relative "zen/worker" require_relative "zen/agent" require_relative "zen/api_client" +require_relative "zen/api_stream" require_relative "zen/context" require_relative "zen/current_context" require_relative "zen/detached_agent" diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index 606ecb3e..e4294aa2 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -21,13 +21,15 @@ def initialize( collector: Aikido::Zen.collector, detached_agent: Aikido::Zen.detached_agent, worker: Aikido::Zen::Worker.new(config: config), - api_client: Aikido::Zen::APIClient.new(config: config) + api_client: Aikido::Zen::APIClient.new(config: config), + api_stream: Aikido::Zen::APIStream.new(config: config) ) @config = config @collector = collector @detached_agent = detached_agent @worker = worker @api_client = api_client + @api_stream = api_stream @started_at = nil end @@ -82,6 +84,9 @@ def start! @config.logger.info("Executed initial heartbeat after #{heartbeat_delay} seconds") end end + + @api_stream.handle("config-updated", &:settings_updated) + @api_stream.start! end # Clean up any ongoing threads, and reset the state. Called automatically @@ -92,6 +97,8 @@ def stop! @config.logger.info("Stopping Aikido agent") @started_at = nil @worker.shutdown + + @api_stream.stop! end # Respond to the runtime settings changing after being fetched from the @@ -188,8 +195,29 @@ def poll_for_setting_updates end end + def settings_updated(event) + updated_at = Time.at(event[:data]["configUpdatedAt"].to_i) + + if should_fetch_settings?(updated_at) + if update_settings_from_runtime_config!(@api_client.fetch_runtime_config) + updated_settings! + @config.logger.info("Updated runtime settings after server-side event") + + update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists) + @config.logger.info("Updated runtime firewall list after server-side event") + end + end + end + private + def should_fetch_settings?(updated_at, last_updated_at = Aikido::Zen.runtime_settings.updated_at) + return false unless @api_client.can_make_requests? + return true if last_updated_at.nil? + + updated_at > last_updated_at + end + def heartbeats @heartbeats ||= Aikido::Zen::Agent::HeartbeatsManager.new( config: @config, diff --git a/lib/aikido/zen/api_stream.rb b/lib/aikido/zen/api_stream.rb new file mode 100644 index 00000000..01e140f7 --- /dev/null +++ b/lib/aikido/zen/api_stream.rb @@ -0,0 +1,172 @@ +# frozen_string_literal: true + +require "net/http" +require "uri" +require "json" + +module Aikido::Zen + class APIStream + def initialize( + config: Aikido::Zen.config, + min_backoff: 5, + max_backoff: 60, + backoff_reset: 30, + open_timeout: 5, + write_timeout: open_timeout, + read_timeout: 70 + ) + @config = config + @min_backoff = min_backoff + @max_backoff = max_backoff + @backoff_reset = backoff_reset + @open_timeout = open_timeout + @write_timeout = write_timeout + @read_timeout = read_timeout + + @running = Concurrent::AtomicBoolean.new + @executor = nil + + @host = @config.realtime_endpoint.host + @port = @config.realtime_endpoint.port + @use_ssl = @config.realtime_endpoint.scheme == "https" + @token = @config.api_token + + @handlers = Concurrent::Array.new + end + + def running? + @running.true? + end + alias_method :started?, :running? + + def start! + return false unless @running.make_true + + @executor = Concurrent::SingleThreadExecutor.new + + @executor.post do + backoff = @min_backoff + + while running? + time_before = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second) + + begin + work + rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => err + @config.logger.debug("Error in API stream: #{err.class}: #{err.message}") + rescue => err + @config.logger.error("Error in API stream: #{err.class}: #{err.message}") + end + + break unless running? + + time_after = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second) + + backoff = if time_after - time_before > @backoff_reset + @min_backoff + else + [backoff * 2, @max_backoff].min + end + + jitter = rand * backoff / 2 + + @config.logger.debug("API stream reconnecting in %d seconds" % (backoff + jitter).ceil) + + sleep(backoff + jitter) + end + end + + true + end + + def stop! + return false unless @running.make_false + + @executor.shutdown + @executor.wait_for_termination(@read_timeout) + + true + end + + def handle(type, &block) + raise ArgumentError, "block required" unless block + + @handlers << proc do |event| + block.call(event) if type === event[:type] + end + end + + private def work + http = Net::HTTP.new(@host, @port) + http.use_ssl = @use_ssl + http.open_timeout = @open_timeout + http.write_timeout = @write_timeout + http.read_timeout = @read_timeout + http.max_retries = 0 + + request = Net::HTTP::Get.new("/api/runtime/stream") + request["Authorization"] = @token + request["Accept"] = "text/event-stream" + request["Cache-Control"] = "no-cache" + + @config.logger.debug("API stream connecting") + http.start + @config.logger.debug("API stream connected") + + begin + http.request(request) do |response| + case response.code.to_i + when 200 + # empty + when 401, 403 + @running.make_false + return nil + else + return nil + end + + buffer = +"" + + response.read_body do |chunk| + return nil unless running? + + @config.logger.debug("API stream received chunk:\n#{chunk.strip}") + + buffer << chunk + + while (index = buffer.index("\n\n")) + event_str = buffer.slice!(0..index + 1) + buffer = buffer.lstrip + + event = {} + + begin + event_str.each_line do |line| + case line + when /^event:\s*(.+)/ + event[:type] = $1.strip + when /^data:\s*(.+)/ + event[:data] = JSON.parse($1.strip) + end + end + rescue => err + @config.logger.error("Error in API stream: #{err.class}: #{err.message}") + next + end + + @handlers.each do |handler| + handler.call(event) + rescue => err + @config.logger.error("Error in API stream: #{err.class}: #{err.message}") + end + end + end + end + ensure + @config.logger.debug("API stream disconnecting") + http.finish + @config.logger.debug("API stream disconnected") + end + end + end +end From 9842e0b6e9c91a729985c10088214ccfc7fea792 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 20 May 2026 14:59:33 +0200 Subject: [PATCH 08/22] Remove unused stub methods --- test/aikido/zen/agent_test.rb | 9 --------- 1 file changed, 9 deletions(-) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 7eee8461..90361ecd 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -432,13 +432,4 @@ def exception(*) Aikido::Zen::UnderAttackError.new(self) end end - - def stub_context(path = "/", env = {}) - env = Rack::MockRequest.env_for(path, {"REQUEST_METHOD" => "GET"}.merge(env)) - Aikido::Zen.current_context = Aikido::Zen::Context.from_rack_env(env) - end - - def stub_request(path = "/", env = {}) - stub_context(path, env).request - end end From 34b06ba131832a6abd7093c352b2c3b8bdac7a27 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 20 May 2026 15:00:06 +0200 Subject: [PATCH 09/22] Fix tests By mocking Aikido::Zen::APIStream to prevent real HTTP request to https://runtime.aikido.dev/api/runtime/stream. --- test/aikido/zen/agent_test.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 90361ecd..9786fc5a 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -23,18 +23,27 @@ def report(event) end end + class MockAPIStream < Aikido::Zen::APIStream + def work + nil + end + end + setup do @config = Aikido::Zen.config @config.api_token = "TOKEN" - @api_client = Minitest::Mock.new(MockAPIClient.new) @collector = Aikido::Zen.collector @worker = MockWorker.new + @api_client = Minitest::Mock.new(MockAPIClient.new) + @api_stream = Minitest::Mock.new(MockAPIStream.new) @agent = Aikido::Zen::Agent.new( - api_client: @api_client, + config: @config, collector: @collector, - worker: @worker + worker: @worker, + api_client: @api_client, + api_stream: @api_stream ) @test_sink = Aikido::Zen::Sink.new("test", scanners: [NOOP]) From b9e2e93766157a72321dec701717a011b0b55fad Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Thu, 21 May 2026 15:06:54 +0200 Subject: [PATCH 10/22] Add tests --- test/aikido/zen/api_stream_test.rb | 237 +++++++++++++++++++++++++++++ test/support/wait_helpers.rb | 10 ++ test/test_helper.rb | 2 + 3 files changed, 249 insertions(+) create mode 100644 test/aikido/zen/api_stream_test.rb create mode 100644 test/support/wait_helpers.rb diff --git a/test/aikido/zen/api_stream_test.rb b/test/aikido/zen/api_stream_test.rb new file mode 100644 index 00000000..42dd2264 --- /dev/null +++ b/test/aikido/zen/api_stream_test.rb @@ -0,0 +1,237 @@ +# frozen_string_literal: true + +require "test_helper" +require "securerandom" + +class Aikido::Zen::StreamTest < ActiveSupport::TestCase + setup do + @endpoint = "https://runtime.aikido.dev/api/runtime/stream" + + Aikido::Zen.config.api_token = "TOKEN" + + @api_stream = Aikido::Zen::APIStream.new( + min_backoff: 0.2, + max_backoff: 0.8, + backoff_reset: 0.4, + open_timeout: 1, + read_timeout: 1 + ) + end + + teardown do + @api_stream.stop! + end + + DEFAULT_SSE_BODY = <<~SSE + event: config-updated + data: {"serviceId":1,"configUpdatedAt":1779292466} + + event: config-updated + data: {"serviceId":1,"configUpdatedAt":1779292467} + + : ping + + SSE + + test "#start! returns false if already running" do + stub_request(:get, @endpoint) + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert @api_stream.start! + assert_equal false, @api_stream.start! + end + + test "#handle raises ArgumentError without a block" do + assert_raises(ArgumentError) { @api_stream.handle("config-updated") } + end + + test "it starts and connects" do + connection = stub_request(:get, @endpoint) + .with( + headers: { + "Authorization" => "TOKEN", + "Accept" => "text/event-stream", + "Cache-Control" => "no-cache" + } + ) + .to_return(status: 200, body: "", headers: {}) + + assert_connects(connection, times: 1) + end + + test "it handles valid events" do + connection = stub_request(:get, @endpoint) + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + events = Concurrent::Array.new + @api_stream.handle("config-updated") { |event| events << event } + + assert_connects(connection, times: 1) + + assert_equal 2, events.size + + assert_equal "config-updated", events[0][:type] + assert_equal 1, events[0][:data]["serviceId"] + assert_equal 1779292466, events[0][:data]["configUpdatedAt"] + + assert_equal "config-updated", events[1][:type] + assert_equal 1, events[1][:data]["serviceId"] + assert_equal 1779292467, events[1][:data]["configUpdatedAt"] + end + + test "it skips invalid events and continues processing" do + body = <<~SSE + event: config-updated + data: not valid json + + event: config-updated + data: {"serviceId":1,"configUpdatedAt":1779292466} + + SSE + + connection = stub_request(:get, @endpoint) + .to_return(status: 200, body: body) + + events = Concurrent::Array.new + @api_stream.handle("config-updated") { |event| events << event } + + assert_connects(connection, times: 1) + + assert_equal 1, events.size + + assert_equal "config-updated", events[0][:type] + assert_equal 1, events[0][:data]["serviceId"] + assert_equal 1779292466, events[0][:data]["configUpdatedAt"] + end + + test "it skips handler errors and continues processing" do + connection = stub_request(:get, @endpoint) + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + events = Concurrent::Array.new + @api_stream.handle("config-updated") { |_event| raise "handler error" } + @api_stream.handle("config-updated") { |event| events << event } + + assert_connects(connection, times: 1) + + assert_equal 2, events.size + end + + test "it reconnects after the stream ends naturally" do + connection = stub_request(:get, @endpoint) + .to_return(status: 200, body: DEFAULT_SSE_BODY).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after connection reset" do + connection = stub_request(:get, @endpoint) + .to_raise(Errno::ECONNRESET).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after connection refused" do + connection = stub_request(:get, @endpoint) + .to_raise(Errno::ECONNREFUSED).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after open timeout" do + connection = stub_request(:get, @endpoint) + .to_raise(Net::OpenTimeout).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after write timeout" do + connection = stub_request(:get, @endpoint) + .to_raise(Net::WriteTimeout).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after read timeout" do + connection = stub_request(:get, @endpoint) + .to_raise(Net::ReadTimeout).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after unexpected error" do + connection = stub_request(:get, @endpoint) + .to_raise(RuntimeError).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + assert_connects(connection, times: 2) + end + + test "it reconnects after unexpected HTTP status code" do + connection = stub_request(:get, @endpoint) + .to_return(status: 418).then + .to_return(status: 200, body: DEFAULT_SSE_BODY) + + @api_stream.start! + + assert @api_stream.running? + + wait_until(timeout: 2) { connected?(connection, times: 2) } + + assert @api_stream.running? + + assert_requested connection, times: 2 + end + + test "it does not reconnect after 401 Unauthorized" do + connection = stub_request(:get, @endpoint) + .to_return(status: 401) + + @api_stream.start! + + assert @api_stream.running? + + wait_until(timeout: 2) { connected?(connection, times: 1) } + + refute @api_stream.running? + + assert_requested connection, times: 1 + end + + test "it does not reconnect after 403 Forbidden" do + connection = stub_request(:get, @endpoint) + .to_return(status: 403) + + @api_stream.start! + + assert @api_stream.running? + + wait_until(timeout: 2) { connected?(connection, times: 1) } + + refute @api_stream.running? + + assert_requested connection, times: 1 + end + + private + + def connected?(connection, times: 1) + WebMock::RequestRegistry.instance.times_executed(connection.request_pattern) == times + end + + def assert_connects(connection, times:, timeout: 2) + @api_stream.start! + + wait_until(timeout: timeout) { connected?(connection, times: times) } + + @api_stream.stop! + + assert_requested connection, times: times + end +end diff --git a/test/support/wait_helpers.rb b/test/support/wait_helpers.rb new file mode 100644 index 00000000..9f3b27ba --- /dev/null +++ b/test/support/wait_helpers.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +module WaitHelpers + def wait_until(timeout:) + start_time = Time.now + until yield || (Time.now - start_time) > timeout + sleep 0.1 + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index 909b5f73..c9be4c47 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -58,11 +58,13 @@ def handle_fork require_relative "support/rate_limiting_assertions" require_relative "support/sink_attack_helpers" require_relative "support/worker_helpers" +require_relative "support/wait_helpers" # Utility proc that does nothing. NOOP = ->(*args, **opts) {} class ActiveSupport::TestCase + include WaitHelpers self.file_fixture_path = "test/fixtures" # Reset any global state before each test From 6f9bdd3f51aecc52de88f83dd7786d79aa6d44d8 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Thu, 21 May 2026 15:36:12 +0200 Subject: [PATCH 11/22] Optimize tests By scaling down durations by 10x. --- test/aikido/zen/api_stream_test.rb | 6 +++--- test/support/wait_helpers.rb | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/aikido/zen/api_stream_test.rb b/test/aikido/zen/api_stream_test.rb index 42dd2264..4b5a6bbf 100644 --- a/test/aikido/zen/api_stream_test.rb +++ b/test/aikido/zen/api_stream_test.rb @@ -10,9 +10,9 @@ class Aikido::Zen::StreamTest < ActiveSupport::TestCase Aikido::Zen.config.api_token = "TOKEN" @api_stream = Aikido::Zen::APIStream.new( - min_backoff: 0.2, - max_backoff: 0.8, - backoff_reset: 0.4, + min_backoff: 0.02, + max_backoff: 0.08, + backoff_reset: 0.04, open_timeout: 1, read_timeout: 1 ) diff --git a/test/support/wait_helpers.rb b/test/support/wait_helpers.rb index 9f3b27ba..2d011341 100644 --- a/test/support/wait_helpers.rb +++ b/test/support/wait_helpers.rb @@ -4,7 +4,7 @@ module WaitHelpers def wait_until(timeout:) start_time = Time.now until yield || (Time.now - start_time) > timeout - sleep 0.1 + sleep 0.01 end end end From 94795b2d046836afd68ef917744c92060593bd13 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 26 May 2026 15:23:43 +0200 Subject: [PATCH 12/22] Probe realtime endpoint at startup --- lib/aikido/zen/agent.rb | 8 ++++++-- lib/aikido/zen/api_stream.rb | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index e4294aa2..fecfc23d 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -85,8 +85,12 @@ def start! end end - @api_stream.handle("config-updated", &:settings_updated) - @api_stream.start! + if @api_stream.can_connect? + @api_stream.handle("config-updated", &:settings_updated) + @api_stream.start! + else + @config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.") + end end # Clean up any ongoing threads, and reset the state. Called automatically diff --git a/lib/aikido/zen/api_stream.rb b/lib/aikido/zen/api_stream.rb index 01e140f7..a46542e9 100644 --- a/lib/aikido/zen/api_stream.rb +++ b/lib/aikido/zen/api_stream.rb @@ -34,6 +34,31 @@ def initialize( @handlers = Concurrent::Array.new end + # @return [Boolean] whether we could connect to the realtime endpoint + def can_connect? + http = Net::HTTP.new(@host, @port) + http.use_ssl = @use_ssl + http.open_timeout = 5 + http.write_timeout = 5 + http.read_timeout = 5 + http.max_retries = 0 + + request = Net::HTTP::Get.new("/config") + request["Authorization"] = @token + + begin + http.request(request) + + return true + rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => err + @config.logger.debug("Error probing realtime endpoint: #{err.class}: #{err.message}") + rescue => err + @config.logger.error("Error probing realtime endpoint: #{err.class}: #{err.message}") + end + + false + end + def running? @running.true? end From cd322ff50ea9f1e305ce4140989eca53ebd87ad2 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 26 May 2026 15:49:33 +0200 Subject: [PATCH 13/22] Fix tests --- test/aikido/zen/agent_test.rb | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 9786fc5a..ee04cb50 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -29,6 +29,10 @@ def work end end + def stub_probe_realtime_endpoint + stub_request(:get, "#{@config.realtime_endpoint}/config") + end + setup do @config = Aikido::Zen.config @config.api_token = "TOKEN" @@ -54,6 +58,8 @@ def work end test "knows if it has started" do + stub_probe_realtime_endpoint + refute @agent.started? @agent.start! @@ -64,6 +70,8 @@ def work end test "#start! fails if attempted to start multiple times" do + stub_probe_realtime_endpoint + @agent.start! err = assert_raises Aikido::ZenError do @@ -74,12 +82,16 @@ def work end test "#start! sets the start time for our stats funnel" do + stub_probe_realtime_endpoint + assert_changes "@collector.stats.started_at", from: nil do @agent.start! end end test "#start! warns if blocking mode is disabled" do + stub_probe_realtime_endpoint + @config.blocking_mode = false @agent.start! @@ -88,6 +100,8 @@ def work end test "#start! notifies if blocking mode is enabled" do + stub_probe_realtime_endpoint + @config.blocking_mode = true @agent.start! @@ -96,6 +110,8 @@ def work end test "#start! notifies if an API token has been set" do + stub_probe_realtime_endpoint + @config.api_token = "TOKEN" @agent.start! @@ -104,6 +120,8 @@ def work end test "#start! warns if there's no API token set" do + stub_probe_realtime_endpoint + @config.api_token = nil @agent.start! @@ -112,6 +130,8 @@ def work end test "#start! reports a STARTED event" do + stub_probe_realtime_endpoint + @api_client.expect :report, {}, [Aikido::Zen::Events::Started] @agent.start! @@ -120,6 +140,8 @@ def work end test "#start! takes the response of the STARTED event as runtime settings" do + stub_probe_realtime_endpoint + @api_client.expect :report, {"configUpdatedAt" => 1234567890}, [Aikido::Zen::Events::Started] @@ -145,6 +167,8 @@ def @api_client.report(event) end test "#start! starts polling for setting updates every minute" do + stub_probe_realtime_endpoint + @api_client.expect :should_fetch_settings?, false assert_difference "@worker.jobs.size", +1 do @@ -160,6 +184,8 @@ def @api_client.report(event) end test "#start! updates the runtime settings after polling if needed" do + stub_probe_realtime_endpoint + @api_client.expect :should_fetch_settings?, true @api_client.expect :fetch_runtime_config, {"configUpdatedAt" => 1234567890} @@ -340,6 +366,8 @@ def @api_client.report(event) end test "#start! queues a one-off tasks for each initial heartbeat delay" do + stub_probe_realtime_endpoint + size = @config.initial_heartbeat_delays.size assert_difference "@worker.delayed.size", size do @@ -355,6 +383,8 @@ def @api_client.report(event) end test "#start! successfully sends the initial heartbeats" do + stub_probe_realtime_endpoint + # Make sure there are _some_ stats @collector.track_request From 8d6bcc3950ed74cc84648de25635a12ec73f56fd Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 26 May 2026 16:57:51 +0200 Subject: [PATCH 14/22] Fix assert_logged and refute_logged --- test/aikido/zen/agent_test.rb | 2 +- test/test_helper.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index ee04cb50..4e8a20e4 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -115,7 +115,7 @@ def stub_probe_realtime_endpoint @config.api_token = "TOKEN" @agent.start! - assert_logged :debug, /api token set! reporting has been enabled/i + assert_logged :info, /api token set! reporting has been enabled/i refute_logged :warn, /no api token set! reporting has been disabled/i end diff --git a/test/test_helper.rb b/test/test_helper.rb index c9be4c47..f2937ddc 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -144,7 +144,7 @@ def assert_logged(level = nil, pattern) "matches #{pattern.inspect}. ".squeeze("\s") + "Log messages:\n#{lines.map { |line| "\t* #{line}" }.join("\n")}" - assert lines.any? { |line| pattern === line && (match_level === line or true) }, reason + assert lines.any? { |line| pattern === line && (match_level.nil? || line.include?(match_level)) }, reason end def refute_logged(level = nil, pattern) @@ -157,7 +157,7 @@ def refute_logged(level = nil, pattern) "to match #{pattern.inspect}".squeeze("\s") + "Log messages:\n#{lines.map { |line| "\t* #{line}" }.join("\n")}" - refute lines.any? { |line| pattern === line && (match_level === line or true) }, reason + refute lines.any? { |line| pattern === line && (match_level.nil? || line.include?(match_level)) }, reason end # rubocop:enable Style/OptionalArguments From fb03e7dcdf0dfed2dd7db53877c207e3f6e48c5c Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 26 May 2026 15:55:32 +0200 Subject: [PATCH 15/22] Add tests --- test/aikido/zen/agent_test.rb | 70 +++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 4e8a20e4..a39a394e 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -129,6 +129,76 @@ def stub_probe_realtime_endpoint refute_logged :debug, /api token set! reporting has been enabled/i end + test "#start! probes the realtime endpont" do + request = stub_probe_realtime_endpoint + .to_return(status: 200, body: "") + + @config.api_token = "TOKEN" + @agent.start! + + assert_requested request + + refute_logged :debug, /error probing realtime endpoint/i + refute_logged :error, /error probing realtime endpoint/i + refute_logged :warn, /can't reach #{Aikido::Zen.config.realtime_endpoint}/i + end + + test "#start! probes the realtime endpont and logs warning after open timeout" do + request = stub_probe_realtime_endpoint + .to_raise(Net::OpenTimeout) + + @config.api_token = "TOKEN" + @agent.start! + + assert_requested request + + assert_logged :debug, /error probing realtime endpoint/i + refute_logged :error, /error probing realtime endpoint/i + assert_logged :warn, /can't reach #{Aikido::Zen.config.realtime_endpoint}/i + end + + test "#start! probes the realtime endpont and logs warning after write timeout" do + request = stub_probe_realtime_endpoint + .to_raise(Net::WriteTimeout) + + @config.api_token = "TOKEN" + @agent.start! + + assert_requested request + + assert_logged :debug, /error probing realtime endpoint/i + refute_logged :error, /error probing realtime endpoint/i + assert_logged :warn, /can't reach #{Aikido::Zen.config.realtime_endpoint}/i + end + + test "#start! probes the realtime endpont and logs warning after read timeout" do + request = stub_probe_realtime_endpoint + .to_raise(Net::ReadTimeout) + + @config.api_token = "TOKEN" + @agent.start! + + assert_requested request + + assert_logged :debug, /error probing realtime endpoint/i + refute_logged :error, /error probing realtime endpoint/i + assert_logged :warn, /can't reach #{Aikido::Zen.config.realtime_endpoint}/i + end + + test "#start! probes the realtime endpont and logs error after unexpected error" do + request = stub_probe_realtime_endpoint + .to_raise(RuntimeError) + + @config.api_token = "TOKEN" + @agent.start! + + assert_requested request + + refute_logged :debug, /error probing realtime endpoint/i + assert_logged :error, /error probing realtime endpoint/i + assert_logged :warn, /can't reach #{Aikido::Zen.config.realtime_endpoint}/i + end + test "#start! reports a STARTED event" do stub_probe_realtime_endpoint From ddef592f8d3b4e8636b31e4d36a6907c8f070e46 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Thu, 28 May 2026 09:31:28 +0200 Subject: [PATCH 16/22] Fix call #settings_updated with event --- lib/aikido/zen/agent.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index fecfc23d..9cae48fe 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -86,7 +86,7 @@ def start! end if @api_stream.can_connect? - @api_stream.handle("config-updated", &:settings_updated) + @api_stream.handle("config-updated") { |event| settings_updated(event) } @api_stream.start! else @config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.") From 607c1461f0d01eab2995cfe4732ce8dac91be405 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 12:51:25 +0200 Subject: [PATCH 17/22] Use configured realtime endpoint in API stream tests --- test/aikido/zen/api_stream_test.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/aikido/zen/api_stream_test.rb b/test/aikido/zen/api_stream_test.rb index 4b5a6bbf..9831d8f9 100644 --- a/test/aikido/zen/api_stream_test.rb +++ b/test/aikido/zen/api_stream_test.rb @@ -5,9 +5,10 @@ class Aikido::Zen::StreamTest < ActiveSupport::TestCase setup do - @endpoint = "https://runtime.aikido.dev/api/runtime/stream" + config = Aikido::Zen.config + config.api_token = "TOKEN" - Aikido::Zen.config.api_token = "TOKEN" + @endpoint = "#{config.realtime_endpoint}/api/runtime/stream" @api_stream = Aikido::Zen::APIStream.new( min_backoff: 0.02, From b7534cc4c2ebecc3bb16cc6bf37aad2293c7ee7d Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 12:05:52 +0200 Subject: [PATCH 18/22] Add realtime updates feature flag --- lib/aikido/zen/agent.rb | 12 +++++++----- lib/aikido/zen/config.rb | 6 ++++++ test/aikido/zen/agent_test.rb | 1 + test/aikido/zen/config_test.rb | 1 + 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index 9cae48fe..2f03300e 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -85,11 +85,13 @@ def start! end end - if @api_stream.can_connect? - @api_stream.handle("config-updated") { |event| settings_updated(event) } - @api_stream.start! - else - @config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.") + if @config.realtime_updates_enabled? + if @api_stream.can_connect? + @api_stream.handle("config-updated") { |event| settings_updated(event) } + @api_stream.start! + else + @config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.") + end end end diff --git a/lib/aikido/zen/config.rb b/lib/aikido/zen/config.rb index eb43773a..1e8a7788 100644 --- a/lib/aikido/zen/config.rb +++ b/lib/aikido/zen/config.rb @@ -216,6 +216,11 @@ class Config # Defaults to 1000 entries. attr_accessor :idor_max_cache_entries + # @return [Boolean] whether the realtime updates feature is enabled. + # Defaults to false. + attr_accessor :realtime_updates_enabled + alias_method :realtime_updates_enabled?, :realtime_updates_enabled + def initialize self.insert_middleware_after = ::ActionDispatch::RemoteIp self.disabled = read_boolean_from_env(ENV.fetch("AIKIDO_DISABLE", false)) || read_boolean_from_env(ENV.fetch("AIKIDO_DISABLED", false)) @@ -261,6 +266,7 @@ def initialize self.idor_tenant_column_name = nil self.idor_excluded_table_names = [] self.idor_max_cache_entries = 1000 + self.realtime_updates_enabled = false end # Set the base URL for API requests. diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index a39a394e..61573357 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -36,6 +36,7 @@ def stub_probe_realtime_endpoint setup do @config = Aikido::Zen.config @config.api_token = "TOKEN" + @config.realtime_updates_enabled = true @collector = Aikido::Zen.collector @worker = MockWorker.new diff --git a/test/aikido/zen/config_test.rb b/test/aikido/zen/config_test.rb index 4bb88126..14504a84 100644 --- a/test/aikido/zen/config_test.rb +++ b/test/aikido/zen/config_test.rb @@ -54,6 +54,7 @@ class Aikido::Zen::ConfigTest < ActiveSupport::TestCase assert_nil @config.idor_tenant_column_name assert_equal [], @config.idor_excluded_table_names assert_equal 1000, @config.idor_max_cache_entries + assert_equal false, @config.realtime_updates_enabled? end test "can set AIKIDO_DISABLE to configure if the agent should be turned off" do From 06886e1ebccd4cc3bc3c0bd081ca267679473b0a Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 15:35:19 +0200 Subject: [PATCH 19/22] Avoid logging raw chunks In case the server-side event stream ever contains sensitive data. --- lib/aikido/zen/api_stream.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/aikido/zen/api_stream.rb b/lib/aikido/zen/api_stream.rb index a46542e9..b6bcbd8a 100644 --- a/lib/aikido/zen/api_stream.rb +++ b/lib/aikido/zen/api_stream.rb @@ -155,7 +155,7 @@ def handle(type, &block) response.read_body do |chunk| return nil unless running? - @config.logger.debug("API stream received chunk:\n#{chunk.strip}") + @config.logger.debug("API stream received chunk of #{chunk.bytesize} bytes") buffer << chunk From 34e57b51ae137648ad4e163456e1ec3df5caf201 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 15:47:01 +0200 Subject: [PATCH 20/22] Rename Updater to ExclusiveUpdater To emphasize the mutual exclusion property of defined updater methods. --- lib/aikido/zen/agent.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index 2f03300e..ad50f5ca 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -231,7 +231,7 @@ def heartbeats ) end - module Updater + module ExclusiveUpdater # Define a method `method_name` that returns early if the method is running. # # @param method_name [Symbol, String] the name of the method to define @@ -241,7 +241,7 @@ module Updater # @yieldparam kwargs [Hash] the keyword arguments passed to the method # @yieldreturn [Object] the return value of the method # @return [void] - def updater(method_name, &block) + def exclusive_updater(method_name, &block) raise ArgumentError, "block required" unless block instance_variable = :"@__updater_#{block.object_id}" @@ -259,17 +259,17 @@ def updater(method_name, &block) end end end - extend Updater + extend ExclusiveUpdater # @param data [Hash] # @return [Boolean, nil] - updater :update_settings_from_runtime_config! do |data| + exclusive_updater :update_settings_from_runtime_config! do |data| Aikido::Zen.runtime_settings.update_from_runtime_config_json(data) end # @param data [Hash] # @return [Boolean, nil] - updater :update_settings_from_runtime_firewall_lists! do |data| + exclusive_updater :update_settings_from_runtime_firewall_lists! do |data| Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(data) end end From c6b6d14118aca9797c30dfbc4c8d78b1181a6876 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 15:53:28 +0200 Subject: [PATCH 21/22] Make settings_updated private --- lib/aikido/zen/agent.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/aikido/zen/agent.rb b/lib/aikido/zen/agent.rb index ad50f5ca..ff145e58 100644 --- a/lib/aikido/zen/agent.rb +++ b/lib/aikido/zen/agent.rb @@ -201,6 +201,8 @@ def poll_for_setting_updates end end + private + def settings_updated(event) updated_at = Time.at(event[:data]["configUpdatedAt"].to_i) @@ -215,8 +217,6 @@ def settings_updated(event) end end - private - def should_fetch_settings?(updated_at, last_updated_at = Aikido::Zen.runtime_settings.updated_at) return false unless @api_client.can_make_requests? return true if last_updated_at.nil? From 38322c009cf1fb59aec42888b1e3787270f83601 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Fri, 12 Jun 2026 16:01:01 +0200 Subject: [PATCH 22/22] Fix test description typo endpont to endpoint --- test/aikido/zen/agent_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/aikido/zen/agent_test.rb b/test/aikido/zen/agent_test.rb index 61573357..f4dd918f 100644 --- a/test/aikido/zen/agent_test.rb +++ b/test/aikido/zen/agent_test.rb @@ -130,7 +130,7 @@ def stub_probe_realtime_endpoint refute_logged :debug, /api token set! reporting has been enabled/i end - test "#start! probes the realtime endpont" do + test "#start! probes the realtime endpoint" do request = stub_probe_realtime_endpoint .to_return(status: 200, body: "")