Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ spec/test_app/config/credentials/
# rspec failure tracking
.rspec_status
.byebug_history
/vendor/bundle/
87 changes: 30 additions & 57 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ end

- **`child_queue_name`**: The Active Job queue for the `DripperChild` batch job. Defaults to `ENV["DATA_DRIP_CHILD_QUEUE"]` if set, otherwise `:data_drip_child`. This is separate from `queue_name` so you can route parent and child jobs to different queues for priority or resource management.

- **`hooks_handler_class_name`**: The name of the class that handles lifecycle hooks for backfill runs and batches. When configured, this class will receive callbacks when backfills change status (e.g., `on_run_completed`, `on_batch_failed`). This is useful for sending notifications, tracking metrics, or integrating with external systems. See the [Hooks](#hooks) section for more details.
- **`hooks_handler_class_name`**: The name of the class that handles lifecycle hooks for backfill runs and batches. When configured, this class will receive callbacks when backfills change status (e.g., `after_run_completed`, `after_batch_failed`). This is useful for sending notifications, tracking metrics, or integrating with external systems. See the [Hooks](#hooks) section for more details.

This configuration is particularly useful when your application uses custom authentication systems, non-standard naming conventions, or when you need DataDrip to integrate with existing API controllers or admin interfaces.

## Hooks

DataDrip provides a powerful hooks system that allows you to respond to lifecycle events during backfill execution. Hooks are triggered when backfill runs or batches change status, enabling you to integrate with external systems, send notifications, track metrics, or perform any custom logic.
DataDrip provides a powerful hooks system that allows you to respond to lifecycle events during backfill execution. Hooks are tied to status transitions and run around the action that changes the status, enabling you to integrate with external systems, send notifications, track metrics, or perform any custom logic.

### Setting Up a Global Hook Handler

Expand All @@ -128,63 +128,40 @@ Then create your hook handler class:
```ruby
# app/services/hook_handler.rb
class HookHandler
# Run hooks - triggered when a BackfillRun changes status
def self.on_run_pending(run)
# Called when a run is created
# Run hooks - triggered when a BackfillRun changes status or action starts
def self.before_run_enqueued(run)
# Called before the run is enqueued
end

def self.on_run_enqueued(run)
def self.around_run_enqueued(run)
# Called around the enqueue action
yield
end

def self.after_run_enqueued(run)
# Called when a run is enqueued for execution
# Example: Track metrics
Metrics.increment('backfill.enqueued', tags: { backfill: run.backfill_class_name })
end

def self.on_run_running(run)
# Called when a run starts processing
end

def self.on_run_completed(run)
def self.after_run_completed(run)
# Called when a run completes successfully
# Example: Send notification
SlackNotifier.notify("Backfill #{run.backfill_class_name} completed!")
end

def self.on_run_failed(run)
def self.after_run_failed(run)
# Called when a run fails
# Example: Send error alert
ErrorTracker.notify("Backfill failed: #{run.error_message}")
end

def self.on_run_stopped(run)
# Called when a run is manually stopped
end

# Batch hooks - triggered when a BackfillRunBatch changes status
def self.on_batch_pending(batch)
# Called when a batch is created
end

def self.on_batch_enqueued(batch)
# Called when a batch is enqueued
end

def self.on_batch_running(batch)
# Called when a batch starts processing
end

def self.on_batch_completed(batch)
def self.after_batch_completed(batch)
# Called when a batch completes
# Example: Update progress tracking
ProgressTracker.update(batch.backfill_run_id, batch.id)
end

def self.on_batch_failed(batch)
# Called when a batch fails
end

def self.on_batch_stopped(batch)
# Called when a batch is stopped
end
end
```

Expand All @@ -196,29 +173,25 @@ DataDrip provides hooks for both backfill runs (entire backfill execution) and b

#### Run Hooks

These hooks receive a `BackfillRun` object as a parameter:
These hooks receive a `BackfillRun` object as a parameter. For every status, you can define any or all of the following:

| Hook | Triggered When |
| ------------------ | --------------------------------- |
| `on_run_pending` | Run is created |
| `on_run_enqueued` | Run is enqueued for execution |
| `on_run_running` | Run starts processing batches |
| `on_run_completed` | All batches complete successfully |
| `on_run_failed` | Run encounters an error |
| `on_run_stopped` | Run is manually stopped |
- `before_run_<status>` (runs first)
- `around_run_<status>` (wraps the action and must `yield`)
- `after_run_<status>` (runs last)

Valid statuses: `pending`, `enqueued`, `running`, `completed`, `failed`, `stopped`.
Hooks always wrap the action that performs the status transition. The `around_*` hook wraps the transition itself, and the `after_*` hook runs after the status update and after the `around_*` hook completes.

#### Batch Hooks

These hooks receive a `BackfillRunBatch` object as a parameter:
These hooks receive a `BackfillRunBatch` object as a parameter. For every status, you can define any or all of the following:

- `before_batch_<status>` (runs first)
- `around_batch_<status>` (wraps the action and must `yield`)
- `after_batch_<status>` (runs last)

| Hook | Triggered When |
| -------------------- | -------------------------------- |
| `on_batch_pending` | Batch is created |
| `on_batch_enqueued` | Batch is enqueued for processing |
| `on_batch_running` | Batch starts processing records |
| `on_batch_completed` | Batch completes successfully |
| `on_batch_failed` | Batch encounters an error |
| `on_batch_stopped` | Batch is stopped |
Valid statuses: `pending`, `enqueued`, `running`, `completed`, `failed`, `stopped`.
Hooks always wrap the action that performs the status transition. The `around_*` hook wraps the transition itself, and the `after_*` hook runs after the status update and after the `around_*` hook completes.
Comment on lines 113 to +194
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation states that hooks “always wrap the action that performs the status transition”, but the implementation also runs hooks from an after_commit callback (where “before” is no longer before, and around_* may wrap a no-op). Please clarify in the README when hooks truly wrap an action (e.g., only when transitions use the provided *_!/enqueue/run! APIs) vs when they run post-commit for out-of-band status updates.

Copilot uses AI. Check for mistakes.

### Per-Backfill Hooks

Expand All @@ -236,11 +209,11 @@ class SendWelcomeEmails < DataDrip::Backfill
end

# Backfill-specific hook
def self.on_run_completed(run)
def self.after_run_completed(run)
AdminMailer.backfill_summary(run).deliver_now
end

def self.on_batch_completed(batch)
def self.after_batch_completed(batch)
# Track progress for this specific backfill
Rails.logger.info("Sent #{batch.batch_size} welcome emails")
end
Expand Down
82 changes: 42 additions & 40 deletions app/jobs/data_drip/dripper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,51 @@ class Dripper < DataDrip.base_job_class.safe_constantize
queue_as { DataDrip.queue_name }

def perform(backfill_run)
backfill_run.running!

new_backfill =
backfill_run.backfill_class.new(
batch_size: backfill_run.batch_size,
sleep_time: 5,
backfill_options: backfill_run.options || {}
)
scope = new_backfill.scope

if backfill_run.amount_of_elements.present? &&
backfill_run.amount_of_elements.positive?
scope = scope.limit(backfill_run.amount_of_elements)
end

batch_ids =
scope
.find_in_batches(batch_size: backfill_run.batch_size)
.map do |batch|
{
finish_id: batch.last.id,
start_id: batch.first.id,
actual_size: batch.size
}
end
backfill_run.with_run_hooks(:running) do
backfill_run.running!

new_backfill =
backfill_run.backfill_class.new(
batch_size: backfill_run.batch_size,
sleep_time: 5,
backfill_options: backfill_run.options || {}
)
scope = new_backfill.scope

backfill_run.update!(total_count: scope.count)
if backfill_run.amount_of_elements.present? &&
backfill_run.amount_of_elements.positive?
scope = scope.limit(backfill_run.amount_of_elements)
end

if backfill_run.amount_of_elements.present? &&
backfill_run.amount_of_elements < backfill_run.batch_size
backfill_run.batch_size = backfill_run.amount_of_elements
backfill_run.save!
end
batch_ids =
scope
.find_in_batches(batch_size: backfill_run.batch_size)
.map do |batch|
{
finish_id: batch.last.id,
start_id: batch.first.id,
actual_size: batch.size
}
end

backfill_run.update!(total_count: scope.count)

if backfill_run.amount_of_elements.present? &&
backfill_run.amount_of_elements < backfill_run.batch_size
backfill_run.batch_size = backfill_run.amount_of_elements
backfill_run.save!
end

BackfillRun.transaction do
batch_ids.each do |batch|
BackfillRunBatch.create!(
backfill_run: backfill_run,
status: :pending,
batch_size: batch[:actual_size],
start_id: batch[:start_id],
finish_id: batch[:finish_id]
)
BackfillRun.transaction do
batch_ids.each do |batch|
BackfillRunBatch.create!(
backfill_run: backfill_run,
status: :pending,
batch_size: batch[:actual_size],
start_id: batch[:start_id],
finish_id: batch[:finish_id]
)
end
end
end
rescue StandardError => e
Expand Down
61 changes: 61 additions & 0 deletions app/models/concerns/data_drip/hookable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

module DataDrip
module Hookable
private

def run_status_change_hooks
return unless status_previously_changed?

if @__hooks_ran_for_status_change
@__hooks_ran_for_status_change = false
return
end

run_hook(:before, status)
run_around_hook(status) { }
run_hook(:after, status)
end

def with_action_hooks(status_value)
return yield if @__hooks_in_action

@__hooks_in_action = true
run_hook(:before, status_value)
run_around_hook(status_value) do
@__hooks_ran_for_status_change = true
yield
end
run_hook(:after, status_value)
Comment thread
oriolgual marked this conversation as resolved.
rescue
@__hooks_ran_for_status_change = false if defined?(@__hooks_ran_for_status_change) && @__hooks_ran_for_status_change
raise
ensure
@__hooks_in_action = false
end

def run_hook(timing, status_value)
hook_name = "#{timing}_#{hook_prefix}_#{status_value}"
hook_target_for(hook_name)&.public_send(hook_name, self)
end

def run_around_hook(status_value)
hook_name = "around_#{hook_prefix}_#{status_value}"
hook_target = hook_target_for(hook_name)

if hook_target
hook_target.public_send(hook_name, self) { yield }
else
yield
end
end

def hook_prefix
raise NotImplementedError
end

def hook_target_for(_hook_name)
raise NotImplementedError
end
end
end
Loading
Loading