Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions docs/track.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Tracking events

`Aikido::Zen.track_user_event` lets you record things happening in your app — like failed logins, signups, or password resets. Zen sends these to Aikido so patterns can be detected, like someone failing to log in 50 times in a minute.

```ruby
# app/controllers/application_controller.rb
class ApplicationController < ActionController::Base
private

def authenticate_user!
# Your authentication logic here
# ...

unless current_user
Aikido::Zen.track_user_event("user.login_failed")
return
end

Aikido::Zen.set_user(
id: current_user.id,
name: current_user.name
)

Aikido::Zen.track_user_event("user.login_succeeded")
end
end
```

Zen automatically picks up the IP address, user agent, and current user (if you called [`setUser`](./user.md)) from the request — you don't need to pass those yourself.

## More examples

```ruby
Aikido::Zen.track_user_event("user.signed_up")
Aikido::Zen.track_user_event("user.password_reset_requested")
Aikido::Zen.track_user_event("plan.invite_sent")
Aikido::Zen.track_user_event("payment.failed")
```

## Naming events

Use lowercase with dots to group related events:

- `user.login_failed`
- `user.login_succeeded`
- `user.signed_up`
- `user.password_reset_requested`
- `payment.failed`
- `plan.invite_sent`

## Things to know

`Aikido::Zen.track_user_event` only works inside an HTTP request. If you call it in a background job or a script, nothing gets sent and you'll see a warning in the console.

If you haven't called `Aikido::Zen.set_user` yet, the event still goes through — it just won't have a user ID attached.
20 changes: 20 additions & 0 deletions lib/aikido/zen.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require_relative "zen/worker"
require_relative "zen/agent"
require_relative "zen/api_client"
require_relative "zen/api_stream"
require_relative "zen/context"
require_relative "zen/current_context"
require_relative "zen/detached_agent"
Expand Down Expand Up @@ -215,6 +216,25 @@ class << self
alias_method :set_user, :track_user
end

# Track user event with name
#
# @param name [String]
# @return [void]
def self.track_user_event(name)
context = current_context
return unless context

request = context.request

event = Aikido::Zen::UserEvent.new(
name: name,
user_id: request.actor&.id,
ip_address: request.client_ip
)

agent.send_user_event(event)
end

# @return [Aikido::Zen::AttackWave::Detector] the attack wave detector.
def self.attack_wave_detector
@attack_wave_detector ||= AttackWave::Detector.new
Expand Down
113 changes: 101 additions & 12 deletions lib/aikido/zen/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ def initialize(
collector: Aikido::Zen.collector,
detached_agent: Aikido::Zen.detached_agent,
worker: Aikido::Zen::Worker.new(config: config),
api_client: Aikido::Zen::APIClient.new(config: config)
api_client: Aikido::Zen::APIClient.new(config: config),
api_stream: Aikido::Zen::APIStream.new(config: config)
)
@started_at = nil

@config = config
@worker = worker
@api_client = api_client
@collector = collector
@detached_agent = detached_agent
@worker = worker
@api_client = api_client
@api_stream = api_stream

@started_at = nil
end

def started?
Expand Down Expand Up @@ -59,7 +61,7 @@ def start!
at_exit { stop! if started? }

report(Events::Started.new(time: @started_at)) do |response|
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response)
if update_settings_from_runtime_config!(response)
updated_settings!
@config.logger.info("Updated runtime settings")
end
Expand All @@ -68,7 +70,7 @@ def start!
end

begin
Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list")
rescue => err
@config.logger.error(err.message)
Expand All @@ -82,6 +84,13 @@ def start!
@config.logger.info("Executed initial heartbeat after #{heartbeat_delay} seconds")
end
end

if @api_stream.can_connect?
@api_stream.handle("config-updated") { |event| settings_updated(event) }
@api_stream.start!
else
@config.logger.warn("Can't reach #{Aikido::Zen.config.realtime_endpoint}, make sure it's in your outbound firewall allowlist. Realtime config updates won't be available, switched to polling.")
end
end

# Clean up any ongoing threads, and reset the state. Called automatically
Expand All @@ -92,6 +101,8 @@ def stop!
@config.logger.info("Stopping Aikido agent")
@started_at = nil
@worker.shutdown

@api_stream.stop!
end

# Respond to the runtime settings changing after being fetched from the
Expand Down Expand Up @@ -143,6 +154,19 @@ def report(event)
end
end

# @param event [Aikido::Zen::Tracked]
Comment thread
marksmith marked this conversation as resolved.
# @return [void]
def send_user_event(event)
return unless @api_client.can_make_requests?

@worker.perform do
response = @api_client.send_user_event(event)
yield response if response && block_given?
rescue Aikido::Zen::APIError, Aikido::Zen::NetworkError => err
@config.logger.error(err.message)
end
end

# @api private
#
# Atomically flushes all the stats stored by the agent, and sends a
Expand All @@ -157,11 +181,11 @@ def send_heartbeat(at: Time.now.utc)

heartbeat = @collector.flush
report(heartbeat) do |response|
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(response)
if update_settings_from_runtime_config!(response)
updated_settings!
@config.logger.info("Updated runtime settings after heartbeat")

Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after heartbeat")
end
end
Expand All @@ -177,23 +201,88 @@ def send_heartbeat(at: Time.now.utc)
def poll_for_setting_updates
@worker.every(@config.polling_interval) do
if @api_client.should_fetch_settings?
if Aikido::Zen.runtime_settings.update_from_runtime_config_json(@api_client.fetch_runtime_config)
if update_settings_from_runtime_config!(@api_client.fetch_runtime_config)
updated_settings!
@config.logger.info("Updated runtime settings after polling")
end

Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(@api_client.fetch_runtime_firewall_lists)
update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after polling")
end
end
end

private def heartbeats
def settings_updated(event)
updated_at = Time.at(event[:data]["configUpdatedAt"].to_i)

if should_fetch_settings?(updated_at)
if update_settings_from_runtime_config!(@api_client.fetch_runtime_config)
updated_settings!
@config.logger.info("Updated runtime settings after server-side event")

update_settings_from_runtime_firewall_lists!(@api_client.fetch_runtime_firewall_lists)
@config.logger.info("Updated runtime firewall list after server-side event")
end
end
end

private

def should_fetch_settings?(updated_at, last_updated_at = Aikido::Zen.runtime_settings.updated_at)
return false unless @api_client.can_make_requests?
return true if last_updated_at.nil?

updated_at > last_updated_at
end

def heartbeats
@heartbeats ||= Aikido::Zen::Agent::HeartbeatsManager.new(
config: @config,
worker: @worker
)
end

module Updater
# Define a method `method_name` that returns early if the method is running.
#
# @param method_name [Symbol, String] the name of the method to define
# @yield the block to execute
# @yieldparam args [Array] the positional arguments passed to the method
# @yieldparam blk [Proc] the block passed to the method
# @yieldparam kwargs [Hash] the keyword arguments passed to the method
# @yieldreturn [Object] the return value of the method
# @return [void]
def updater(method_name, &block)
raise ArgumentError, "block required" unless block

instance_variable = :"@__updater_#{block.object_id}"
Comment thread
marksmith marked this conversation as resolved.

define_method(method_name) do |*args, **kwargs|
updating = instance_variable_get(instance_variable) ||
instance_variable_set(instance_variable, Concurrent::AtomicBoolean.new)

return unless updating.make_true
begin
instance_exec(*args, **kwargs, &block)
ensure
updating.make_false
end
end
end
end
extend Updater

# @param data [Hash]
# @return [Boolean, nil]
updater :update_settings_from_runtime_config! do |data|
Aikido::Zen.runtime_settings.update_from_runtime_config_json(data)
end

# @param data [Hash]
# @return [Boolean, nil]
updater :update_settings_from_runtime_firewall_lists! do |data|
Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(data)
end
end
end

Expand Down
28 changes: 27 additions & 1 deletion lib/aikido/zen/api_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def should_fetch_settings?(last_updated_at = Aikido::Zen.runtime_settings.update
base_url: @config.realtime_endpoint
)

new_updated_at = Time.at(response["configUpdatedAt"].to_i / 1000)
new_updated_at = Time.at(response["configUpdatedAt"].to_i)
new_updated_at > last_updated_at
end

Expand Down Expand Up @@ -111,6 +111,30 @@ def report(event)
raise
end

def send_user_event(event)
Comment thread
marksmith marked this conversation as resolved.
event_type = "user_event"

if @rate_limiter.throttle?(event_type)
@config.logger.error("Not reporting #{event_type.upcase} event due to rate limiting")
return
end

@config.logger.debug("Reporting #{event_type.upcase} event")

req = Net::HTTP::Post.new("/api/runtime/events", default_headers)
req.content_type = "application/json"
req.body = if event.respond_to?(:as_json)
@config.json_encoder.call(event.as_json)
else
@config.json_encoder.call(event)
end

request(req, base_url: @config.realtime_endpoint)
rescue Aikido::Zen::RateLimitedError
@rate_limiter.open!
raise
end

# Perform an HTTP request against one of our API endpoints, and process the
# response.
#
Expand All @@ -127,6 +151,8 @@ def report(event)
response = http.request(request)

case response
when Net::HTTPNoContent
# empty
Comment thread
marksmith marked this conversation as resolved.
when Net::HTTPSuccess
begin
body = decode(response.body, response["Content-Encoding"])
Expand Down
Loading
Loading