Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/aikido/zen.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require_relative "zen/worker"
require_relative "zen/agent"
require_relative "zen/api_client"
require_relative "zen/api_stream"
require_relative "zen/context"
require_relative "zen/current_context"
require_relative "zen/detached_agent"
Expand Down
102 changes: 90 additions & 12 deletions lib/aikido/zen/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ def initialize(
collector: Aikido::Zen.collector,
detached_agent: Aikido::Zen.detached_agent,
worker: Aikido::Zen::Worker.new(config: config),
api_client: Aikido::Zen::APIClient.new(config: config)
api_client: Aikido::Zen::APIClient.new(config: config),
api_stream: Aikido::Zen::APIStream.new(config: config)
)
@started_at = nil

@config = config
@worker = worker
@api_client = api_client
@collector = collector
@detached_agent = detached_agent
@worker = worker
@api_client = api_client
@api_stream = api_stream

@started_at = nil
end

def started?
Expand Down Expand Up @@ -59,7 +61,7 @@ def start!
at_exit { stop! if started? }

report(Events::Started.new(time: @started_at)) do |response|
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response)
if update_settings_from_runtime_config!(response)
updated_settings!
@config.logger.info("Updated runtime settings")
end
Expand All @@ -68,7 +70,7 @@ def start!
end

begin
Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list")
rescue => err
@config.logger.error(err.message)
Expand All @@ -82,6 +84,15 @@ def start!
@config.logger.info("Executed initial heartbeat after #{heartbeat_delay} seconds")
end
end

if @config.realtime_updates_enabled?
if @api_stream.can_connect?
@api_stream.handle("config-updated") { |event| settings_updated(event) }
@api_stream.start!
else
@config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.")
end
end
end

# Clean up any ongoing threads, and reset the state. Called automatically
Expand All @@ -92,6 +103,8 @@ def stop!
@config.logger.info("Stopping Aikido agent")
@started_at = nil
@worker.shutdown

@api_stream.stop!
end

# Respond to the runtime settings changing after being fetched from the
Expand Down Expand Up @@ -157,11 +170,11 @@ def send_heartbeat(at: Time.now.utc)

heartbeat = @collector.flush
report(heartbeat) do |response|
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response)
if update_settings_from_runtime_config!(response)
updated_settings!
@config.logger.info("Updated runtime settings after heartbeat")

Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after heartbeat")
end
end
Expand All @@ -177,23 +190,88 @@ def send_heartbeat(at: Time.now.utc)
def poll_for_setting_updates
@worker.every(@config.polling_interval) do
if @api_client.should_fetch_settings?
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(@api_client.fetch_runtime_config)
if update_settings_from_runtime_config!(@api_client.fetch_runtime_config)
updated_settings!
@config.logger.info("Updated runtime settings after polling")
end

Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after polling")
end
end
end

private def heartbeats
private

def settings_updated(event)
Comment thread
marksmith marked this conversation as resolved.
Comment thread
marksmith marked this conversation as resolved.
updated_at = Time.at(event[:data]["configUpdatedAt"].to_i)
Comment thread
marksmith marked this conversation as resolved.

if should_fetch_settings?(updated_at)
if update_settings_from_runtime_config!(@api_client.fetch_runtime_config)
updated_settings!
@config.logger.info("Updated runtime settings after server-side event")

update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after server-side event")
end
Comment thread
marksmith marked this conversation as resolved.
end
end

def should_fetch_settings?(updated_at, last_updated_at = Aikido::Zen.runtime_settings.updated_at)
return false unless @api_client.can_make_requests?
return true if last_updated_at.nil?

updated_at > last_updated_at
end

def heartbeats
@heartbeats ||= Aikido::Zen::Agent::HeartbeatsManager.new(
config: @config,
worker: @worker
)
end

module ExclusiveUpdater
# Define a method `method_name` that returns early if the method is running.
#
# @param method_name [Symbol, String] the name of the method to define
# @yield the block to execute
# @yieldparam args [Array] the positional arguments passed to the method
# @yieldparam blk [Proc] the block passed to the method
# @yieldparam kwargs [Hash] the keyword arguments passed to the method
# @yieldreturn [Object] the return value of the method
# @return [void]
def exclusive_updater(method_name, &block)
raise ArgumentError, "block required" unless block

instance_variable = :"@__updater_#{block.object_id}"

define_method(method_name) do |*args, **kwargs|
updating = instance_variable_get(instance_variable) ||
instance_variable_set(instance_variable, Concurrent::AtomicBoolean.new)
Comment thread
marksmith marked this conversation as resolved.

return unless updating.make_true
begin
instance_exec(*args, **kwargs, &block)
ensure
updating.make_false
end
end
end
end
extend ExclusiveUpdater

# @param data [Hash]
# @return [Boolean, nil]
exclusive_updater :update_settings_from_runtime_config! do |data|
Aikido::Zen.runtime_settings.update_from_runtime_config_json(data)
end

# @param data [Hash]
# @return [Boolean, nil]
exclusive_updater :update_settings_from_runtime_firewall_lists! do |data|
Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(data)
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/aikido/zen/api_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def should_fetch_settings?(last_updated_at = Aikido::Zen.runtime_settings.update
base_url: @config.realtime_endpoint
)

new_updated_at = Time.at(response["configUpdatedAt"].to_i / 1000)
new_updated_at = Time.at(response["configUpdatedAt"].to_i)
new_updated_at > last_updated_at
end

Expand Down
197 changes: 197 additions & 0 deletions lib/aikido/zen/api_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# frozen_string_literal: true

require "net/http"
require "uri"
require "json"

module Aikido::Zen
class APIStream
def initialize(
config: Aikido::Zen.config,
min_backoff: 5,
max_backoff: 60,
backoff_reset: 30,
open_timeout: 5,
write_timeout: open_timeout,
read_timeout: 70
)
@config = config
@min_backoff = min_backoff
@max_backoff = max_backoff
@backoff_reset = backoff_reset
@open_timeout = open_timeout
@write_timeout = write_timeout
@read_timeout = read_timeout

@running = Concurrent::AtomicBoolean.new
@executor = nil

@host = @config.realtime_endpoint.host
@port = @config.realtime_endpoint.port
@use_ssl = @config.realtime_endpoint.scheme == "https"
@token = @config.api_token

@handlers = Concurrent::Array.new
end

# @return [Boolean] whether we could connect to the realtime endpoint
def can_connect?
http = Net::HTTP.new(@host, @port)
Comment thread
marksmith marked this conversation as resolved.
http.use_ssl = @use_ssl
http.open_timeout = 5
http.write_timeout = 5
http.read_timeout = 5
http.max_retries = 0

request = Net::HTTP::Get.new("/config")
request["Authorization"] = @token

begin
http.request(request)

return true
rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => err
@config.logger.debug("Error probing realtime endpoint: #{err.class}: #{err.message}")
rescue => err
@config.logger.error("Error probing realtime endpoint: #{err.class}: #{err.message}")
end

false
end

def running?
@running.true?
end
alias_method :started?, :running?

def start!
return false unless @running.make_true

@executor = Concurrent::SingleThreadExecutor.new

@executor.post do
backoff = @min_backoff

while running?
time_before = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)

begin
work
rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => err
@config.logger.debug("Error in API stream: #{err.class}: #{err.message}")
rescue => err
@config.logger.error("Error in API stream: #{err.class}: #{err.message}")
end

break unless running?

time_after = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)

backoff = if time_after - time_before > @backoff_reset
@min_backoff
else
[backoff * 2, @max_backoff].min
end

jitter = rand * backoff / 2

@config.logger.debug("API stream reconnecting in %d seconds" % (backoff + jitter).ceil)

sleep(backoff + jitter)
end
end

true
end

def stop!
return false unless @running.make_false

@executor.shutdown
@executor.wait_for_termination(@read_timeout)
Comment thread
marksmith marked this conversation as resolved.

true
end

def handle(type, &block)
raise ArgumentError, "block required" unless block

@handlers << proc do |event|
block.call(event) if type === event[:type]
end
end

private def work
Comment thread
marksmith marked this conversation as resolved.
Comment thread
marksmith marked this conversation as resolved.
http = Net::HTTP.new(@host, @port)
http.use_ssl = @use_ssl
http.open_timeout = @open_timeout
http.write_timeout = @write_timeout
http.read_timeout = @read_timeout
http.max_retries = 0

request = Net::HTTP::Get.new("/api/runtime/stream")
request["Authorization"] = @token
request["Accept"] = "text/event-stream"
request["Cache-Control"] = "no-cache"

@config.logger.debug("API stream connecting")
http.start
@config.logger.debug("API stream connected")

begin
http.request(request) do |response|
Comment thread
marksmith marked this conversation as resolved.
case response.code.to_i
when 200
# empty
Comment thread
marksmith marked this conversation as resolved.
when 401, 403
@running.make_false
return nil
else
return nil
end

buffer = +""

response.read_body do |chunk|
return nil unless running?

@config.logger.debug("API stream received chunk of #{chunk.bytesize} bytes")

buffer << chunk

while (index = buffer.index("\n\n"))
Comment thread
marksmith marked this conversation as resolved.
event_str = buffer.slice!(0..index + 1)
buffer = buffer.lstrip

event = {}

begin
event_str.each_line do |line|
case line
when /^event:\s*(.+)/
event[:type] = $1.strip
when /^data:\s*(.+)/
event[:data] = JSON.parse($1.strip)
end
end
rescue => err
@config.logger.error("Error in API stream: #{err.class}: #{err.message}")
next
end

@handlers.each do |handler|
handler.call(event)
rescue => err
@config.logger.error("Error in API stream: #{err.class}: #{err.message}")
end
end
end
end
ensure
@config.logger.debug("API stream disconnecting")
http.finish
@config.logger.debug("API stream disconnected")
end
end
end
end
Loading
Loading