diff --git a/app/jobs/runtime/events_cleanup.rb b/app/jobs/runtime/events_cleanup.rb
index a1841576395..873c6a7f319 100644
--- a/app/jobs/runtime/events_cleanup.rb
+++ b/app/jobs/runtime/events_cleanup.rb
@@ -9,7 +9,7 @@ def initialize(cutoff_age_in_days)
end
def perform
- Database::OldRecordCleanup.new(Event, cutoff_age_in_days).delete
+ Database::OldRecordCleanup.new(Event, cutoff_age_in_days:).delete
end
def job_name_in_configuration
diff --git a/app/models/runtime/app_usage_event.rb b/app/models/runtime/app_usage_event.rb
index 32913f8cdda..e8c28ba7e74 100644
--- a/app/models/runtime/app_usage_event.rb
+++ b/app/models/runtime/app_usage_event.rb
@@ -9,6 +9,23 @@ class AppUsageEvent < Sequel::Model
:buildpack_guid, :buildpack_name,
:package_state, :previous_package_state, :parent_app_guid,
:parent_app_name, :process_type, :task_name, :task_guid
+
+ def self.usage_lifecycles
+ [
+ {
+ beginning_states: [ProcessModel::STARTED, Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE],
+ ending_state: ProcessModel::STOPPED,
+ guid_column: :app_guid
+ },
+ {
+ beginning_states: [Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE,
+ Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE],
+ ending_state: Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE,
+ guid_column: :task_guid
+ }
+ ].freeze
+ end
+
AppUsageEvent.dataset_module do
def supports_window_functions?
false
diff --git a/app/models/runtime/task_model.rb b/app/models/runtime/task_model.rb
index bad22c00eef..fb25a5fe593 100644
--- a/app/models/runtime/task_model.rb
+++ b/app/models/runtime/task_model.rb
@@ -42,7 +42,7 @@ def after_update
def after_destroy
super
- create_stop_event unless terminal_state?
+ create_stop_event_if_needed unless terminal_state?
end
def run_action_user
@@ -137,9 +137,25 @@ def create_start_event
def create_stop_event_if_needed
app_usage_repo = Repositories::AppUsageEventRepository.new
- start_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED')
- existing_stop_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED')
- return if start_event.nil? || existing_stop_event.present?
+ return if app_usage_repo.find_by_task_and_state(task: self, state: Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE).present?
+
+ # Record the stop only when there is recorded evidence that the task
+ # started: the TASK_STARTED event, or the TASK_WAS_RUNNING baseline seeded
+ # for tasks that were already running when the keep-running cleanup was
+ # introduced. Without either, no consumer ever saw the task start, so a
+ # stop event would be unmatched noise.
+ #
+ # NOTE: on MySQL (default REPEATABLE READ) these must be the first reads
+ # in the surrounding transaction. MySQL freezes what a transaction can
+ # see at its first read; if an earlier hook ran a query first, a baseline
+ # committed in the meantime would be invisible here, and the stop would
+ # be wrongly skipped.
+ start_evidence_states = [
+ Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE,
+ Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE
+ ]
+ started = app_usage_repo.find_by_task_and_state(task: self, state: start_evidence_states)
+ return if started.nil?
create_stop_event
end
diff --git a/app/models/services/service_usage_event.rb b/app/models/services/service_usage_event.rb
index f5e694169f9..c09557870d1 100644
--- a/app/models/services/service_usage_event.rb
+++ b/app/models/services/service_usage_event.rb
@@ -7,5 +7,17 @@ class ServiceUsageEvent < Sequel::Model
:service_plan_guid, :service_plan_name,
:service_guid, :service_label,
:service_broker_name, :service_broker_guid
+
+ def self.usage_lifecycles
+ [
+ {
+ beginning_states: [Repositories::ServiceUsageEventRepository::CREATED_EVENT_STATE,
+ Repositories::ServiceUsageEventRepository::UPDATED_EVENT_STATE,
+ Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE],
+ ending_state: Repositories::ServiceUsageEventRepository::DELETED_EVENT_STATE,
+ guid_column: :service_instance_guid
+ }
+ ].freeze
+ end
end
end
diff --git a/app/repositories/app_usage_event_repository.rb b/app/repositories/app_usage_event_repository.rb
index 51d751a8cd1..482fc4cf5ad 100644
--- a/app/repositories/app_usage_event_repository.rb
+++ b/app/repositories/app_usage_event_repository.rb
@@ -4,6 +4,18 @@
module VCAP::CloudController
module Repositories
class AppUsageEventRepository
+ WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze
+ TASK_STARTED_EVENT_STATE = 'TASK_STARTED'.freeze
+ TASK_STOPPED_EVENT_STATE = 'TASK_STOPPED'.freeze
+ # Task baselines get their own state (rather than reusing WAS_RUNNING)
+ # because task events share the app_usage_events table with app events but
+ # carry an empty app_guid. If task baselines said WAS_RUNNING, the cleanup
+ # and the backfill's repair would both treat every task baseline as
+ # belonging to a single app whose guid is '' -- the cleanup would wrongly
+ # prune them, and the repair would write bogus STOPPED events for that
+ # phantom app.
+ TASK_WAS_RUNNING_EVENT_STATE = 'TASK_WAS_RUNNING'.freeze
+
def find(guid)
AppUsageEvent.find(guid:)
end
@@ -152,7 +164,7 @@ def purge_and_reseed_started_apps!
end
def delete_events_older_than(cutoff_age_in_days)
- Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete
+ Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete
end
private
diff --git a/app/repositories/service_usage_event_repository.rb b/app/repositories/service_usage_event_repository.rb
index 35fbef5a1f0..baad88a7772 100644
--- a/app/repositories/service_usage_event_repository.rb
+++ b/app/repositories/service_usage_event_repository.rb
@@ -7,6 +7,7 @@ class ServiceUsageEventRepository
DELETED_EVENT_STATE = 'DELETED'.freeze
CREATED_EVENT_STATE = 'CREATED'.freeze
UPDATED_EVENT_STATE = 'UPDATED'.freeze
+ WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze
def find(guid)
ServiceUsageEvent.find(guid:)
@@ -92,7 +93,7 @@ def purge_and_reseed_service_instances!
end
def delete_events_older_than(cutoff_age_in_days)
- Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete
+ Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete
end
end
end
diff --git a/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb
new file mode 100644
index 00000000000..728394f11c0
--- /dev/null
+++ b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb
@@ -0,0 +1,68 @@
+Sequel.migration do
+ no_transaction # to use the 'concurrently' option
+
+ up do
+ if database_type == :postgres
+ VCAP::Migration.with_concurrent_timeout(self) do
+ add_index :app_usage_events, %i[state app_guid id],
+ name: :app_usage_events_lifecycle_index,
+ if_not_exists: true,
+ concurrently: true
+
+ add_index :service_usage_events, %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index,
+ if_not_exists: true,
+ concurrently: true
+ end
+
+ elsif database_type == :mysql
+ alter_table :app_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ add_index %i[state app_guid id], name: :app_usage_events_lifecycle_index unless @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index)
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+
+ alter_table :service_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ unless @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index)
+ add_index %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index
+ end
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+ end
+ end
+
+ down do
+ if database_type == :postgres
+ VCAP::Migration.with_concurrent_timeout(self) do
+ drop_index :app_usage_events, %i[state app_guid id],
+ name: :app_usage_events_lifecycle_index,
+ if_exists: true,
+ concurrently: true
+
+ drop_index :service_usage_events, %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index,
+ if_exists: true,
+ concurrently: true
+ end
+ end
+
+ if database_type == :mysql
+ alter_table :app_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ drop_index %i[state app_guid id], name: :app_usage_events_lifecycle_index if @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index)
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+
+ alter_table :service_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ if @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index)
+ drop_index %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index
+ end
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+ end
+ end
+end
diff --git a/db/migrations/20260601120100_seed_was_running_app_usage_events.rb b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb
new file mode 100644
index 00000000000..f22782218a5
--- /dev/null
+++ b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb
@@ -0,0 +1,23 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'app')
+ else
+ VCAP::WasRunningBackfill.seed_app_usage_events(self, logger)
+ end
+ end
+
+ down do
+ # Deliberately a no-op. Consumers may already have read the seeded rows,
+ # and deleting a row cannot make a consumer un-read it -- it would only
+ # leave any later STOPPED events without a start event to pair with.
+ # Leaving the rows is safe: re-running the migration or the
+ # 'db:was_running_backfill' rake task skips resources that already have a
+ # baseline.
+ end
+end
diff --git a/db/migrations/20260601120200_seed_was_running_service_usage_events.rb b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb
new file mode 100644
index 00000000000..353dd638883
--- /dev/null
+++ b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb
@@ -0,0 +1,23 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'service')
+ else
+ VCAP::WasRunningBackfill.seed_service_usage_events(self, logger)
+ end
+ end
+
+ down do
+ # Deliberately a no-op. Consumers may already have read the seeded rows,
+ # and deleting a row cannot make a consumer un-read it -- it would only
+ # leave any later DELETED events without a start event to pair with.
+ # Leaving the rows is safe: re-running the migration or the
+ # 'db:was_running_backfill' rake task skips instances that already have a
+ # baseline.
+ end
+end
diff --git a/db/migrations/20260601120300_seed_was_running_task_usage_events.rb b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb
new file mode 100644
index 00000000000..0d04e8845f1
--- /dev/null
+++ b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb
@@ -0,0 +1,26 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'task')
+ else
+ VCAP::WasRunningBackfill.seed_task_usage_events(self, logger)
+ end
+ end
+
+ down do
+ # Deliberately a no-op. Consumers may already have read the seeded rows,
+ # and deleting a row cannot make a consumer un-read it -- it would only
+ # leave any later TASK_STOPPED events without a start event to pair with.
+ # Worse: a task's stop event is only written when the task has recorded
+ # start evidence, and these rows ARE that evidence for tasks whose
+ # TASK_STARTED the cleanup already deleted. Remove them and those tasks'
+ # eventual stops are silently swallowed. Leaving the rows is safe:
+ # re-running the migration or the 'db:was_running_backfill' rake task
+ # skips tasks that already have a baseline.
+ end
+end
diff --git a/docs/v2/app_usage_events/list_all_app_usage_events.html b/docs/v2/app_usage_events/list_all_app_usage_events.html
index 471dd0e8586..583bce89d12 100644
--- a/docs/v2/app_usage_events/list_all_app_usage_events.html
+++ b/docs/v2/app_usage_events/list_all_app_usage_events.html
@@ -631,9 +631,11 @@
Body
- STARTED
- STOPPED
+ - WAS_RUNNING
- BUILDPACK_SET
- TASK_STARTED
- TASK_STOPPED
+ - TASK_WAS_RUNNING
diff --git a/docs/v2/service_usage_events/list_service_usage_events.html b/docs/v2/service_usage_events/list_service_usage_events.html
index 501999684c8..5f1b68b393a 100644
--- a/docs/v2/service_usage_events/list_service_usage_events.html
+++ b/docs/v2/service_usage_events/list_service_usage_events.html
@@ -290,6 +290,7 @@ Body
CREATED
DELETED
UPDATED
+ WAS_RUNNING
|
diff --git a/docs/v3/source/includes/resources/app_usage_events/_delete.md.erb b/docs/v3/source/includes/resources/app_usage_events/_delete.md.erb
index 5c87b2545ec..54814d0a089 100644
--- a/docs/v3/source/includes/resources/app_usage_events/_delete.md.erb
+++ b/docs/v3/source/includes/resources/app_usage_events/_delete.md.erb
@@ -21,6 +21,8 @@ Content-Type: application/json
Destroys all existing events. Populates new usage events, one for each started app. All populated events will have a `created_at` value of current time. There is the potential race condition if apps are currently being started, stopped, or scaled. The seeded usage events will have the same guid as the app.
+**Note:** The reseed writes `STARTED` events for app processes only. It does not restore the start evidence (`TASK_STARTED`/`TASK_WAS_RUNNING`) of currently running tasks, and a `TASK_STOPPED` event is emitted only for a task that has recorded start evidence. After a purge, operators should run `rake db:was_running_backfill` on a Cloud Controller VM to reseed baselines for running tasks; otherwise the eventual stop events of those tasks are silently suppressed.
+
#### Definition
`POST /v3/app_usage_events/actions/destructively_purge_all_and_reseed`
diff --git a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
index 36afafaf3d1..d0c72e35754 100644
--- a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
+++ b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
@@ -30,3 +30,30 @@ Name | Type | Description
**instance_count.current** | _integer_ or `null` | Current instance count of the app that this event pertains to, if applicable
**instance_count.previous** | _integer_ or `null` | Previous instance count of the app that this event pertains to, if applicable
**links** | [_links object_](#links) | Links to related resources
+
+#### WAS_RUNNING and TASK_WAS_RUNNING events
+
+`WAS_RUNNING` and `TASK_WAS_RUNNING` are synthetic values for `state.current` recorded once per running process (`WAS_RUNNING`) and once per running task (`TASK_WAS_RUNNING`) by a one-time data migration when the keep-running cleanup feature was introduced. They mark every process and task that was already running at the time of the upgrade so that billing consumers can bootstrap from a complete baseline even if the original `STARTED`/`TASK_STARTED` events have been pruned.
+
+**Consumer interpretation.** The rules below are written for app processes. For task events, which are keyed by `task.guid`, substitute `TASK_WAS_RUNNING` for `WAS_RUNNING` and `TASK_STARTED` for `STARTED`:
+
+* If you have not previously recorded a `STARTED` event for this resource, treat `WAS_RUNNING` as equivalent to `STARTED`.
+* If you have already recorded `STARTED` (or an earlier `WAS_RUNNING`) for this resource, treat the `WAS_RUNNING` event as a redundant baseline confirmation and ignore it.
+* `created_at` reflects when the backfill migration ran, **not** when the app or task actually started. Treat `WAS_RUNNING` as a baseline marker that the resource was already running as of that timestamp, not as the true start of the running interval.
+* `state.previous` on a `WAS_RUNNING` event is always `null`. Subsequent real events for the same resource will continue to report their actual prior process state in `state.previous` (typically `STARTED`). If you perform chain validation, treat `WAS_RUNNING` as equivalent to `STARTED` for the purpose of validating the next event's `state.previous`.
+
+#### Repaired ending events
+
+The backfill (and any later run of its recovery task) repairs baselines that turn out to be unpaired: if a `WAS_RUNNING`/`TASK_WAS_RUNNING` event was recorded for a resource that is no longer running and no later ending event exists for it — for example because the resource stopped while the backfill was still in progress — the missing ending event (`STOPPED`/`TASK_STOPPED`) is appended. Baselines are never deleted. A repaired ending event:
+
+* carries a `created_at` of when the repair ran, **not** when the resource actually stopped — the interval it closes may overstate the true run by that gap;
+* copies the footprint (`instance_count`, `memory_in_mb_per_instance`) of the baseline it pairs;
+* reports the baseline's state (`WAS_RUNNING`/`TASK_WAS_RUNNING`) in `state.previous`, which normal ending events never carry — use this to tell repaired endings apart.
+
+#### What a consumer must do
+
+Independent of the backfill, the events stream asks three things of any consumer that pairs beginnings with endings:
+
+1. Ignore a `WAS_RUNNING`/`TASK_WAS_RUNNING` event for a resource you already track (see above).
+2. Tolerate duplicate ending events for the same resource: close the interval on the first ending after a beginning and ignore further endings until the next beginning.
+3. Treat an ending event with no visible beginning for that resource as noise.
diff --git a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
index f904e1ff083..beeaed2b546 100644
--- a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
+++ b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
@@ -26,3 +26,25 @@ Name | Type | Description
**service_broker.guid** | _string_ or `null` | Unique identifier of the service broker that this event pertains to, if applicable
**service_broker.name** | _string_ or `null` | Name of the service broker that this event pertains to, if applicable
**links** | [_links object_](#links) | Links to related resources
+
+#### WAS_RUNNING events
+
+`WAS_RUNNING` is a synthetic value for `state` recorded once per existing service instance by a one-time data migration when the keep-running cleanup feature was introduced. It marks every service instance that existed at the time of the upgrade so that billing consumers can bootstrap from a complete baseline of service instances even if the original `CREATED` events have been pruned.
+
+**Consumer interpretation:**
+
+* If you have not previously recorded a `CREATED` event for this service instance, treat `WAS_RUNNING` as equivalent to `CREATED`.
+* If you have already recorded `CREATED` (or an earlier `WAS_RUNNING`) for this instance, treat the `WAS_RUNNING` event as a redundant baseline confirmation and ignore it.
+* `created_at` reflects when the backfill migration ran, **not** when the service instance was created. Treat `WAS_RUNNING` as a baseline marker that the instance already existed as of that timestamp.
+
+#### Repaired ending events
+
+The backfill (and any later run of its recovery task) repairs baselines that turn out to be unpaired: if a `WAS_RUNNING` event was recorded for a service instance that no longer exists and no later `DELETED` event exists for it — for example because the instance was deleted while the backfill was still in progress — the missing `DELETED` event is appended, copying the baseline's instance, plan, and broker attributes. Baselines are never deleted. A repaired `DELETED` event carries a `created_at` of when the repair ran, **not** when the instance was actually deleted — the interval it closes may overstate the instance's true lifetime by that gap.
+
+#### What a consumer must do
+
+Independent of the backfill, the events stream asks three things of any consumer that pairs beginnings with endings:
+
+1. Ignore a `WAS_RUNNING` event for a service instance you already track (see above).
+2. Tolerate duplicate `DELETED` events for the same instance: close the interval on the first one and ignore the rest.
+3. Treat a `DELETED` event with no visible `CREATED`/`UPDATED`/`WAS_RUNNING` for that instance as noise.
diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb
index ac952146dc3..7c9b685478c 100644
--- a/lib/cloud_controller/config_schemas/api_schema.rb
+++ b/lib/cloud_controller/config_schemas/api_schema.rb
@@ -109,6 +109,7 @@ class ApiSchema < VCAP::Config
optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer,
optional(:migration_psql_worker_memory_kb) => Integer,
optional(:skip_bigint_id_migration) => bool,
+ optional(:skip_was_running_backfill) => bool,
db: {
optional(:database) => Hash, # db connection hash for sequel
max_connections: Integer, # max connections in the connection pool
diff --git a/lib/cloud_controller/config_schemas/migrate_schema.rb b/lib/cloud_controller/config_schemas/migrate_schema.rb
index 971ff0999aa..eff4f6e2d14 100644
--- a/lib/cloud_controller/config_schemas/migrate_schema.rb
+++ b/lib/cloud_controller/config_schemas/migrate_schema.rb
@@ -10,6 +10,7 @@ class MigrateSchema < VCAP::Config
optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer,
optional(:migration_psql_worker_memory_kb) => Integer,
optional(:skip_bigint_id_migration) => bool,
+ optional(:skip_was_running_backfill) => bool,
db: {
optional(:database) => Hash, # db connection hash for sequel
diff --git a/lib/database/batch_delete.rb b/lib/database/batch_delete.rb
index 9fcc4b58fea..377eff097c9 100644
--- a/lib/database/batch_delete.rb
+++ b/lib/database/batch_delete.rb
@@ -11,10 +11,12 @@ def delete
total_count = 0
loop do
- set = dataset.limit(amount)
- break if set.empty?
+ # Fetch the batch's ids in the same query that checks for emptiness, so the
+ # (potentially expensive) filtered dataset is evaluated once per batch.
+ ids = dataset.limit(amount).select_map(:id)
+ break if ids.empty?
- total_count += delete_batch(set)
+ total_count += delete_batch(ids)
end
total_count
@@ -22,8 +24,8 @@ def delete
private
- def delete_batch(set)
- dataset.model.where(id: set.select_map(:id)).delete
+ def delete_batch(ids)
+ dataset.model.where(id: ids).delete
end
end
end
diff --git a/lib/database/old_record_cleanup.rb b/lib/database/old_record_cleanup.rb
index 44227d84507..e29446af7ea 100644
--- a/lib/database/old_record_cleanup.rb
+++ b/lib/database/old_record_cleanup.rb
@@ -3,25 +3,33 @@
module Database
class OldRecordCleanup
class NoCurrentTimestampError < StandardError; end
- attr_reader :model, :days_ago, :keep_at_least_one_record
+ attr_reader :model, :cutoff_age_in_days, :keep_at_least_one_record, :keep_running_records
- def initialize(model, days_ago, keep_at_least_one_record: false)
+ def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false)
@model = model
- @days_ago = days_ago
+ @cutoff_age_in_days = cutoff_age_in_days
@keep_at_least_one_record = keep_at_least_one_record
+ @keep_running_records = keep_running_records
end
+ # The two options work together: keep_running_records always keeps the
+ # beginning events that show a resource is still running (those with no
+ # later ending event for the same resource), and keep_at_least_one_record
+ # additionally keeps the single newest row, so the table is never fully
+ # emptied for clients that poll the most recent event.
def delete
- cutoff_date = current_timestamp_from_database - days_ago.to_i.days
-
+ cutoff_date = current_timestamp_from_database - cutoff_age_in_days.to_i.days
old_records = model.dataset.where(Sequel.lit('created_at < ?', cutoff_date))
- if keep_at_least_one_record
- last_record = model.order(:id).last
- old_records = old_records.where(Sequel.lit('id < ?', last_record.id)) if last_record
- end
- logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows")
- Database::BatchDelete.new(old_records, 1000).delete
+ if keep_running_records
+ raise ArgumentError.new("keep_running_records requires #{model} to define .usage_lifecycles") unless model.respond_to?(:usage_lifecycles)
+
+ delete_keeping_running_records(old_records)
+ else
+ old_records = exclude_newest_record(old_records)
+ logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows")
+ Database::BatchDelete.new(old_records, 1000).delete
+ end
end
private
@@ -35,5 +43,96 @@ def current_timestamp_from_database
def logger
@logger ||= Steno.logger('cc.old_record_cleanup')
end
+
+ # Deletes old records while retaining a usable billing baseline for
+ # still-running resources.
+ #
+ # For each lifecycle of the model, a beginning-state row (e.g.
+ # STARTED/CREATED/WAS_RUNNING) is prunable when:
+ # * a later ending-state row (e.g. STOPPED/DELETED) is also old -- the run is
+ # over; or
+ # * it is a superseded baseline: an earlier beginning of the same run and a
+ # later beginning both exist (and are old). Consumers only need the first
+ # beginning of the current run (the true start time) and the latest one
+ # (the current size); the rows in between, written each time a running
+ # resource is scaled or updated, tell a consumer nothing it still needs.
+ #
+ # The deletes are ordered deliberately: prunable beginning rows are removed
+ # FIRST, while the rows that make them prunable still exist, so each beginning
+ # stays prunable until it is itself deleted. Only then are the ending rows (and
+ # any other, non-lifecycle states) removed. Reversing the order could strand a
+ # beginning row whose paired ending was deleted in an earlier batch.
+ def delete_keeping_running_records(old_records)
+ lifecycles = model.usage_lifecycles
+ prunable_beginnings = lifecycles.map { |lifecycle| prunable_beginnings_dataset(old_records, lifecycle) }
+
+ # Everything that is not a beginning-state row of some lifecycle (ending rows
+ # plus any other, non-lifecycle states) is unconditionally prunable.
+ all_beginning_states = lifecycles.flat_map { |lifecycle| lifecycle.fetch(:beginning_states) }
+ unconditional_records = exclude_newest_record(old_records.exclude(state: all_beginning_states))
+
+ deleted_count = prunable_beginnings.sum { |dataset| Database::BatchDelete.new(dataset, 1000).delete }
+ deleted_count += Database::BatchDelete.new(unconditional_records, 1000).delete
+
+ logger.info("Cleaned up #{deleted_count} #{model.table_name} table rows")
+ end
+
+ # Builds the dataset of old beginning-state rows that are prunable for one
+ # lifecycle. The subqueries are shaped for the (state, guid, id) lifecycle index
+ # (NOTE(review): it is built on app_guid / service_instance_guid -- confirm task_guid
+ # lookups are covered), and a higher id always means created later. They only look at
+ # OLD rows: a superseded beginning is kept until the row that supersedes it is itself old,
+ # so a consumer reading within the retention window never sees the decision change underneath it.
+ def prunable_beginnings_dataset(old_records, lifecycle)
+ beginning_states = lifecycle.fetch(:beginning_states)
+ ending_state = lifecycle.fetch(:ending_state)
+ guid_column = lifecycle.fetch(:guid_column)
+
+ old_beginnings = old_records.where(state: beginning_states)
+ old_endings = old_records.where(state: ending_state)
+ initial_records = old_beginnings.from_self(alias: :initial_records)
+
+ # The run is over: an ending row for the same resource was created later.
+ matching_ending = old_endings.from_self(alias: :final_records).
+ where(Sequel[:final_records][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:final_records][:id] > Sequel[:initial_records][:id] }.
+ select(1).exists
+
+ # Not the run's true start: an earlier beginning of the same run exists,
+ # i.e. one with no ending event between the two.
+ intervening_ending = old_endings.from_self(alias: :intervening_endings).
+ where(Sequel[:intervening_endings][guid_column] => Sequel[:earlier_beginnings][guid_column]).
+ where { Sequel[:intervening_endings][:id] > Sequel[:earlier_beginnings][:id] }.
+ where { Sequel[:intervening_endings][:id] < Sequel[:initial_records][:id] }.
+ select(1).exists
+ earlier_beginning_in_same_run = old_beginnings.from_self(alias: :earlier_beginnings).
+ where(Sequel[:earlier_beginnings][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:earlier_beginnings][:id] < Sequel[:initial_records][:id] }.
+ where(Sequel.~(intervening_ending)).
+ select(1).exists
+
+ # Not the latest baseline: a later beginning for the same resource exists.
+ later_beginning = old_beginnings.from_self(alias: :later_beginnings).
+ where(Sequel[:later_beginnings][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:later_beginnings][:id] > Sequel[:initial_records][:id] }.
+ select(1).exists
+
+ superseded_baseline = Sequel.&(earlier_beginning_in_same_run, later_beginning)
+ exclude_newest_record(initial_records.where(Sequel.|(matching_ending, superseded_baseline)))
+ end
+
+ # When keep_at_least_one_record is set, never delete the single newest row so
+ # the table always retains at least one record.
+ def exclude_newest_record(records)
+ return records unless keep_at_least_one_record && newest_record_id
+
+ records.where(Sequel.lit('id < ?', newest_record_id))
+ end
+
+ def newest_record_id
+ return @newest_record_id if defined?(@newest_record_id)
+
+ @newest_record_id = model.order(:id).last&.id
+ end
end
end
diff --git a/lib/database/was_running_backfill.rb b/lib/database/was_running_backfill.rb
new file mode 100644
index 00000000000..3d59fb56d4c
--- /dev/null
+++ b/lib/database/was_running_backfill.rb
@@ -0,0 +1,496 @@
+# Backfills synthetic WAS_RUNNING usage events (TASK_WAS_RUNNING for tasks) for
+# resources that were already running/existing when the keep-running cleanup
+# feature was introduced.
+#
+# Runs from thin Sequel migrations, in batches, like VCAP::BigintMigration.
+# Each batch walks the source table by id and commits in its own transaction,
+# so no single statement comes near the migration statement timeout. Running it
+# again is safe: the NOT EXISTS guards skip resources whose start is already on
+# record, and the post-seed repair only adds missing ending events -- it never
+# deletes rows a consumer may already have read (see repair_stale_*). Uses only
+# raw Sequel, no Cloud Controller models or repositories, because at migration
+# time the schema is the contract, not the app code.
+# rubocop:disable Metrics/ModuleLength
+module VCAP::WasRunningBackfill
+ WAS_RUNNING = 'WAS_RUNNING'.freeze
+ # Task baselines use a distinct state because task events share the
+ # app_usage_events table with app events but carry an empty app_guid -- see
+ # Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE.
+ TASK_WAS_RUNNING = 'TASK_WAS_RUNNING'.freeze
+ # The states of normal, API-written usage events, used by the seed guards and
+ # the repair. The strings are deliberately copied from the model/repository
+ # constants rather than referenced, so migration-time code never loads app
+ # classes; a spec asserts the copies stay equal.
+ STARTED = 'STARTED'.freeze
+ STOPPED = 'STOPPED'.freeze
+ TASK_STARTED = 'TASK_STARTED'.freeze
+ TASK_STOPPED = 'TASK_STOPPED'.freeze
+ CREATED = 'CREATED'.freeze
+ UPDATED = 'UPDATED'.freeze
+ DELETED = 'DELETED'.freeze
+ # Task states that count as "still running", for BOTH the seed filters and
+ # the repair predicate. CANCELING is the span between a cancel request and
+ # Diego reporting the task dead: the task is still running and still
+ # billable, no usage event marks the transition, and the task always ends in
+ # FAILED later. Seed and repair must agree on this list. If the seed counted
+ # CANCELING and the repair did not, the repair would write a TASK_STOPPED for
+ # a task that is still running -- and when the task really stopped, TaskModel
+ # would see a stop event already exists and not write the real one.
+ RUNNING_TASK_STATES = %w[RUNNING CANCELING].freeze
+ # Default batch_size of the seed_* methods: the most source rows (each_batch)
+ # or stale baselines (batch_repair) handled in a single transaction.
+ DEFAULT_BATCH_SIZE = 1000
+ # Only one backfill may run at a time. If two ran at once (say an operator's
+ # rake task racing another deploy's seed migrations), both would check "is a
+ # stop event missing?" before either had written one, and both would write it
+ # -- duplicate endings. A session-scoped database advisory lock prevents
+ # that. Postgres advisory locks are keyed by a number (this one is the
+ # timestamp of the first seed migration); MySQL locks are keyed by name.
+ ADVISORY_LOCK_KEY = 20_260_601_120_100
+ ADVISORY_LOCK_NAME = 'cloud_controller.was_running_backfill'.freeze
+
+ class << self
+    # Whether the operator opted out of the backfill through the
+    # skip_was_running_backfill config flag (e.g. very large foundations, or
+    # downstream usage-event consumers not yet ready for the WAS_RUNNING state).
+    # Mirrors skip_bigint_id_migration. The seed migrations consult this flag --
+    # it is not enforced here -- so the 'db:was_running_backfill' rake task can
+    # still seed the baseline after a skipped migration was recorded as applied.
+    # Returns false when no config is loaded or the key is not a valid path.
+    def skip?
+      config = VCAP::CloudController::Config.config
+      return false if config.nil?
+
+      config.get(:skip_was_running_backfill) || false
+    rescue VCAP::CloudController::Config::InvalidConfigPath
+      false
+    end
+
+    # Logs, at info level, that the backfill for the given kind of usage event
+    # was skipped, together with the rake task that seeds the baseline later.
+    def log_skip(logger, kind)
+      reason = "skipping WAS_RUNNING #{kind} usage event backfill (skip_was_running_backfill is set); "
+      remedy = "run 'rake db:was_running_backfill' to seed the baseline later"
+      logger.info(reason + remedy)
+    end
+
+    # Runs the block while holding the backfill's session-scoped advisory lock.
+    # db.synchronize pins the current thread to one pooled connection, so taking
+    # the lock, the block's queries, and releasing the lock all happen on the
+    # same database session. Raises without yielding when the lock cannot be
+    # taken; a lock that was taken is released even if the block raises.
+    def with_advisory_lock(db)
+      db.synchronize do
+        if try_advisory_lock(db)
+          begin
+            yield
+          ensure
+            release_advisory_lock(db)
+          end
+        else
+          raise 'another WAS_RUNNING backfill is already running (advisory lock is held by another session); try again later'
+        end
+      end
+    end
+
+    # Seeds WAS_RUNNING app usage events for STARTED processes, walking the
+    # processes table in id-ordered batches of batch_size, then adds the missing
+    # STOPPED events for baselines that are already stale.
+    def seed_app_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+      guid_sql = uuid_function(db)
+      started_processes = db[:processes].where(state: 'STARTED')
+      each_batch(started_processes, batch_size) do |after_id, last_id|
+        insert_sql = app_usage_events_insert_sql(guid_sql, after_id, last_id)
+        db.run(insert_sql)
+        logger.info("backfilled WAS_RUNNING app usage events up to process id #{last_id}")
+      end
+      repair_stale_app_usage_events(db, logger, batch_size)
+    end
+
+    # Seeds WAS_RUNNING service usage events for existing service instances,
+    # walking the service_instances table in id-ordered batches of batch_size,
+    # then adds the missing DELETED events for baselines that are already stale.
+    def seed_service_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+      guid_sql = uuid_function(db)
+      all_instances = db[:service_instances]
+      each_batch(all_instances, batch_size) do |after_id, last_id|
+        insert_sql = service_usage_events_insert_sql(guid_sql, after_id, last_id)
+        db.run(insert_sql)
+        logger.info("backfilled WAS_RUNNING service usage events up to service_instance id #{last_id}")
+      end
+      repair_stale_service_usage_events(db, logger, batch_size)
+    end
+
+    # Seeds TASK_WAS_RUNNING app usage events for tasks whose state is one of
+    # RUNNING_TASK_STATES, walking the tasks table in id-ordered batches of
+    # batch_size, then adds the missing TASK_STOPPED events for baselines that
+    # are already stale.
+    def seed_task_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+      guid_sql = uuid_function(db)
+      running_tasks = db[:tasks].where(state: RUNNING_TASK_STATES)
+      each_batch(running_tasks, batch_size) do |after_id, last_id|
+        insert_sql = task_usage_events_insert_sql(guid_sql, after_id, last_id)
+        db.run(insert_sql)
+        logger.info("backfilled TASK_WAS_RUNNING app usage events up to task id #{last_id}")
+      end
+      repair_stale_task_usage_events(db, logger, batch_size)
+    end
+
+ private
+
+    # Tries to take the backfill's advisory lock: pg_try_advisory_lock keyed by
+    # ADVISORY_LOCK_KEY on Postgres, GET_LOCK on ADVISORY_LOCK_NAME with a zero
+    # timeout on MySQL. Returns a truthy value when the lock was taken and
+    # raises for any other database type.
+    def try_advisory_lock(db)
+      adapter = db.database_type
+      if adapter == :postgres
+        db.get(Sequel.function(:pg_try_advisory_lock, ADVISORY_LOCK_KEY))
+      elsif adapter == :mysql
+        db.get(Sequel.function(:get_lock, ADVISORY_LOCK_NAME, 0)) == 1
+      else
+        raise "unsupported database: #{adapter}"
+      end
+    end
+
+    # Releases the advisory lock taken by try_advisory_lock, using the matching
+    # unlock function for Postgres or MySQL. Returns nil without doing anything
+    # for any other database type.
+    def release_advisory_lock(db)
+      adapter = db.database_type
+      if adapter == :postgres
+        db.get(Sequel.function(:pg_advisory_unlock, ADVISORY_LOCK_KEY))
+      elsif adapter == :mysql
+        db.get(Sequel.function(:release_lock, ADVISORY_LOCK_NAME))
+      end
+    end
+
+ # The latest-package / latest-droplet subqueries are limited to the batch's
+ # apps so they never scan the whole packages/droplets tables, which could
+ # blow the migration statement timeout.
+ #
+ # The COALESCEs are defensive: processes.memory/instances and apps.name are
+ # nullable columns whose defaults are normally filled in by the model layer,
+ # but the target event columns are NOT NULL — one legacy NULL row written
+ # outside the models must not abort the whole migration.
+ #
+ # previous_state is NULL, not the current state. purge_and_reseed_started_apps!
+ # writes previous_state == state (STARTED/STARTED) to mean "no change, current
+ # snapshot" — safe only because its state is the real STARTED. Our state is
+ # the made-up WAS_RUNNING, so writing STARTED here would claim a
+ # STARTED->WAS_RUNNING transition happened, and writing WAS_RUNNING would
+ # claim there was an earlier WAS_RUNNING event. Neither is true.
+ #
+ # The NOT EXISTS guard also skips processes that still have their real
+ # STARTED event (the keep-running cleanup keeps it for as long as the
+ # process runs). Without that, running the rake task again would give every
+ # process started since the last run a second "start" on record, and a
+ # consumer that counts starts would bill it twice. Same in the service/task
+ # seeds below.
+ #
+ # Returns one INSERT..SELECT statement, as a single-line string, covering the
+ # STARTED processes with id in (low, high]. uuid_fn is the SQL expression that
+ # generates a guid (see uuid_function). low and high are interpolated as-is,
+ # so they must be trusted integer ids -- each_batch passes ids it read from
+ # the source table. batch_apps selects the batch's app guids and is reused to
+ # restrict both latest-row subqueries. package_state is FAILED when the latest
+ # package's newest droplet failed, STAGED when that droplet is staged and is
+ # the app's current droplet, FAILED when the latest package failed, and
+ # PENDING otherwise.
+ def app_usage_events_insert_sql(uuid_fn, low, high)
+ batch_apps = "SELECT app_guid FROM processes WHERE id > #{low} AND id <= #{high} AND state = 'STARTED'"
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ process_type,
+ space_guid, space_name, org_guid,
+ buildpack_guid, buildpack_name,
+ package_state, previous_package_state
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{WAS_RUNNING}', NULL,
+ COALESCE(p.instances, 0), COALESCE(p.instances, 0),
+ COALESCE(p.memory, 0), COALESCE(p.memory, 0),
+ p.guid, COALESCE(parent_app.name, ''),
+ parent_app.guid, COALESCE(parent_app.name, ''),
+ p.type,
+ spaces.guid, spaces.name, organizations.guid,
+ desired_droplet.buildpack_receipt_buildpack_guid, desired_droplet.buildpack_receipt_buildpack,
+ CASE
+ WHEN latest_droplet.state = 'FAILED' THEN 'FAILED'
+ WHEN latest_droplet.state = 'STAGED' AND latest_droplet.guid = parent_app.droplet_guid THEN 'STAGED'
+ WHEN latest_package.state = 'FAILED' THEN 'FAILED'
+ ELSE 'PENDING'
+ END,
+ 'UNKNOWN'
+ FROM processes p
+ INNER JOIN apps parent_app ON parent_app.guid = p.app_guid
+ INNER JOIN spaces ON spaces.guid = parent_app.space_guid
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ LEFT JOIN droplets desired_droplet ON desired_droplet.guid = parent_app.droplet_guid
+ LEFT JOIN (
+ SELECT pkg.guid, pkg.app_guid, pkg.state FROM packages pkg
+ INNER JOIN (
+ SELECT app_guid, MAX(id) AS max_id FROM packages
+ WHERE app_guid IN (#{batch_apps}) GROUP BY app_guid
+ ) lp_ids ON lp_ids.app_guid = pkg.app_guid AND lp_ids.max_id = pkg.id
+ ) latest_package ON latest_package.app_guid = parent_app.guid
+ LEFT JOIN (
+ SELECT d.guid, d.package_guid, d.state FROM droplets d
+ INNER JOIN (
+ SELECT package_guid, MAX(id) AS max_id FROM droplets
+ WHERE package_guid IN (SELECT guid FROM packages WHERE app_guid IN (#{batch_apps}))
+ GROUP BY package_guid
+ ) ld_ids ON ld_ids.package_guid = d.package_guid AND ld_ids.max_id = d.id
+ ) latest_droplet ON latest_droplet.package_guid = latest_package.guid
+ WHERE p.id > #{low} AND p.id <= #{high}
+ AND p.state = 'STARTED'
+ AND NOT EXISTS (
+ SELECT 1 FROM app_usage_events WHERE state IN ('#{WAS_RUNNING}', '#{STARTED}') AND app_guid = p.guid
+ )
+ SQL
+ end
+
+ # Returns one INSERT..SELECT statement, as a single-line string, that writes
+ # a WAS_RUNNING service usage event for every service instance with id in
+ # (low, high] that has no WAS_RUNNING, CREATED or UPDATED event yet.
+ # service_instance_type comes from is_gateway_service: managed_service_instance
+ # when true, user_provided_service_instance otherwise. The plan, service and
+ # broker joins are LEFT OUTER, so an instance without a plan still gets a row,
+ # with those columns NULL. uuid_fn, low and high are interpolated as-is.
+ def service_usage_events_insert_sql(uuid_fn, low, high)
+ <<~SQL.squish
+ INSERT INTO service_usage_events (
+ guid, created_at, state,
+ service_instance_guid, service_instance_name, service_instance_type,
+ service_plan_guid, service_plan_name,
+ service_guid, service_label,
+ service_broker_name, service_broker_guid,
+ space_guid, space_name, org_guid
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP, '#{WAS_RUNNING}',
+ service_instances.guid, service_instances.name,
+ CASE WHEN service_instances.is_gateway_service THEN 'managed_service_instance' ELSE 'user_provided_service_instance' END,
+ service_plans.guid, service_plans.name,
+ services.guid, services.label,
+ service_brokers.name, service_brokers.guid,
+ spaces.guid, spaces.name, organizations.guid
+ FROM service_instances
+ INNER JOIN spaces ON spaces.id = service_instances.space_id
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ LEFT OUTER JOIN service_plans ON service_plans.id = service_instances.service_plan_id
+ LEFT OUTER JOIN services ON services.id = service_plans.service_id
+ LEFT OUTER JOIN service_brokers ON service_brokers.id = services.service_broker_id
+ WHERE service_instances.id > #{low} AND service_instances.id <= #{high}
+ AND NOT EXISTS (
+ SELECT 1 FROM service_usage_events
+ WHERE state IN ('#{WAS_RUNNING}', '#{CREATED}', '#{UPDATED}') AND service_instance_guid = service_instances.guid
+ )
+ SQL
+ end
+
+ # Mirrors AppUsageEventRepository#create_from_task: task events carry an
+ # empty app_guid/app_name and are keyed by task_guid instead. The COALESCE
+ # on memory_in_mb is defensive -- it is a nullable legacy column whose
+ # default is normally filled in by the model layer.
+ #
+ # previous_state is NULL for the same reason as the app baseline (see
+ # app_usage_events_insert_sql): TASK_WAS_RUNNING is made up, so writing
+ # RUNNING here would claim a RUNNING->TASK_WAS_RUNNING transition happened.
+ # It didn't.
+ #
+ # Returns one INSERT..SELECT statement, as a single-line string, covering the
+ # tasks with id in (low, high] whose state is in RUNNING_TASK_STATES and that
+ # have no TASK_WAS_RUNNING or TASK_STARTED event yet. instance_count is fixed
+ # at 1 and both package states at STAGED. uuid_fn, low and high are
+ # interpolated as-is.
+ def task_usage_events_insert_sql(uuid_fn, low, high)
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ space_guid, space_name, org_guid,
+ package_state, previous_package_state,
+ task_guid, task_name
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{TASK_WAS_RUNNING}', NULL,
+ 1, 1,
+ COALESCE(t.memory_in_mb, 0), COALESCE(t.memory_in_mb, 0),
+ '', '',
+ parent_app.guid, COALESCE(parent_app.name, ''),
+ spaces.guid, spaces.name, organizations.guid,
+ 'STAGED', 'STAGED',
+ t.guid, t.name
+ FROM tasks t
+ INNER JOIN apps parent_app ON parent_app.guid = t.app_guid
+ INNER JOIN spaces ON spaces.guid = parent_app.space_guid
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ WHERE t.id > #{low} AND t.id <= #{high}
+ AND t.state IN (#{running_task_states_sql})
+ AND NOT EXISTS (
+ SELECT 1 FROM app_usage_events WHERE state IN ('#{TASK_WAS_RUNNING}', '#{TASK_STARTED}') AND task_guid = t.guid
+ )
+ SQL
+ end
+
+ # The API stays live while the backfill runs, so a seed batch can race a
+ # concurrent stop or delete: the batch's snapshot still sees the resource
+ # as running and writes a WAS_RUNNING row for something that is already
+ # gone -- or whose stop event landed earlier in the table, with a lower id.
+ # Left alone, the keep-running cleanup would keep that row forever and
+ # consumers would bill a dead resource as still running.
+ #
+ # Deleting such rows would not fix this. Consumers read these tables
+ # forward, by id, and keep what they read: a poller may already have the
+ # baseline, and for tasks a TASK_STOPPED may already have been written
+ # against it. You can delete a row; you cannot make a consumer un-read it.
+ # And deleting it would leave any stop event that points at it dangling.
+ #
+ # So instead of deleting, repair: for every baseline whose resource is no
+ # longer running and that has no later ending event, add the missing ending
+ # event. It is built from the baseline row itself, which carries every NOT
+ # NULL column an ending needs -- necessary, because the resource row may be
+ # gone entirely (for stale service baselines it always is). Once an ending
+ # is added, the baseline no longer counts as unpaired, so running the
+ # repair again changes nothing.
+ #
+ # Two properties of the added ending, both deliberate. Its created_at is
+ # the repair time, not the true stop time, so a consumer may overbill by
+ # that gap -- a bounded error that ends, which beats a missing ending
+ # billed forever. And its previous_state is the baseline's state, which no
+ # normal ending ever carries, so repaired endings are easy to tell apart.
+    def repair_stale_app_usage_events(db, logger, batch_size)
+      guid_sql = uuid_function(db)
+      # The same staleness test is applied again inside the repair INSERT.
+      staleness_test = Sequel.lit(stale_app_baseline_predicates('app_usage_events'))
+      stale_baselines = db[:app_usage_events].where(state: WAS_RUNNING).where(staleness_test)
+      added = batch_repair(db, stale_baselines, batch_size) do |baseline_ids|
+        app_usage_events_repair_sql(guid_sql, baseline_ids)
+      end
+      return unless added.positive?
+
+      logger.info("added #{added} STOPPED usage events to pair stale WAS_RUNNING baselines")
+    end
+
+    # Adds a DELETED event for every WAS_RUNNING service baseline that passes
+    # the stale-service test, in batches, and logs the total when any were added.
+    def repair_stale_service_usage_events(db, logger, batch_size)
+      guid_sql = uuid_function(db)
+      staleness_test = Sequel.lit(stale_service_baseline_predicates('service_usage_events'))
+      stale_baselines = db[:service_usage_events].where(state: WAS_RUNNING).where(staleness_test)
+      added = batch_repair(db, stale_baselines, batch_size) do |baseline_ids|
+        service_usage_events_repair_sql(guid_sql, baseline_ids)
+      end
+      return unless added.positive?
+
+      logger.info("added #{added} DELETED usage events to pair stale WAS_RUNNING baselines")
+    end
+
+    # Adds a TASK_STOPPED event for every TASK_WAS_RUNNING baseline that passes
+    # the stale-task test, in batches, and logs the total when any were added.
+    def repair_stale_task_usage_events(db, logger, batch_size)
+      guid_sql = uuid_function(db)
+      staleness_test = Sequel.lit(stale_task_baseline_predicates('app_usage_events'))
+      stale_baselines = db[:app_usage_events].where(state: TASK_WAS_RUNNING).where(staleness_test)
+      added = batch_repair(db, stale_baselines, batch_size) do |baseline_ids|
+        task_usage_events_repair_sql(guid_sql, baseline_ids)
+      end
+      return unless added.positive?
+
+      logger.info("added #{added} TASK_STOPPED usage events to pair stale TASK_WAS_RUNNING baselines")
+    end
+
+ # The test for a stale, unpaired baseline: the resource is not running (or
+ # is gone), AND no later ending event -- one with a higher id -- exists for
+ # it. The second half is what makes re-runs safe: a baseline whose resource
+ # stopped normally already has its ending event and is left alone.
+ # `qualifier` names the baseline row -- the table itself in the
+ # id-collecting SELECT, its alias in the INSERT..SELECT -- so both queries
+ # apply exactly the same test and cannot drift apart.
+    def stale_app_baseline_predicates(qualifier)
+      # No STARTED process exists for the baseline's app_guid ...
+      process_not_started = "NOT EXISTS (SELECT 1 FROM processes WHERE processes.guid = #{qualifier}.app_guid AND processes.state = 'STARTED')"
+      # ... and no STOPPED event with a higher id exists for it.
+      no_later_ending = "NOT EXISTS (SELECT 1 FROM app_usage_events endings WHERE endings.state = '#{STOPPED}' " \
+                        "AND endings.app_guid = #{qualifier}.app_guid AND endings.id > #{qualifier}.id)"
+      [process_not_started, no_later_ending].join(' AND ')
+    end
+
+    # SQL test for a stale service baseline: no service instance with the
+    # baseline's guid exists, and no DELETED event with a higher id exists for
+    # it. qualifier is the table name or alias of the baseline row.
+    def stale_service_baseline_predicates(qualifier)
+      instance_gone = "NOT EXISTS (SELECT 1 FROM service_instances WHERE service_instances.guid = #{qualifier}.service_instance_guid)"
+      no_later_ending = "NOT EXISTS (SELECT 1 FROM service_usage_events endings WHERE endings.state = '#{DELETED}' " \
+                        "AND endings.service_instance_guid = #{qualifier}.service_instance_guid AND endings.id > #{qualifier}.id)"
+      [instance_gone, no_later_ending].join(' AND ')
+    end
+
+    # SQL test for a stale task baseline: no task with the baseline's guid is in
+    # one of RUNNING_TASK_STATES, and no TASK_STOPPED event with a higher id
+    # exists for it. qualifier is the table name or alias of the baseline row.
+    def stale_task_baseline_predicates(qualifier)
+      task_not_running = "NOT EXISTS (SELECT 1 FROM tasks WHERE tasks.guid = #{qualifier}.task_guid AND tasks.state IN (#{running_task_states_sql}))"
+      no_later_ending = "NOT EXISTS (SELECT 1 FROM app_usage_events endings WHERE endings.state = '#{TASK_STOPPED}' " \
+                        "AND endings.task_guid = #{qualifier}.task_guid AND endings.id > #{qualifier}.id)"
+      [task_not_running, no_later_ending].join(' AND ')
+    end
+
+    # RUNNING_TASK_STATES rendered as a comma-separated list of single-quoted
+    # SQL string literals, ready to drop inside an IN (...) clause.
+    def running_task_states_sql
+      quoted_states = RUNNING_TASK_STATES.map { |name| format("'%s'", name) }
+      quoted_states.join(', ')
+    end
+
+ # Returns one INSERT..SELECT statement, as a single-line string, that adds a
+ # STOPPED event for each WAS_RUNNING baseline whose id is in ids and that
+ # still passes the stale-app test. Every listed column is copied from the
+ # baseline row b except guid (uuid_fn), created_at (CURRENT_TIMESTAMP), state
+ # (STOPPED) and previous_state (the baseline's own state). ids are
+ # interpolated as-is and must not be empty, since an empty IN () list is not
+ # valid SQL; batch_repair never yields an empty list.
+ def app_usage_events_repair_sql(uuid_fn, ids)
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ process_type,
+ space_guid, space_name, org_guid,
+ buildpack_guid, buildpack_name,
+ package_state, previous_package_state
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{STOPPED}', b.state,
+ b.instance_count, b.previous_instance_count,
+ b.memory_in_mb_per_instance, b.previous_memory_in_mb_per_instance,
+ b.app_guid, b.app_name,
+ b.parent_app_guid, b.parent_app_name,
+ b.process_type,
+ b.space_guid, b.space_name, b.org_guid,
+ b.buildpack_guid, b.buildpack_name,
+ b.package_state, b.previous_package_state
+ FROM app_usage_events b
+ WHERE b.id IN (#{ids.join(', ')})
+ AND b.state = '#{WAS_RUNNING}'
+ AND #{stale_app_baseline_predicates('b')}
+ SQL
+ end
+
+ # Returns one INSERT..SELECT statement, as a single-line string, that adds a
+ # DELETED event for each WAS_RUNNING service baseline whose id is in ids and
+ # that still passes the stale-service test. Every listed column is copied
+ # from the baseline row b except guid (uuid_fn), created_at
+ # (CURRENT_TIMESTAMP) and state (DELETED). ids are interpolated as-is and
+ # must not be empty; batch_repair never yields an empty list.
+ def service_usage_events_repair_sql(uuid_fn, ids)
+ <<~SQL.squish
+ INSERT INTO service_usage_events (
+ guid, created_at, state,
+ service_instance_guid, service_instance_name, service_instance_type,
+ service_plan_guid, service_plan_name,
+ service_guid, service_label,
+ service_broker_name, service_broker_guid,
+ space_guid, space_name, org_guid
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP, '#{DELETED}',
+ b.service_instance_guid, b.service_instance_name, b.service_instance_type,
+ b.service_plan_guid, b.service_plan_name,
+ b.service_guid, b.service_label,
+ b.service_broker_name, b.service_broker_guid,
+ b.space_guid, b.space_name, b.org_guid
+ FROM service_usage_events b
+ WHERE b.id IN (#{ids.join(', ')})
+ AND b.state = '#{WAS_RUNNING}'
+ AND #{stale_service_baseline_predicates('b')}
+ SQL
+ end
+
+ # Returns one INSERT..SELECT statement, as a single-line string, that adds a
+ # TASK_STOPPED event for each TASK_WAS_RUNNING baseline whose id is in ids
+ # and that still passes the stale-task test. Every listed column is copied
+ # from the baseline row b except guid (uuid_fn), created_at
+ # (CURRENT_TIMESTAMP), state (TASK_STOPPED) and previous_state (the
+ # baseline's own state). ids are interpolated as-is and must not be empty;
+ # batch_repair never yields an empty list.
+ def task_usage_events_repair_sql(uuid_fn, ids)
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ space_guid, space_name, org_guid,
+ package_state, previous_package_state,
+ task_guid, task_name
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{TASK_STOPPED}', b.state,
+ b.instance_count, b.previous_instance_count,
+ b.memory_in_mb_per_instance, b.previous_memory_in_mb_per_instance,
+ b.app_guid, b.app_name,
+ b.parent_app_guid, b.parent_app_name,
+ b.space_guid, b.space_name, b.org_guid,
+ b.package_state, b.previous_package_state,
+ b.task_guid, b.task_name
+ FROM app_usage_events b
+ WHERE b.id IN (#{ids.join(', ')})
+ AND b.state = '#{TASK_WAS_RUNNING}'
+ AND #{stale_task_baseline_predicates('b')}
+ SQL
+ end
+
+    # Walks the source dataset in ascending id order, at most batch_size rows
+    # at a time. For each batch it yields the id bounds -- exclusive lower,
+    # inclusive upper -- from inside a transaction opened for that batch alone,
+    # and stops once no row with a higher id is left.
+    def each_batch(source, batch_size)
+      last_seen_id = 0
+      loop do
+        next_ids = source.where(Sequel.lit('id > ?', last_seen_id)).order(:id).limit(batch_size).select(:id)
+        upper_id = next_ids.max(:id)
+        break if upper_id.nil?
+
+        # READ COMMITTED keeps MySQL's INSERT..SELECT from taking shared next-key
+        # locks on every scanned source row while the API serves traffic (safe:
+        # CF MySQL releases run with binlog_format=ROW). On Postgres it is the
+        # default isolation level anyway.
+        source.db.transaction(isolation: :committed) { yield(last_seen_id, upper_id) }
+        last_seen_id = upper_id
+      end
+    end
+
+ # Add the missing ending events in id-keyed batches, so no single statement
+ # (or the locks it holds) grows big enough to risk the migration statement
+ # timeout. The INSERT..SELECT re-checks the staleness test itself, so a
+ # resource that came back -- or got its real ending -- between collecting
+ # the ids and inserting is simply skipped. Each added ending removes its
+ # baseline from the stale set, so the loop always finishes; if a whole
+ # batch got invalidated in flight, stop rather than risk selecting the same
+ # ids forever (the next backfill run picks up whatever remains).
+    def batch_repair(db, stale_baselines, batch_size)
+      total_added = 0
+      next_batch = stale_baselines.limit(batch_size)
+      loop do
+        # Re-queried on every pass: baselines repaired by the previous pass no
+        # longer match the stale test, so they drop out of this result.
+        baseline_ids = next_batch.select_map(:id)
+        break if baseline_ids.empty?
+
+        # READ COMMITTED for the same reason as the seed batches (see each_batch).
+        added = db.transaction(isolation: :committed) { db.execute_dui(yield(baseline_ids)) }
+        # Nothing inserted means the whole batch was invalidated in flight; stop
+        # instead of selecting the same ids again.
+        break if added.zero?
+
+        total_added += added
+      end
+      total_added
+    end
+
+    # SQL expression that generates a guid on the connected database:
+    # get_uuid() for Postgres, UUID() for MySQL. Raises for any other type.
+    def uuid_function(db)
+      { postgres: 'get_uuid()', mysql: 'UUID()' }.fetch(db.database_type) do |unsupported|
+        raise "unsupported database: #{unsupported}"
+      end
+    end
+ end
+end
+# rubocop:enable Metrics/ModuleLength
diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake
index eeee7cfe20a..8521e2d52da 100644
--- a/lib/tasks/db.rake
+++ b/lib/tasks/db.rake
@@ -213,6 +213,29 @@ namespace :db do
end
end
+  desc 'Seed WAS_RUNNING usage events for running apps, running tasks, and existing service instances, ' \
+       'and add the missing ending events for baselines whose resource is no longer running. Run it: after ' \
+       'the seed migrations were skipped via skip_was_running_backfill; once after the deploy that ships ' \
+       'the seed migrations, to repair anything that slipped through while old API servers were still ' \
+       'running; or after a destructive usage-event purge, which wipes the task start events that task ' \
+       'stop events depend on'
+  task :was_running_backfill, %i[batch_size] => :environment do |_t, args|
+    args.with_defaults(batch_size: 1000)
+
+    # Rake hands task arguments over as strings. Parse the batch size once and
+    # reject anything that is not a positive integer here, before a database
+    # connection is opened or the advisory lock is taken, so a typo fails with
+    # a message that names the argument.
+    batch_size = args.batch_size.to_i
+    raise ArgumentError, "batch_size must be a positive integer, got #{args.batch_size.inspect}" unless batch_size.positive?
+
+    RakeConfig.context = :migrate
+
+    require 'database/was_running_backfill'
+    logging_output
+    logger = Steno.logger('cc.db.was_running_backfill')
+    RakeConfig.config.load_db_encryption_key
+    db = VCAP::CloudController::DB.connect(RakeConfig.config.get(:db), logger)
+    # All three seeds run under one advisory lock so no second backfill can
+    # interleave with any of them.
+    VCAP::WasRunningBackfill.with_advisory_lock(db) do
+      VCAP::WasRunningBackfill.seed_app_usage_events(db, logger, batch_size:)
+      VCAP::WasRunningBackfill.seed_task_usage_events(db, logger, batch_size:)
+      VCAP::WasRunningBackfill.seed_service_usage_events(db, logger, batch_size:)
+    end
+  end
+
namespace :dev do
desc 'Migrate the database set in spec/support/bootstrap/db_config'
task migrate: :environment do
diff --git a/spec/api/documentation/app_usage_event_api_spec.rb b/spec/api/documentation/app_usage_event_api_spec.rb
index 07ae25f2658..eb331049838 100644
--- a/spec/api/documentation/app_usage_event_api_spec.rb
+++ b/spec/api/documentation/app_usage_event_api_spec.rb
@@ -47,7 +47,7 @@
"The desired state of the app or 'BUILDPACK_SET' when buildpack info has been set.",
required: false,
readonly: true,
- valid_values: %w[STARTED STOPPED BUILDPACK_SET TASK_STARTED TASK_STOPPED]
+ valid_values: %w[STARTED STOPPED WAS_RUNNING BUILDPACK_SET TASK_STARTED TASK_STOPPED TASK_WAS_RUNNING]
field :task_guid, 'The GUID of the task if one exists.', required: false, readonly: true, experimental: true
field :task_name, 'The NAME of the task if one exists.', required: false, readonly: true, experimental: true
diff --git a/spec/api/documentation/service_usage_events_api_spec.rb b/spec/api/documentation/service_usage_events_api_spec.rb
index c8526942f8b..160bb39c9a9 100644
--- a/spec/api/documentation/service_usage_events_api_spec.rb
+++ b/spec/api/documentation/service_usage_events_api_spec.rb
@@ -13,7 +13,7 @@
get '/v2/service_usage_events' do
field :guid, 'The guid of the event.', required: false
- field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED]
+ field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED WAS_RUNNING]
field :org_guid, 'The GUID of the organization.', required: false, readonly: true
field :space_guid, 'The GUID of the space.', required: false, readonly: true
field :space_name, 'The name of the space.', required: false, readonly: true
diff --git a/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb
new file mode 100644
index 00000000000..ba98ca050c7
--- /dev/null
+++ b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb
@@ -0,0 +1,44 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+# Exercises the lifecycle-index migration in both directions on both usage
+# event tables, and checks that repeating either direction raises no error.
+RSpec.describe 'migration to add the lifecycle index to the usage event tables', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120000_add_lifecycle_index_to_usage_events.rb' }
+ end
+
+ # Migrates to current_migration_index -- presumably the index of
+ # migration_filename, supplied by the shared 'migration' context (confirm).
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ # Migrates to the index just below current_migration_index, i.e. reverts it.
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ it 'adds the lifecycle index to both usage event tables (safe to run twice) and removes it on revert' do
+ # Before migration: the lifecycle indexes should not exist.
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+
+ # Up migration adds both indexes with the expected column order.
+ expect { run_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index)
+ expect(db.indexes(:app_usage_events)[:app_usage_events_lifecycle_index][:columns]).to eq(%i[state app_guid id])
+ expect(db.indexes(:service_usage_events)[:service_usage_events_lifecycle_index][:columns]).to eq(%i[state service_instance_guid id])
+
+ # Running the up migration again does not fail.
+ expect { run_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index)
+
+ # Down migration removes both indexes.
+ expect { revert_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+
+ # Running the down migration again does not fail.
+ expect { revert_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+ end
+end
diff --git a/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb
new file mode 100644
index 00000000000..5e3faa468bc
--- /dev/null
+++ b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb
@@ -0,0 +1,148 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed WAS_RUNNING events for currently-running app processes', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120100_seed_was_running_app_usage_events.rb' }
+ end
+
+ # Runs the Sequel migrator with target: current_migration_index (supplied outside this
+ # file -- presumably the version of the migration under test). Being a `let`, the result
+ # is memoized, so the migrator runs at most once per example.
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ # Runs the Sequel migrator with a target one below current_migration_index, i.e. rolls
+ # the schema back past the migration that run_migration targets. Memoized per example.
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space scaffold and returns the space guid that apps can reference.
+ def seed_space(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ "space-#{suffix}"
+ end
+
+ # Inserts a minimal app_usage_events row in the given +state+ for +app_guid+; the event
+ # guid, app name, space and org values are derived from +suffix+. Returns the result of
+ # Dataset#insert, which the examples below use as the row id.
+ def seed_app_event(suffix, state:, app_guid:)
+ db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ instance_count: 1, memory_in_mb_per_instance: 1, app_guid: app_guid, app_name: "app-#{suffix}",
+ space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}")
+ end
+
+ describe 'up migration' do
+ # Seeds one process per scenario, runs the migration once, then checks which processes
+ # received a WAS_RUNNING row and what the seeded row contains. Note that the event's
+ # app_guid column is matched against the *process* guid here, while parent_app_guid is
+ # expected to hold the app guid.
+ it 'seeds WAS_RUNNING rows only for started processes, skips stopped ones, and preserves existing rows' do
+ space_guid = seed_space('main')
+
+ # A running process with no droplet -> package_state PENDING
+ db[:apps].insert(guid: 'app-pending', name: 'pending-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-pending', app_guid: 'app-pending', state: 'STARTED', instances: 3, memory: 512, type: 'web')
+
+ # A running process whose app points at a STAGED droplet -> package_state STAGED.
+ # Insert order avoids the apps.droplet_guid <-> droplets <-> packages.app_guid FK cycle.
+ db[:apps].insert(guid: 'app-staged', name: 'staged-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-staged', app_guid: 'app-staged', state: 'READY')
+ db[:droplets].insert(guid: 'drop-staged', package_guid: 'pkg-staged', state: 'STAGED')
+ db[:apps].where(guid: 'app-staged').update(droplet_guid: 'drop-staged')
+ db[:processes].insert(guid: 'proc-staged', app_guid: 'app-staged', state: 'STARTED', instances: 1, memory: 256, type: 'web')
+
+ # A running process whose latest droplet FAILED -> package_state FAILED
+ db[:apps].insert(guid: 'app-faildrop', name: 'faildrop-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-faildrop', app_guid: 'app-faildrop', state: 'READY')
+ db[:droplets].insert(guid: 'drop-faildrop', package_guid: 'pkg-faildrop', state: 'FAILED')
+ db[:apps].where(guid: 'app-faildrop').update(droplet_guid: 'drop-faildrop')
+ db[:processes].insert(guid: 'proc-faildrop', app_guid: 'app-faildrop', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ # A running process whose latest package FAILED (and has no droplet) -> package_state FAILED
+ db[:apps].insert(guid: 'app-failpkg', name: 'failpkg-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-failpkg', app_guid: 'app-failpkg', state: 'FAILED')
+ db[:processes].insert(guid: 'proc-failpkg', app_guid: 'app-failpkg', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ # A stopped process -> no WAS_RUNNING row
+ db[:apps].insert(guid: 'app-stopped', name: 'stopped-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-stopped', app_guid: 'app-stopped', state: 'STOPPED', instances: 1, memory: 128, type: 'web')
+
+ # A running process that already has a WAS_RUNNING row -> not duplicated
+ db[:apps].insert(guid: 'app-existing', name: 'existing-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-existing', app_guid: 'app-existing', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+ seed_app_event('existing', state: 'WAS_RUNNING', app_guid: 'proc-existing')
+
+ # A running process that still has its real STARTED event -> no baseline.
+ # A consumer already tracks it; a second start on record would make it
+ # get billed twice.
+ db[:apps].insert(guid: 'app-started-on-record', name: 'started-on-record-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-started-on-record', app_guid: 'app-started-on-record', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+ seed_app_event('started-on-record', state: 'STARTED', app_guid: 'proc-started-on-record')
+
+ # An unrelated pre-existing row that must be preserved (no truncate)
+ preexisting_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid')
+
+ run_migration
+
+ was_running = db[:app_usage_events].where(state: 'WAS_RUNNING')
+ # One row each for proc-pending, proc-staged, proc-faildrop and proc-failpkg, plus the
+ # pre-seeded proc-existing row (not duplicated).
+ expect(was_running.count).to eq(5)
+ expect(was_running.where(app_guid: 'proc-stopped').count).to eq(0)
+ expect(was_running.where(app_guid: 'proc-existing').count).to eq(1)
+ expect(was_running.where(app_guid: 'proc-started-on-record').count).to eq(0)
+ expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ pending_row = was_running.where(app_guid: 'proc-pending').first
+ expect(pending_row[:guid]).to be_present
+ expect(pending_row[:previous_state]).to be_nil
+ expect(pending_row[:app_name]).to eq('pending-app')
+ expect(pending_row[:parent_app_guid]).to eq('app-pending')
+ expect(pending_row[:parent_app_name]).to eq('pending-app')
+ expect(pending_row[:process_type]).to eq('web')
+ expect(pending_row[:space_guid]).to eq(space_guid)
+ expect(pending_row[:space_name]).to eq('space-main')
+ expect(pending_row[:org_guid]).to eq('org-main')
+ expect(pending_row[:instance_count]).to eq(3)
+ expect(pending_row[:previous_instance_count]).to eq(3)
+ expect(pending_row[:memory_in_mb_per_instance]).to eq(512)
+ expect(pending_row[:previous_memory_in_mb_per_instance]).to eq(512)
+ expect(pending_row[:package_state]).to eq('PENDING')
+ expect(pending_row[:previous_package_state]).to eq('UNKNOWN')
+
+ expect(was_running.where(app_guid: 'proc-staged').first[:package_state]).to eq('STAGED')
+ expect(was_running.where(app_guid: 'proc-faildrop').first[:package_state]).to eq('FAILED')
+ expect(was_running.where(app_guid: 'proc-failpkg').first[:package_state]).to eq('FAILED')
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_app_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any WAS_RUNNING rows' do
+ space_guid = seed_space('main')
+ db[:apps].insert(guid: 'app-skip', name: 'skip-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-skip', app_guid: 'app-skip', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ run_migration
+
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ # Reverting must not delete anything: the WAS_RUNNING row created by `up` and an
+ # unrelated pre-existing row are both expected to still be present afterwards.
+ it 'keeps the WAS_RUNNING rows: consumers may already have read them' do
+ space_guid = seed_space('main')
+ db[:apps].insert(guid: 'app-down', name: 'down-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-down', app_guid: 'app-down', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+ unrelated_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid')
+
+ run_migration
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+ expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb
new file mode 100644
index 00000000000..c4762080c2e
--- /dev/null
+++ b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb
@@ -0,0 +1,131 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed WAS_RUNNING events for existing service instances', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120200_seed_was_running_service_usage_events.rb' }
+ end
+
+ # Runs the Sequel migrator with target: current_migration_index (supplied outside this
+ # file -- presumably the version of the migration under test). Being a `let`, the result
+ # is memoized, so the migrator runs at most once per example.
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ # Runs the Sequel migrator with a target one below current_migration_index, i.e. rolls
+ # the schema back past the migration that run_migration targets. Memoized per example.
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space scaffold and returns the space id that instances can reference.
+ def seed_space(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ end
+
+ # Builds a broker -> service -> plan chain and returns the plan id.
+ def seed_plan(suffix)
+ broker_id = db[:service_brokers].insert(guid: "broker-#{suffix}", name: "broker-#{suffix}", broker_url: 'http://example.com', auth_password: 'pw')
+ service_id = db[:services].insert(guid: "service-#{suffix}", label: "service-#{suffix}", description: 'desc', bindable: true, service_broker_id: broker_id)
+ db[:service_plans].insert(guid: "plan-#{suffix}", name: "plan-#{suffix}", description: 'desc', free: true, service_id: service_id, unique_id: "plan-unique-#{suffix}")
+ end
+
+ # Inserts a minimal service_usage_events row in the given +state+ for
+ # +service_instance_guid+. Org and space values are fixed to the 'main' scaffold and the
+ # instance type is always 'managed_service_instance'; only the event guid and instance
+ # name depend on +suffix+. Returns the result of Dataset#insert (used below as the row id).
+ def seed_service_event(suffix, state:, service_instance_guid:)
+ db[:service_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ org_guid: 'org-main', space_guid: 'space-main', space_name: 'space-main',
+ service_instance_guid: service_instance_guid, service_instance_name: "instance-#{suffix}",
+ service_instance_type: 'managed_service_instance')
+ end
+
+ describe 'up migration' do
+ # Seeds a managed instance, a user-provided instance, an instance that already has a
+ # WAS_RUNNING row and one that has a CREATED row, runs the migration once, then checks
+ # which instances were seeded and which columns the seeded rows carry.
+ it 'seeds one WAS_RUNNING row per instance with the correct type and preserves existing rows' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+
+ # A managed instance -> managed_service_instance type with full broker chain.
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ # A user-provided instance -> user_provided_service_instance type with NULL plan/service/broker.
+ db[:service_instances].insert(guid: 'upsi-guid', name: 'upsi', space_id: space_id, is_gateway_service: false)
+
+ # A managed instance that already has a WAS_RUNNING row -> not duplicated.
+ db[:service_instances].insert(guid: 'existing-guid', name: 'existing', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ seed_service_event('existing', state: 'WAS_RUNNING', service_instance_guid: 'existing-guid')
+
+ # An instance that still has its real CREATED event -> no baseline. A
+ # consumer already tracks it; a second start on record would make it get
+ # billed twice.
+ db[:service_instances].insert(guid: 'created-guid', name: 'created', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ seed_service_event('created', state: 'CREATED', service_instance_guid: 'created-guid')
+
+ # An unrelated pre-existing row that must be preserved (no truncate).
+ preexisting_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'some-other-instance')
+
+ run_migration
+
+ was_running = db[:service_usage_events].where(state: 'WAS_RUNNING')
+ # One row each for managed-guid and upsi-guid, plus the pre-seeded existing-guid row (not duplicated).
+ expect(was_running.count).to eq(3)
+ expect(was_running.where(service_instance_guid: 'existing-guid').count).to eq(1)
+ expect(was_running.where(service_instance_guid: 'created-guid').count).to eq(0)
+ expect(db[:service_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ managed_row = was_running.where(service_instance_guid: 'managed-guid').first
+ expect(managed_row[:guid]).to be_present
+ expect(managed_row[:service_instance_name]).to eq('my-instance')
+ expect(managed_row[:service_instance_type]).to eq('managed_service_instance')
+ expect(managed_row[:service_plan_guid]).to eq('plan-main')
+ expect(managed_row[:service_plan_name]).to eq('plan-main')
+ expect(managed_row[:service_guid]).to eq('service-main')
+ expect(managed_row[:service_label]).to eq('service-main')
+ expect(managed_row[:service_broker_name]).to eq('broker-main')
+ expect(managed_row[:service_broker_guid]).to eq('broker-main')
+ expect(managed_row[:space_guid]).to eq('space-main')
+ expect(managed_row[:space_name]).to eq('space-main')
+ expect(managed_row[:org_guid]).to eq('org-main')
+
+ upsi_row = was_running.where(service_instance_guid: 'upsi-guid').first
+ expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance')
+ expect(upsi_row[:service_plan_guid]).to be_nil
+ expect(upsi_row[:service_plan_name]).to be_nil
+ expect(upsi_row[:service_guid]).to be_nil
+ expect(upsi_row[:service_label]).to be_nil
+ expect(upsi_row[:service_broker_name]).to be_nil
+ expect(upsi_row[:service_broker_guid]).to be_nil
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_service_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any WAS_RUNNING rows' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+
+ run_migration
+
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ # Reverting must not delete anything: the WAS_RUNNING row created by `up` and an
+ # unrelated pre-existing row are both expected to still be present afterwards.
+ it 'keeps the WAS_RUNNING rows: consumers may already have read them' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ unrelated_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'some-other-instance')
+
+ run_migration
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+ expect(db[:service_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb
new file mode 100644
index 00000000000..a0c491f00ce
--- /dev/null
+++ b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb
@@ -0,0 +1,130 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed TASK_WAS_RUNNING events for currently-running tasks', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120300_seed_was_running_task_usage_events.rb' }
+ end
+
+ # Runs the Sequel migrator with target: current_migration_index (supplied outside this
+ # file -- presumably the version of the migration under test). Being a `let`, the result
+ # is memoized, so the migrator runs at most once per example.
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ # Runs the Sequel migrator with a target one below current_migration_index, i.e. rolls
+ # the schema back past the migration that run_migration targets. Memoized per example.
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space/app scaffold and returns the app guid that tasks can reference.
+ def seed_app(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ db[:apps].insert(guid: "app-#{suffix}", name: "app-#{suffix}", space_guid: "space-#{suffix}")
+ "app-#{suffix}"
+ end
+
+ # Inserts a tasks row named after +suffix+ in the given +state+ for +app_guid+, with a
+ # fixed command and a droplet guid derived from +suffix+. +memory_in_mb+ defaults to 256.
+ def seed_task(suffix, state:, app_guid:, memory_in_mb: 256)
+ db[:tasks].insert(guid: "task-#{suffix}", name: "task-#{suffix}", command: 'bundle exec rake', state: state,
+ app_guid: app_guid, droplet_guid: "droplet-#{suffix}", memory_in_mb: memory_in_mb)
+ end
+
+ # Inserts a minimal app_usage_events row in the given +state+ for +task_guid+. app_guid and
+ # app_name are written as empty strings; space, org and task name derive from +suffix+.
+ # Returns the result of Dataset#insert, which the examples below use as the row id.
+ def seed_task_event(suffix, state:, task_guid:)
+ db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ instance_count: 1, memory_in_mb_per_instance: 1, app_guid: '', app_name: '',
+ space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}",
+ task_guid: task_guid, task_name: "task-#{suffix}")
+ end
+
+ describe 'up migration' do
+ # Seeds tasks in several states, runs the migration once, then checks that only the
+ # RUNNING and CANCELING tasks without a prior TASK_WAS_RUNNING/TASK_STARTED row were
+ # seeded, and inspects the columns of the row seeded for task-running.
+ it 'seeds TASK_WAS_RUNNING rows only for running tasks, skips finished ones, and preserves existing rows' do
+ app_guid = seed_app('main')
+
+ seed_task('running', state: 'RUNNING', app_guid: app_guid, memory_in_mb: 512)
+ seed_task('succeeded', state: 'SUCCEEDED', app_guid: app_guid)
+ seed_task('pending', state: 'PENDING', app_guid: app_guid)
+
+ # A CANCELING task is still running (and billable) until Diego reports it
+ # dead -> it gets a baseline like a RUNNING one.
+ seed_task('canceling', state: 'CANCELING', app_guid: app_guid)
+
+ # A running task that already has a TASK_WAS_RUNNING row -> not duplicated
+ seed_task('existing', state: 'RUNNING', app_guid: app_guid)
+ seed_task_event('existing', state: 'TASK_WAS_RUNNING', task_guid: 'task-existing')
+
+ # A running task that still has its real TASK_STARTED event -> no
+ # baseline. A consumer already tracks it; a second start on record would
+ # make it get billed twice.
+ seed_task('started-on-record', state: 'RUNNING', app_guid: app_guid)
+ seed_task_event('started-on-record', state: 'TASK_STARTED', task_guid: 'task-started-on-record')
+
+ # An unrelated pre-existing row that must be preserved (no truncate)
+ preexisting_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task')
+
+ run_migration
+
+ task_was_running = db[:app_usage_events].where(state: 'TASK_WAS_RUNNING')
+ # One row each for task-running and task-canceling, plus the pre-seeded
+ # task-existing row (not duplicated).
+ expect(task_was_running.count).to eq(3)
+ expect(task_was_running.where(task_guid: 'task-canceling').count).to eq(1)
+ expect(task_was_running.where(task_guid: 'task-succeeded').count).to eq(0)
+ expect(task_was_running.where(task_guid: 'task-pending').count).to eq(0)
+ expect(task_was_running.where(task_guid: 'task-existing').count).to eq(1)
+ expect(task_was_running.where(task_guid: 'task-started-on-record').count).to eq(0)
+ expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ row = task_was_running.where(task_guid: 'task-running').first
+ expect(row[:guid]).to be_present
+ expect(row[:previous_state]).to be_nil
+ expect(row[:task_name]).to eq('task-running')
+ expect(row[:app_guid]).to eq('')
+ expect(row[:app_name]).to eq('')
+ expect(row[:parent_app_guid]).to eq(app_guid)
+ expect(row[:parent_app_name]).to eq('app-main')
+ expect(row[:space_guid]).to eq('space-main')
+ expect(row[:space_name]).to eq('space-main')
+ expect(row[:org_guid]).to eq('org-main')
+ expect(row[:instance_count]).to eq(1)
+ expect(row[:previous_instance_count]).to eq(1)
+ expect(row[:memory_in_mb_per_instance]).to eq(512)
+ expect(row[:previous_memory_in_mb_per_instance]).to eq(512)
+ expect(row[:package_state]).to eq('STAGED')
+ expect(row[:previous_package_state]).to eq('STAGED')
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_task_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any TASK_WAS_RUNNING rows' do
+ app_guid = seed_app('main')
+ seed_task('skip', state: 'RUNNING', app_guid: app_guid)
+
+ run_migration
+
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ # Reverting must not delete anything: the TASK_WAS_RUNNING row created by `up` and an
+ # unrelated pre-existing row are both expected to still be present afterwards.
+ it 'keeps the TASK_WAS_RUNNING rows: consumers may already have read them, and stop events depend on them' do
+ app_guid = seed_app('main')
+ seed_task('down', state: 'RUNNING', app_guid: app_guid)
+ unrelated_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task')
+
+ run_migration
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(1)
+ expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
index 81b48f484a7..68416a1318a 100644
--- a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
+++ b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
@@ -10,6 +10,10 @@
let(:skip_bigint_id_migration) { nil }
before do
+ # The migration spec harness replays ALL migrations in an after-hook, including those of other
+ # features (e.g. the WAS_RUNNING backfill, which reads config in its `skip?` check). This default
+ # lets those config reads pass through instead of tripping the argument-specific stubs below.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
diff --git a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
index 2378c849909..1eff100eeb0 100644
--- a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
+++ b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
@@ -14,6 +14,10 @@
let(:logger) { double(:logger, info: nil) }
before do
+ # The migration spec harness replays ALL migrations in an after-hook, including those of other
+ # features (e.g. the WAS_RUNNING backfill, which reads config in its `skip?` check). This default
+ # lets those config reads pass through instead of tripping the argument-specific stubs below.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
@@ -119,6 +123,10 @@
let(:logger) { double(:logger, info: nil) }
before do
+ # The migration spec harness replays ALL migrations in an after-hook, including those of other
+ # features (e.g. the WAS_RUNNING backfill, which reads config in its `skip?` check). This default
+ # lets those config reads pass through instead of tripping the argument-specific stubs below.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
diff --git a/spec/unit/actions/task_delete_spec.rb b/spec/unit/actions/task_delete_spec.rb
index 320a71d6730..f61353078e4 100644
--- a/spec/unit/actions/task_delete_spec.rb
+++ b/spec/unit/actions/task_delete_spec.rb
@@ -61,17 +61,23 @@ module VCAP::CloudController
expect(events[0].metadata['task_guid']).to eq(task4.guid)
end
- it 'creates a usage event for non-terminal tasks' do
+ it 'creates a usage event for non-terminal tasks with recorded start evidence' do
+ # task4 and task5 wrote TASK_STARTED when they moved to RUNNING. task3
+ # is still PENDING: no consumer ever saw it start, so destroying it
+ # must not write a TASK_STOPPED that nothing can pair with a start.
+ create(:app_usage_event, task_guid: task4.guid, state: 'TASK_STARTED')
+ create(:app_usage_event, task_guid: task5.guid, state: 'TASK_STARTED')
+
task_delete.delete_for_app(app.guid)
- events = AppUsageEvent.where(parent_app_guid: app.guid).all
- expect(events.size).to eq(3)
- task_guids = [task3.guid, task4.guid, task5.guid]
+ events = AppUsageEvent.where(parent_app_guid: app.guid, state: 'TASK_STOPPED').all
+ expect(events.size).to eq(2)
+ task_guids = [task4.guid, task5.guid]
events.each do |event|
- expect(event.state).to eq('TASK_STOPPED')
expect(task_guids.delete(event.task_guid)).not_to be_nil
end
expect(task_guids).to be_empty
+ expect(AppUsageEvent.where(task_guid: task3.guid, state: 'TASK_STOPPED')).to be_empty
end
end
end
diff --git a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
index ea7f99647e5..032fab56d78 100644
--- a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
+++ b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
@@ -5,7 +5,7 @@ module Jobs::Runtime
RSpec.describe AppUsageEventsCleanup, job_context: :worker do
let(:cutoff_age_in_days) { 30 }
let(:logger) { double(Steno::Logger, info: nil) }
- let!(:event_before_threshold) { create(:app_usage_event, created_at: (cutoff_age_in_days + 1).days.ago) }
+ let!(:event_before_threshold) { create(:app_usage_event, created_at: (cutoff_age_in_days + 1).days.ago, state: 'STOPPED') }
let!(:event_after_threshold) { create(:app_usage_event, created_at: (cutoff_age_in_days - 1).days.ago) }
subject(:job) do
diff --git a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
index 7b8025e0884..2ceca67b886 100644
--- a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
+++ b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
@@ -5,7 +5,7 @@ module Jobs::Services
RSpec.describe ServiceUsageEventsCleanup, job_context: :worker do
let(:cutoff_age_in_days) { 30 }
let(:logger) { double(Steno::Logger, info: nil) }
- let!(:event_before_threshold) { create(:service_usage_event, created_at: (cutoff_age_in_days + 1).days.ago) }
+ let!(:event_before_threshold) { create(:service_usage_event, created_at: (cutoff_age_in_days + 1).days.ago, state: 'DELETED') }
let!(:event_after_threshold) { create(:service_usage_event, created_at: (cutoff_age_in_days - 1).days.ago) }
subject(:job) do
diff --git a/spec/unit/lib/database/old_record_cleanup_spec.rb b/spec/unit/lib/database/old_record_cleanup_spec.rb
index 070b5abda81..4a222e6eaad 100644
--- a/spec/unit/lib/database/old_record_cleanup_spec.rb
+++ b/spec/unit/lib/database/old_record_cleanup_spec.rb
@@ -1,6 +1,148 @@
require 'spec_helper'
require 'database/old_record_cleanup'
+# Lifecycle-aware cleanup behavior that is identical for app and service usage
+# events. Expects the including context to define :model, :beginning_state,
+# :ending_state and :guid_column.
+RSpec.shared_examples 'usage event lifecycle cleanup' do
+ # Creates a usage event through the factory named after +model+ (its class name,
+ # demodulized and underscored), setting +state+, +created_at+ and the including
+ # context's +guid_column+ to +guid+.
+ def make_event(state, guid, created_at:)
+ create(model.name.demodulize.underscore.to_sym, state: state, created_at: created_at, **{ guid_column => guid })
+ end
+
+ # Runs OldRecordCleanup#delete against +model+ with a one-day cutoff and
+ # keep_running_records enabled; any extra keyword options are forwarded to the constructor.
+ def run_cleanup(**opts)
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: true, **opts).delete
+ end
+
+ # An unmatched beginning is kept even though it is older than run_cleanup's one-day cutoff.
+ it 'keeps an old beginning record when there is no corresponding ending record' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(model.count).to eq(1)
+ end
+
+ # The ending (1.day.ago + 1.minute) is newer than the one-day cutoff, so neither the
+ # ending nor the old beginning it belongs to is removed.
+ it 'keeps an old beginning record when the ending record is fresh' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+ fresh_ending = make_event(ending_state, 'guid1', created_at: 1.day.ago + 1.minute)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(fresh_ending.reload).to be_present
+ end
+
+ # The ending is created before the beginning, so it does not close that beginning: the
+ # stale ending is deleted while the later beginning is kept.
+ it 'keeps an old beginning record when the ending record was inserted first' do
+ old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago)
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # created_at claims the ending came first, but it is inserted after the beginning. Both
+ # records are expected to be deleted, i.e. the ending is treated as closing the beginning.
+ it 'uses insertion order rather than created_at to pair beginnings with endings' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+ # Earlier timestamp but higher id: the resource is no longer running.
+ old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago)
+
+ run_cleanup
+
+ expect { old_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # Two complete begin/end cycles for the same guid, all older than the cutoff: every
+ # record is expected to be removed.
+ it 'deletes all records of completed runs spanning multiple cycles' do
+ cycle1_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ cycle1_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago)
+ cycle2_beginning = make_event(beginning_state, 'guid1', created_at: 8.days.ago)
+ cycle2_ending = make_event(ending_state, 'guid1', created_at: 7.days.ago)
+
+ run_cleanup
+
+ expect { cycle1_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle1_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle2_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle2_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # An old ending is deleted even when no beginning exists for its guid.
+ it 'deletes an old ending record that has no beginning record' do
+ orphan_ending = make_event(ending_state, 'guid1', created_at: 10.days.ago)
+
+ run_cleanup
+
+ expect { orphan_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # A record in the literal 'WAS_RUNNING' state with no ending for its guid is retained
+ # even though it is older than the cutoff.
+ it 'keeps an old WAS_RUNNING record when there is no corresponding ending record' do
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(was_running.reload).to be_present
+ expect(model.count).to eq(1)
+ end
+
+ # A WAS_RUNNING record followed by an old ending for the same guid is removed together
+ # with that ending.
+ it 'deletes an old WAS_RUNNING record when a later old ending record exists' do
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago)
+ old_ending = make_event(ending_state, 'guid1', created_at: 4.days.ago)
+
+ run_cleanup
+
+ expect { was_running.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # With no ending for the guid, both the original beginning and the later WAS_RUNNING
+ # record are retained.
+ it 'keeps both the first beginning and a later WAS_RUNNING record of a running resource' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(was_running.reload).to be_present
+ end
+
+ # Three old beginnings for one guid and no ending: only the middle record is deleted.
+ it 'prunes superseded baselines of a running resource, keeping the first beginning (true start) and the latest one (current footprint)' do
+ first = make_event(beginning_state, 'guid1', created_at: 5.days.ago)
+ middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago)
+ latest = make_event(beginning_state, 'guid1', created_at: 3.days.ago)
+
+ run_cleanup
+
+ expect(first.reload).to be_present
+ expect { middle.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(latest.reload).to be_present
+ end
+
+ # Three beginnings without an ending, the newest still inside the one-day retention
+ # window: nothing is pruned, including the middle record.
+ it 'does not prune a superseded baseline until the beginning that supersedes it is itself old' do
+ first = make_event(beginning_state, 'guid1', created_at: 5.days.ago)
+ middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago)
+ fresh_latest = make_event(beginning_state, 'guid1', created_at: 1.day.ago + 1.minute)
+
+ run_cleanup
+
+ expect(first.reload).to be_present
+ expect(middle.reload).to be_present
+ expect(fresh_latest.reload).to be_present
+ end
+
+ # An ended run (old beginning + old ending) is deleted entirely, while the two
+ # beginnings inserted after that ending are both kept.
+ it 'treats the first beginning after an ended run as the true start, not as a superseded baseline' do
+ ended_run_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ ended_run_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago)
+ current_run_first = make_event(beginning_state, 'guid1', created_at: 8.days.ago)
+ current_run_latest = make_event(beginning_state, 'guid1', created_at: 7.days.ago)
+
+ run_cleanup
+
+ expect { ended_run_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { ended_run_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(current_run_first.reload).to be_present
+ expect(current_run_latest.reload).to be_present
+ end
+end
+
RSpec.describe Database::OldRecordCleanup do
describe '#delete' do
let!(:stale_event1) { create(:event, created_at: 1.day.ago - 1.minute) }
@@ -9,7 +151,7 @@
let!(:fresh_event) { create(:event, created_at: 1.day.ago + 1.minute) }
it 'deletes records older than specified days' do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1)
expect do
record_cleanup.delete
@@ -20,23 +162,14 @@
expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject)
end
- context "when there are no records at all but you're trying to keep at least one" do
- it "doesn't keep one because there aren't any to keep" do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, 1, keep_at_least_one_record: true)
-
- expect { record_cleanup.delete }.not_to raise_error
- expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0)
- end
- end
-
it 'only retrieves the current timestamp from the database once' do
expect(VCAP::CloudController::Event.db).to receive(:fetch).with('SELECT CURRENT_TIMESTAMP as now').once.and_call_original
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1)
record_cleanup.delete
end
it 'keeps the last row when :keep_at_least_one_record is true even if it is older than the cutoff date' do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 0, keep_at_least_one_record: true)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 0, keep_at_least_one_record: true)
expect do
record_cleanup.delete
@@ -46,5 +179,236 @@
expect { stale_event1.reload }.to raise_error(Sequel::NoExistingObject)
expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject)
end
+
+ context "when there are no records at all but you're trying to keep at least one" do
+ it "doesn't keep one because there aren't any to keep" do
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, cutoff_age_in_days: 1, keep_at_least_one_record: true)
+
+ expect { record_cleanup.delete }.not_to raise_error
+ expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0)
+ end
+ end
+
+ context 'when keep_running_records is requested for a model without usage lifecycles' do
+ it 'raises rather than silently deleting the records of running resources' do
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1, keep_running_records: true)
+
+ expect { record_cleanup.delete }.to raise_error(ArgumentError, /usage_lifecycles/)
+ end
+ end
+
+ describe 'keeping running AppUsageEvent records' do
+ let(:model) { VCAP::CloudController::AppUsageEvent }
+ let(:beginning_state) { 'STARTED' }
+ let(:ending_state) { 'STOPPED' }
+ let(:guid_column) { :app_guid }
+
+ include_examples 'usage event lifecycle cleanup'
+
+ describe 'task lifecycle' do
+ it 'keeps the TASK_STARTED record of a still-running task' do
+ task_started = create(model.name.demodulize.underscore.to_sym, created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(task_started.reload).to be_present
+ end
+
+ it 'deletes the TASK_STARTED and TASK_STOPPED records of a completed task' do
+ task_started = create(model.name.demodulize.underscore.to_sym, created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ task_stopped = create(model.name.demodulize.underscore.to_sym, created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect { task_started.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps the TASK_STARTED record when the TASK_STOPPED record is fresh' do
+ task_started = create(model.name.demodulize.underscore.to_sym, created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ fresh_task_stopped = create(model.name.demodulize.underscore.to_sym, created_at: 1.day.ago + 1.minute, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(task_started.reload).to be_present
+ expect(fresh_task_stopped.reload).to be_present
+ end
+
+ it 'correlates task records by task_guid, not by app_guid' do
+ running_task_started = create(model.name.demodulize.underscore.to_sym, created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ other_task_stopped = create(model.name.demodulize.underscore.to_sym, created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task2', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(running_task_started.reload).to be_present
+ expect { other_task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps the TASK_WAS_RUNNING baseline of a still-running task' do
+ baseline = create(model.name.demodulize.underscore.to_sym, created_at: 2.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '')
+
+ run_cleanup
+
+ expect(baseline.reload).to be_present
+ end
+
+ it 'deletes the TASK_WAS_RUNNING baseline once a later TASK_STOPPED record is also old' do
+ baseline = create(model.name.demodulize.underscore.to_sym, created_at: 5.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '')
+ task_stopped = create(model.name.demodulize.underscore.to_sym, created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: '')
+
+ run_cleanup
+
+ expect { baseline.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # Task events carry an empty app_guid. If task baselines shared the
+ # WAS_RUNNING state, the app lifecycle (which matches events by
+ # app_guid) would see every task baseline as belonging to one app whose
+ # guid is '', and wrongly prune them as extra baselines of that one
+ # app. The separate TASK_WAS_RUNNING state keeps the app lifecycle from
+ # seeing them at all.
+ it 'does not prune the baselines of distinct running tasks as superseded baselines of one phantom app' do
+ baselines = %w[task1 task2 task3].each_with_index.map do |task_guid, i|
+ create(model.name.demodulize.underscore.to_sym, created_at: (5 - i).days.ago, state: 'TASK_WAS_RUNNING', task_guid: task_guid, app_guid: '')
+ end
+
+ run_cleanup
+
+ baselines.each { |baseline| expect(baseline.reload).to be_present }
+ end
+ end
+
+ it 'deletes records with non-lifecycle states' do
+ buildpack_event1 = create(model.name.demodulize.underscore.to_sym, created_at: 3.days.ago, state: 'BUILDPACK_SET', app_guid: 'app1')
+ buildpack_event2 = create(model.name.demodulize.underscore.to_sym, created_at: 2.days.ago, state: 'BUILDPACK_SET', app_guid: 'app2')
+
+ run_cleanup
+
+ expect { buildpack_event1.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { buildpack_event2.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'deletes old records with a corresponding stop record even if app_guid is an empty string' do
+ empty_guid_start = create(model.name.demodulize.underscore.to_sym, created_at: 5.days.ago, state: 'STARTED', app_guid: '')
+ different_empty_start = create(model.name.demodulize.underscore.to_sym, created_at: 4.days.ago, state: 'STARTED', app_guid: '')
+ empty_guid_stop = create(model.name.demodulize.underscore.to_sym, created_at: 3.days.ago, state: 'STOPPED', app_guid: '')
+
+ run_cleanup
+
+ # Both STARTs with an empty-string guid have a STOP with an empty-string guid after them.
+ expect { empty_guid_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { different_empty_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { empty_guid_stop.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'works when cutoff_age_in_days is 0' do
+ old_start = create(model.name.demodulize.underscore.to_sym, created_at: 1.second.ago, state: 'STARTED', app_guid: 'running-app')
+
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 0, keep_running_records: true).delete
+
+ expect(old_start.reload).to be_present
+ end
+
+ it 'does not error if the table is empty' do
+ model.dataset.delete
+
+ expect { run_cleanup }.not_to raise_error
+ end
+
+ it 'deletes all old records when keep_running_records is false' do
+ old_start = create(model.name.demodulize.underscore.to_sym, created_at: 5.days.ago, state: 'STARTED', app_guid: 'app1')
+ old_stop = create(model.name.demodulize.underscore.to_sym, created_at: 4.days.ago, state: 'STOPPED', app_guid: 'app1')
+ old_running_start = create(model.name.demodulize.underscore.to_sym, created_at: 3.days.ago, state: 'STARTED', app_guid: 'running-app')
+
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: false).delete
+
+ expect { old_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_stop.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_running_start.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keep_at_least_one_record preserves the last record while still pruning its paired START' do
+ old_start = create(model.name.demodulize.underscore.to_sym, created_at: 10.days.ago, state: 'STARTED', app_guid: 'app1')
+ last_stop = create(model.name.demodulize.underscore.to_sym, created_at: 9.days.ago, state: 'STOPPED', app_guid: 'app1')
+
+ run_cleanup(keep_at_least_one_record: true)
+
+ expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) # paired with STOP, prunable
+ expect(last_stop.reload).to be_present # kept by keep_at_least_one_record
+ end
+
+ it 'prunes a paired set larger than the delete batch size while keeping running records' do
+ old = 5.days.ago
+
+ # 1,500 completed START/STOP pairs (3,000 rows) -> well above the 1,000-row
+ # batch size, so the delete spans multiple batches across the two passes.
+ paired_rows = []
+ 1_500.times do |i|
+ guid = "paired-#{i}"
+ common = { app_name: guid, space_guid: 'sp', space_name: 'sp', org_guid: 'o',
+ instance_count: 1, memory_in_mb_per_instance: 1, created_at: old }
+ paired_rows << common.merge(guid: "start-#{i}", state: 'STARTED', app_guid: guid)
+ paired_rows << common.merge(guid: "stop-#{i}", state: 'STOPPED', app_guid: guid)
+ end
+ model.dataset.multi_insert(paired_rows)
+
+ # Still-running apps (START with no later STOP) that must survive cleanup.
+ running = Array.new(50) do |i|
+ create(model.name.demodulize.underscore.to_sym, created_at: old, state: 'STARTED', app_guid: "running-#{i}")
+ end
+
+ run_cleanup
+
+ # Every completed pair is gone; only the running records remain.
+ running.each { |event| expect(event.reload).to be_present }
+ expect(model.count).to eq(running.size)
+ end
+ end
+
+ describe 'keeping running ServiceUsageEvent records' do
+ let(:model) { VCAP::CloudController::ServiceUsageEvent }
+ let(:beginning_state) { 'CREATED' }
+ let(:ending_state) { 'DELETED' }
+ let(:guid_column) { :service_instance_guid }
+
+ include_examples 'usage event lifecycle cleanup'
+
+ describe 'UPDATED records' do
+ it 'keeps the CREATED record and the latest UPDATED record while the service instance exists, pruning superseded UPDATED records' do
+ stale_created = create(model.name.demodulize.underscore.to_sym, created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1')
+ superseded_update = create(model.name.demodulize.underscore.to_sym, created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ latest_update = create(model.name.demodulize.underscore.to_sym, created_at: 6.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect(stale_created.reload).to be_present
+ expect { superseded_update.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(latest_update.reload).to be_present
+ end
+
+ it 'keeps UPDATED records when the corresponding delete record is fresh' do
+ stale_updated = create(model.name.demodulize.underscore.to_sym, created_at: 2.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ fresh_delete = create(model.name.demodulize.underscore.to_sym, created_at: 1.day.ago + 1.minute, state: 'DELETED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect(stale_updated.reload).to be_present
+ expect(fresh_delete.reload).to be_present
+ end
+
+ it 'deletes UPDATED records when there is a corresponding old delete record' do
+ stale_created = create(model.name.demodulize.underscore.to_sym, created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1')
+ stale_updated = create(model.name.demodulize.underscore.to_sym, created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ stale_delete = create(model.name.demodulize.underscore.to_sym, created_at: 6.days.ago, state: 'DELETED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect { stale_created.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { stale_updated.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { stale_delete.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+ end
+ end
end
end
diff --git a/spec/unit/lib/database/was_running_backfill_spec.rb b/spec/unit/lib/database/was_running_backfill_spec.rb
new file mode 100644
index 00000000000..66e41fe6f6f
--- /dev/null
+++ b/spec/unit/lib/database/was_running_backfill_spec.rb
@@ -0,0 +1,486 @@
+require 'spec_helper'
+require 'database/was_running_backfill'
+
+RSpec.describe VCAP::WasRunningBackfill do
+ let(:db) { Sequel::Model.db }
+ let(:logger) { double(Steno::Logger, info: nil) }
+
+ def was_running
+ db[:service_usage_events].where(state: 'WAS_RUNNING')
+ end
+
+ def app_was_running
+ db[:app_usage_events].where(state: 'WAS_RUNNING')
+ end
+
+ def task_was_running
+ db[:app_usage_events].where(state: 'TASK_WAS_RUNNING')
+ end
+
+ # The backfill exists for resources whose real start events were already
+ # pruned by the events cleanup. The test factories, though, write those
+ # events (a STARTED row when making a started process, a CREATED row when
+ # making a service instance), and the seed guards deliberately skip resources
+ # that have them. So tests that expect a resource to be seeded first delete
+ # its events, putting it in the same state as a resource pruned in the wild.
+ def prune_usage_events!
+ db[:app_usage_events].delete
+ db[:service_usage_events].delete
+ end
+
+ describe 'usage event state literals' do
+ # The backfill is raw SQL (no CC code), so it can't reference the model and
+ # repository constants directly. This guard catches any drift between the
+ # literals the backfill writes/probes and the state values the
+ # repositories/cleanup recognise.
+ it 'match the state values used by the models and repositories' do
+ expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE)
+ expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE)
+ expect(described_class::TASK_WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE)
+ expect(described_class::STARTED).to eq(VCAP::CloudController::ProcessModel::STARTED)
+ expect(described_class::STOPPED).to eq(VCAP::CloudController::ProcessModel::STOPPED)
+ expect(described_class::TASK_STARTED).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE)
+ expect(described_class::TASK_STOPPED).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE)
+ expect(described_class::CREATED).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::CREATED_EVENT_STATE)
+ expect(described_class::UPDATED).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::UPDATED_EVENT_STATE)
+ expect(described_class::DELETED).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::DELETED_EVENT_STATE)
+ expect(described_class::RUNNING_TASK_STATES).to contain_exactly(VCAP::CloudController::TaskModel::RUNNING_STATE,
+ VCAP::CloudController::TaskModel::CANCELING_STATE)
+ end
+ end
+
+ describe '.skip?' do
+ let(:skip_was_running_backfill) { nil }
+
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(skip_was_running_backfill)
+ end
+
+ context 'when skip_was_running_backfill is false' do
+ let(:skip_was_running_backfill) { false }
+
+ it 'returns false' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+
+ context 'when skip_was_running_backfill is true' do
+ let(:skip_was_running_backfill) { true }
+
+ it 'returns true' do
+ expect(described_class.skip?).to be(true)
+ end
+ end
+
+ context 'when skip_was_running_backfill is nil' do
+ let(:skip_was_running_backfill) { nil }
+
+ it 'returns false' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+
+ context 'when reading the config raises InvalidConfigPath' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_raise(VCAP::CloudController::Config::InvalidConfigPath)
+ end
+
+ it 'returns false rather than aborting the migration' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+ end
+
+ describe '.with_advisory_lock' do
+ it 'runs the block and releases the lock for subsequent runs' do
+ order = []
+ described_class.with_advisory_lock(db) { order << :first }
+ described_class.with_advisory_lock(db) { order << :second }
+ expect(order).to eq(%i[first second])
+ end
+
+ it 'raises without running the block when another session already holds the lock' do
+ other_db = Sequel.connect(DbConfig.new.connection_string)
+ begin
+ described_class.with_advisory_lock(other_db) do
+ expect { described_class.with_advisory_lock(db) { raise 'the block must not run' } }.
+ to raise_error(/another WAS_RUNNING backfill is already running/)
+ end
+ ensure
+ other_db.disconnect
+ end
+ end
+
+ it 'releases the lock when the block raises' do
+ expect { described_class.with_advisory_lock(db) { raise ArgumentError.new('boom') } }.to raise_error(ArgumentError, 'boom')
+ expect { |b| described_class.with_advisory_lock(db, &b) }.to yield_control
+ end
+ end
+
+ describe '.seed_app_usage_events' do
+ it 'seeds one WAS_RUNNING row per started process across batches, skips stopped processes, and adds nothing when run again' do
+ started1 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ started2 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ prune_usage_events!
+
+ # batch_size: 1 forces the keyset loop to iterate once per process.
+ described_class.seed_app_usage_events(db, logger, batch_size: 1)
+
+ expect(app_was_running.select_map(:app_guid)).to contain_exactly(started1.guid, started2.guid)
+ expect { described_class.seed_app_usage_events(db, logger, batch_size: 1) }.not_to change(app_was_running, :count)
+ end
+
+ it 'seeds a separate row per process when an app has multiple started processes' do
+ app = create(:app_model)
+ web = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'web', state: 'STARTED')
+ worker = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'worker', state: 'STARTED')
+ prune_usage_events!
+
+ described_class.seed_app_usage_events(db, logger, batch_size: 1)
+
+ scope = app_was_running.where(parent_app_guid: app.guid)
+ expect(scope.select_map(:app_guid)).to contain_exactly(web.guid, worker.guid)
+ expect(scope.select_map(:process_type)).to contain_exactly('web', 'worker')
+ end
+
+ it 'tolerates legacy NULLs in nullable process and app columns' do
+ process = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ prune_usage_events!
+ # Bypass the model layer, which would backfill these defaults.
+ db[:processes].where(guid: process.guid).update(memory: nil, instances: nil)
+
+ described_class.seed_app_usage_events(db, logger)
+
+ row = app_was_running.first(app_guid: process.guid)
+ expect(row[:memory_in_mb_per_instance]).to eq(0)
+ expect(row[:instance_count]).to eq(0)
+ end
+
+ it 'does not seed a baseline for a process that still has its real STARTED event' do
+ process = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ expect(db[:app_usage_events].where(state: 'STARTED', app_guid: process.guid).count).to eq(1)
+
+ described_class.seed_app_usage_events(db, logger)
+
+ # A consumer already tracks this process through its STARTED event.
+ # Giving it a second start on record would make such a consumer bill it
+ # twice.
+ expect(app_was_running.where(app_guid: process.guid).count).to eq(0)
+ end
+
+ describe 'repairing stale baselines' do
+ it 'appends a STOPPED event pairing baselines whose process is not running, without deleting any baseline' do
+ running = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ prune_usage_events!
+ stale = create(:app_usage_event, state: 'WAS_RUNNING', app_guid: stopped.guid)
+ create(:app_usage_event, state: 'WAS_RUNNING', app_guid: 'no-such-process')
+
+ described_class.seed_app_usage_events(db, logger)
+
+ # Baselines are never deleted: a consumer may already have read them.
+ expect(app_was_running.select_map(:app_guid)).to contain_exactly(running.guid, stopped.guid, 'no-such-process')
+
+ repair = db[:app_usage_events].where(state: 'STOPPED', app_guid: stopped.guid).first
+ expect(repair).not_to be_nil
+ expect(repair[:id]).to be > stale.id
+ expect(repair[:guid]).to be_present
+ expect(repair[:guid]).not_to eq(stale.guid)
+ # previous_state is the baseline's state -- true (it was the last event
+ # a consumer saw) and a marker: no normal STOPPED ever carries it.
+ expect(repair[:previous_state]).to eq('WAS_RUNNING')
+ expect(repair[:app_name]).to eq(stale.app_name)
+ expect(repair[:space_guid]).to eq(stale.space_guid)
+ expect(repair[:org_guid]).to eq(stale.org_guid)
+ expect(repair[:instance_count]).to eq(stale.instance_count)
+ expect(repair[:memory_in_mb_per_instance]).to eq(stale.memory_in_mb_per_instance)
+
+ expect(db[:app_usage_events].where(state: 'STOPPED', app_guid: 'no-such-process').count).to eq(1)
+ expect(db[:app_usage_events].where(state: 'STOPPED', app_guid: running.guid).count).to eq(0)
+ end
+
+ it 'leaves a baseline alone when a later real ending already pairs it, even across re-runs' do
+ # Day 0: baseline seeded. Day 5: the app stops normally (a real
+ # STOPPED, higher id). Day 10: a rake re-run must not touch either row
+ # -- deleting the baseline would leave the STOPPED a consumer already
+ # read pointing at nothing.
+ stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ prune_usage_events!
+ create(:app_usage_event, state: 'WAS_RUNNING', app_guid: stopped.guid)
+ create(:app_usage_event, state: 'STOPPED', app_guid: stopped.guid)
+
+ expect { described_class.seed_app_usage_events(db, logger) }.not_to(change { db[:app_usage_events].count })
+ expect(app_was_running.where(app_guid: stopped.guid).count).to eq(1)
+ end
+
+ it 'appends a later ending when the only stop event landed before the baseline (backfill/API race)' do
+ stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ prune_usage_events!
+ create(:app_usage_event, state: 'STOPPED', app_guid: stopped.guid)
+ baseline = create(:app_usage_event, state: 'WAS_RUNNING', app_guid: stopped.guid)
+
+ described_class.seed_app_usage_events(db, logger)
+
+ # A consumer reading forward saw the early stop first (a stop with no
+ # start before it, which the docs say to ignore), then the baseline.
+ # Only a LATER ending closes the baseline.
+ expect(db[:app_usage_events].where(state: 'STOPPED', app_guid: stopped.guid).where { id > baseline.id }.count).to eq(1)
+ end
+
+ it 'does not add a second ending on a re-run: the one it added satisfies the next check' do
+ stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ prune_usage_events!
+ create(:app_usage_event, state: 'WAS_RUNNING', app_guid: stopped.guid)
+
+ described_class.seed_app_usage_events(db, logger)
+ expect { described_class.seed_app_usage_events(db, logger) }.not_to(change { db[:app_usage_events].count })
+ end
+ end
+ end
+
+ describe '.seed_task_usage_events' do
+ it 'seeds one TASK_WAS_RUNNING row per running task across batches, skips finished tasks, and adds nothing when run again' do
+ running1 = create(:task_model, state: 'RUNNING', memory_in_mb: 256)
+ running2 = create(:task_model, state: 'RUNNING')
+ create(:task_model, state: 'SUCCEEDED')
+
+ # batch_size: 1 forces the keyset loop to iterate once per task.
+ described_class.seed_task_usage_events(db, logger, batch_size: 1)
+
+ expect(task_was_running.select_map(:task_guid)).to contain_exactly(running1.guid, running2.guid)
+
+ row = task_was_running.first(task_guid: running1.guid)
+ expect(row[:previous_state]).to be_nil
+ expect(row[:task_name]).to eq(running1.name)
+ expect(row[:app_guid]).to eq('')
+ expect(row[:app_name]).to eq('')
+ expect(row[:parent_app_guid]).to eq(running1.app.guid)
+ expect(row[:parent_app_name]).to eq(running1.app.name)
+ expect(row[:instance_count]).to eq(1)
+ expect(row[:memory_in_mb_per_instance]).to eq(256)
+ expect(row[:previous_memory_in_mb_per_instance]).to eq(256)
+ expect(row[:package_state]).to eq('STAGED')
+ expect(row[:previous_package_state]).to eq('STAGED')
+ expect(row[:space_guid]).to eq(running1.space.guid)
+ expect(row[:space_name]).to eq(running1.space.name)
+ expect(row[:org_guid]).to eq(running1.space.organization.guid)
+
+ expect { described_class.seed_task_usage_events(db, logger, batch_size: 1) }.not_to change(task_was_running, :count)
+ end
+
+ it 'seeds CANCELING tasks, which are still running and billable until Diego reports them dead' do
+ canceling = create(:task_model, state: 'CANCELING')
+
+ described_class.seed_task_usage_events(db, logger)
+
+ expect(task_was_running.select_map(:task_guid)).to contain_exactly(canceling.guid)
+ # The repair must agree that CANCELING counts as running: writing a stop
+ # here would make TaskModel think the task already stopped, and it would
+ # skip the real stop later.
+ expect(db[:app_usage_events].where(state: 'TASK_STOPPED', task_guid: canceling.guid).count).to eq(0)
+ end
+
+ it 'tolerates a legacy NULL task memory' do
+ task = create(:task_model, state: 'RUNNING')
+ # Bypass the model layer, which would backfill the default.
+ db[:tasks].where(guid: task.guid).update(memory_in_mb: nil)
+
+ described_class.seed_task_usage_events(db, logger)
+
+ row = task_was_running.first(task_guid: task.guid)
+ expect(row[:memory_in_mb_per_instance]).to eq(0)
+ end
+
+ it 'does not seed a baseline for a task that still has its real TASK_STARTED event' do
+ running = create(:task_model, state: 'RUNNING')
+ create(:app_usage_event, state: 'TASK_STARTED', task_guid: running.guid)
+
+ described_class.seed_task_usage_events(db, logger)
+
+ expect(task_was_running.count).to eq(0)
+ end
+
+ describe 'repairing stale baselines' do
+ it 'appends a TASK_STOPPED event pairing baselines whose task is no longer running, without deleting any baseline' do
+ running = create(:task_model, state: 'RUNNING')
+ completed = create(:task_model, state: 'SUCCEEDED')
+ stale = create(:app_usage_event, state: 'TASK_WAS_RUNNING', task_guid: completed.guid)
+ create(:app_usage_event, state: 'TASK_WAS_RUNNING', task_guid: 'no-such-task')
+
+ described_class.seed_task_usage_events(db, logger)
+
+ # Baselines are never deleted: a consumer may already have read them.
+ expect(task_was_running.select_map(:task_guid)).to contain_exactly(running.guid, completed.guid, 'no-such-task')
+
+ repair = db[:app_usage_events].where(state: 'TASK_STOPPED', task_guid: completed.guid).first
+ expect(repair).not_to be_nil
+ expect(repair[:id]).to be > stale.id
+ expect(repair[:previous_state]).to eq('TASK_WAS_RUNNING')
+ expect(repair[:task_name]).to eq(stale.task_name)
+ expect(repair[:app_guid]).to eq(stale.app_guid)
+ expect(repair[:parent_app_guid]).to eq(stale.parent_app_guid)
+ expect(repair[:memory_in_mb_per_instance]).to eq(stale.memory_in_mb_per_instance)
+
+ expect(db[:app_usage_events].where(state: 'TASK_STOPPED', task_guid: 'no-such-task').count).to eq(1)
+ expect(db[:app_usage_events].where(state: 'TASK_STOPPED', task_guid: running.guid).count).to eq(0)
+ end
+
+ it 'leaves a baseline alone when a later gated TASK_STOPPED already pairs it, even across re-runs' do
+ completed = create(:task_model, state: 'SUCCEEDED')
+ create(:app_usage_event, state: 'TASK_WAS_RUNNING', task_guid: completed.guid)
+ create(:app_usage_event, state: 'TASK_STOPPED', task_guid: completed.guid)
+
+ expect { described_class.seed_task_usage_events(db, logger) }.not_to(change { db[:app_usage_events].count })
+ expect(task_was_running.where(task_guid: completed.guid).count).to eq(1)
+ end
+ end
+
+ describe 'seed / stop / repair interleavings' do
+ # An earlier version of the backfill DELETED baselines it thought were
+ # stale, deciding from current resource state alone -- and deleted rows
+ # that consumers (and TaskModel's stop-event check) had already used.
+ # These scenarios are the regression net for that class of bug. Steps a
+ # live foundation can interleave:
+ #
+ # :backfill -- a full backfill run (seed + repair), the way the
+ # migration or the rake task runs it
+ # :task_canceled -- an operator asks to cancel the task (-> CANCELING)
+ # :task_finishes -- Diego reports the task dead; the model decides
+ # whether to write the stop event
+ # :task_destroyed -- the task row is destroyed while still running
+ # (the app-deletion path)
+ # :task_row_wiped -- the task row disappears with no hooks running
+ # (a dataset-level delete)
+ # :phantom_baseline -- what a seed batch writes when its snapshot is
+ # older than the task's death: a baseline for a
+ # task that already stopped, silently
+ #
+ # The rule being checked: after any of these orderings, every beginning
+ # event a consumer could have read for a now-stopped task has a later
+ # ending event, and no baseline row has been deleted.
+ [
+ { name: 'gated stop after seed; a re-run must not unpair it',
+ steps: %i[backfill task_finishes backfill], baselines: 1, stops: 1 },
+ { name: 'task already CANCELING at seed time is baselined and eventually paired by its gated stop',
+ steps: %i[task_canceled backfill task_finishes backfill], baselines: 1, stops: 1 },
+ { name: 'task canceled after seeding keeps its baseline and gets no stop while it is still CANCELING',
+ steps: %i[backfill task_canceled backfill], baselines: 1, stops: 0 },
+ { name: 'phantom baseline for a task that already died silently; repair appends the missing stop',
+ steps: %i[task_finishes phantom_baseline backfill], baselines: 1, stops: 1 },
+ { name: 'task destroyed mid-run emits a gated stop against the baseline; a re-run is a no-op',
+ steps: %i[backfill task_destroyed backfill], baselines: 1, stops: 1 },
+ { name: 'task row wiped without hooks; repair appends the missing stop',
+ steps: %i[backfill task_row_wiped backfill], baselines: 1, stops: 1 }
+ ].each do |scenario|
+ it "pairs every readable beginning: #{scenario[:name]}" do
+ task = create(:task_model, state: 'RUNNING')
+
+ scenario[:steps].each do |step|
+ case step # NOTE(review): no else branch, so a misspelled step symbol is silently skipped
+ when :backfill then described_class.seed_task_usage_events(db, logger)
+ when :task_canceled then task.update(state: 'CANCELING')
+ when :task_finishes then task.update(state: 'FAILED')
+ when :task_destroyed then task.destroy
+ when :task_row_wiped then db[:tasks].where(guid: task.guid).delete # dataset-level delete: no model hooks fire
+ when :phantom_baseline then create(:app_usage_event, state: 'TASK_WAS_RUNNING', task_guid: task.guid) # baseline written directly, not by the seeder
+ end
+ end
+
+ baselines = task_was_running.where(task_guid: task.guid) # helper defined outside this chunk; presumably the TASK_WAS_RUNNING rows
+ stops = db[:app_usage_events].where(state: 'TASK_STOPPED', task_guid: task.guid)
+ expect(baselines.count).to eq(scenario[:baselines])
+ expect(stops.count).to eq(scenario[:stops])
+
+ still_running = db[:tasks].where(guid: task.guid, state: %w[RUNNING CANCELING]).any?
+ next if still_running # pairing is only asserted once the task row is gone or no longer RUNNING/CANCELING
+
+ db[:app_usage_events].where(state: %w[TASK_STARTED TASK_WAS_RUNNING], task_guid: task.guid).each do |beginning| # "later" below means a higher id
+ expect(stops.where { id > beginning[:id] }.count).to be_positive,
+ "beginning #{beginning[:state]} (id #{beginning[:id]}) has no later ending event"
+ end
+ end
+ end
+ end
+ end
+
+ describe '.seed_service_usage_events' do
+ it 'seeds one WAS_RUNNING row per instance across multiple batches, with the right type, adding nothing when run again' do
+ managed = create(:managed_service_instance)
+ upsi = create(:user_provided_service_instance)
+ prune_usage_events! # helper defined outside this chunk; presumably removes the factory-written usage events — confirm
+
+ # batch_size: 1 forces the keyset loop to iterate once per instance.
+ described_class.seed_service_usage_events(db, logger, batch_size: 1)
+
+ expect(was_running.select_map(:service_instance_guid)).to contain_exactly(managed.guid, upsi.guid) # exactly one baseline per instance
+
+ managed_row = was_running.first(service_instance_guid: managed.guid)
+ expect(managed_row[:service_instance_type]).to eq('managed_service_instance')
+ expect(managed_row[:service_plan_guid]).to eq(managed.service_plan.guid)
+ expect(managed_row[:service_broker_name]).to eq(managed.service_plan.service.service_broker.name)
+
+ upsi_row = was_running.first(service_instance_guid: upsi.guid)
+ expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance')
+ expect(upsi_row[:service_plan_guid]).to be_nil # the user-provided instance's baseline carries no plan guid
+
+ expect { described_class.seed_service_usage_events(db, logger, batch_size: 1) }.not_to change(was_running, :count) # second run adds no baselines
+ end
+
+ it 'does not seed a baseline for an instance that still has its real CREATED or UPDATED event' do
+ create(:managed_service_instance) # the factory itself writes the real CREATED event
+ updated_only = create(:managed_service_instance)
+ db[:service_usage_events].where(service_instance_guid: updated_only.guid).update(state: 'UPDATED') # relabel this instance's existing event rows as UPDATED
+
+ described_class.seed_service_usage_events(db, logger)
+
+ expect(was_running.count).to eq(0) # neither instance gets a baseline
+ end
+
+ describe 'repairing stale baselines' do
+ it 'appends a DELETED event pairing baselines whose instance no longer exists, without deleting any baseline' do
+ kept = create(:managed_service_instance)
+ doomed = create(:managed_service_instance)
+ prune_usage_events! # helper defined outside this chunk; presumably clears factory-written events so both instances get baselined — confirm
+
+ described_class.seed_service_usage_events(db, logger)
+ baseline = was_running.first(service_instance_guid: doomed.guid)
+ db[:service_instances].where(guid: doomed.guid).delete # dataset-level delete of the instance row, bypassing the model
+
+ described_class.seed_service_usage_events(db, logger)
+
+ # Baselines are never deleted: a consumer may already have read them.
+ expect(was_running.select_map(:service_instance_guid)).to contain_exactly(kept.guid, doomed.guid)
+
+ repair = db[:service_usage_events].where(state: 'DELETED', service_instance_guid: doomed.guid).first
+ expect(repair).not_to be_nil
+ expect(repair[:id]).to be > baseline[:id] # the repair row must come after the baseline it pairs
+ expect(repair[:guid]).to be_present
+ expect(repair[:guid]).not_to eq(baseline[:guid]) # the repair is a new event with its own guid
+ expect(repair[:service_instance_name]).to eq(baseline[:service_instance_name])
+ expect(repair[:service_instance_type]).to eq(baseline[:service_instance_type])
+ expect(repair[:service_plan_guid]).to eq(baseline[:service_plan_guid])
+ expect(repair[:service_broker_guid]).to eq(baseline[:service_broker_guid])
+ expect(repair[:space_guid]).to eq(baseline[:space_guid])
+ expect(repair[:org_guid]).to eq(baseline[:org_guid])
+
+ expect(db[:service_usage_events].where(state: 'DELETED', service_instance_guid: kept.guid).count).to eq(0) # the surviving instance gets no DELETED
+ end
+
+ it 'leaves a baseline alone when a later real DELETED already pairs it, even across re-runs' do
+ create(:service_usage_event, state: 'WAS_RUNNING', service_instance_guid: 'gone-instance')
+ create(:service_usage_event, state: 'DELETED', service_instance_guid: 'gone-instance')
+
+ expect { described_class.seed_service_usage_events(db, logger) }.not_to(change { db[:service_usage_events].count })
+ expect(was_running.where(service_instance_guid: 'gone-instance').count).to eq(1)
+ end
+
+ it 'does not add a second DELETED on a re-run: the one it added satisfies the next check' do
+ create(:service_usage_event, state: 'WAS_RUNNING', service_instance_guid: 'gone-instance') # no service instance with this guid is created in this example
+
+ described_class.seed_service_usage_events(db, logger) # first run; per the example name this is where the DELETED gets added
+ expect { described_class.seed_service_usage_events(db, logger) }.not_to(change { db[:service_usage_events].count }) # second run must add no rows
+ end
+ end
+ end
+end
diff --git a/spec/unit/models/runtime/task_model_spec.rb b/spec/unit/models/runtime/task_model_spec.rb
index 731301a5644..7bd69d63a9a 100644
--- a/spec/unit/models/runtime/task_model_spec.rb
+++ b/spec/unit/models/runtime/task_model_spec.rb
@@ -28,6 +28,30 @@ module VCAP::CloudController
expect(event.task_guid).to eq(task.guid)
expect(event.parent_app_guid).to eq(task.app.guid)
end
+
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { create(:app_usage_event, task_guid: task.guid, state: 'TASK_WAS_RUNNING') } # start_event here is a baseline row, not TASK_STARTED
+
+ it 'still creates a TASK_STOPPED event' do
+ task.update(state: TaskModel::SUCCEEDED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil
+ expect(event.task_guid).to eq(task.guid)
+ expect(event.parent_app_guid).to eq(task.app.guid) # the stop event is attributed to the task's app
+ end
+ end
+
+ context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do
+ let!(:start_event) { nil } # nothing is created for start_event in this context
+
+ it 'does not create a TASK_STOPPED event, since no consumer ever saw the task start' do
+ task.update(state: TaskModel::SUCCEEDED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).to be_nil # no stop is emitted without a beginning to pair
+ end
+ end
end
context 'when the task is moving to the FAILED_STATE' do
@@ -43,6 +67,28 @@ module VCAP::CloudController
expect(event.task_guid).to eq(task.guid)
expect(event.parent_app_guid).to eq(task.app.guid)
end
+
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { create(:app_usage_event, task_guid: task.guid, state: 'TASK_WAS_RUNNING') } # start_event here is a baseline row, not TASK_STARTED
+
+ it 'still creates a TASK_STOPPED event' do
+ task.update(state: TaskModel::FAILED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil # the baseline alone is enough to trigger the stop
+ end
+ end
+
+ context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do
+ let!(:start_event) { nil } # nothing is created for start_event in this context
+
+ it 'does not create a TASK_STOPPED event, since no consumer ever saw the task start' do
+ task.update(state: TaskModel::FAILED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).to be_nil # no stop is emitted without a beginning to pair
+ end
+ end
end
context 'when the task is moving from the PENDING state' do
@@ -72,6 +118,17 @@ module VCAP::CloudController
end
end
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { create(:app_usage_event, task_guid: task.guid, state: 'TASK_WAS_RUNNING') } # start_event here is a baseline row, not TASK_STARTED
+
+ it 'still creates a TASK_STOPPED event' do
+ task.update(state: TaskModel::FAILED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil # the baseline alone is enough to trigger the stop
+ end
+ end
+
context 'when the task does not have a TASK_STARTED event' do
let!(:start_event) { nil }
@@ -132,7 +189,8 @@ module VCAP::CloudController
end
describe 'after destroy' do
- let(:task) { create(:task_model, app: parent_app, state: TaskModel::PENDING_STATE) }
+ let(:task) { create(:task_model, app: parent_app, state: TaskModel::RUNNING_STATE) }
+ let!(:start_event) { create(:app_usage_event, task_guid: task.guid, state: 'TASK_STARTED') }
it 'creates a TASK_STOPPED event' do
task.destroy
@@ -143,6 +201,42 @@ module VCAP::CloudController
expect(event.parent_app_guid).to eq(task.app.guid)
end
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { create(:app_usage_event, task_guid: task.guid, state: 'TASK_WAS_RUNNING') } # replaces the TASK_STARTED row the surrounding describe defines
+
+ it 'still creates a TASK_STOPPED event' do
+ task.destroy
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil # destroying a baselined task still emits its stop
+ end
+ end
+
+ context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do
+ let(:task) { create(:task_model, app: parent_app, state: TaskModel::PENDING_STATE) } # PENDING here, instead of the RUNNING task of the surrounding describe
+ let!(:start_event) { nil } # replaces the surrounding TASK_STARTED row with nothing
+
+ it 'does not create a TASK_STOPPED event, since no consumer ever saw the task start' do
+ task.destroy
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).to be_nil # no stop is emitted without a beginning to pair
+ end
+ end
+
+ context 'when the task already has a TASK_STOPPED event' do
+ before do
+ create(:app_usage_event, task_guid: task.guid, state: 'TASK_STOPPED')
+ end
+
+ it 'does not create an additional TASK_STOPPED event' do
+ task.destroy
+
+ events = AppUsageEvent.where(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(events.count).to equal(1)
+ end
+ end
+
context 'when the task is already in a terminal state (and thus already has a stop event)' do
describe 'when the task is failed' do
let(:task) { create(:task_model, app: parent_app, state: TaskModel::FAILED_STATE) }
|