From 0496c1560180b83faba086b5b5f6fa3f65427412 Mon Sep 17 00:00:00 2001 From: Marcos Filipe Date: Tue, 18 Nov 2025 08:34:49 -0300 Subject: [PATCH] feat(zebra): implement data deletion workers --- .../20251118091829_add_expires_at_to_jobs.rb | 5 + ...add_expires_created_index_at_jobs_table.rb | 12 + ...rganization_created_index_at_jobs_table.rb | 12 + zebra/Makefile | 2 + zebra/config/config.exs | 8 + .../lib/protos/internal_api/artifacthub.pb.ex | 6 +- zebra/lib/protos/internal_api/rbac.pb.ex | 29 ++ zebra/lib/protos/internal_api/usage.pb.ex | 358 ++++++++++++++++++ zebra/lib/zebra/models/job.ex | 81 +++- zebra/lib/zebra/workers.ex | 2 + .../workers/job_deletion_policy_marker.ex | 113 ++++++ .../workers/job_deletion_policy_worker.ex | 141 +++++++ .../20251118091829_add_expires_at_to_jobs.exs | 9 + ...dd_expires_created_index_at_jobs_table.exs | 14 + ...ganization_created_index_at_jobs_table.exs | 14 + zebra/scripts/internal_protos.sh | 3 +- .../job_deletion_policy_marker_test.exs | 287 ++++++++++++++ .../job_deletion_policy_worker_test.exs | 201 ++++++++++ zebra/test/zebra/workers_test.exs | 12 +- 19 files changed, 1303 insertions(+), 6 deletions(-) create mode 100644 github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb create mode 100644 github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb create mode 100644 github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb create mode 100644 zebra/lib/protos/internal_api/usage.pb.ex create mode 100644 zebra/lib/zebra/workers/job_deletion_policy_marker.ex create mode 100644 zebra/lib/zebra/workers/job_deletion_policy_worker.ex create mode 100644 zebra/priv/legacy_repo/migrations/20251118091829_add_expires_at_to_jobs.exs create mode 100644 zebra/priv/legacy_repo/migrations/20251119103829_add_expires_created_index_at_jobs_table.exs create mode 100644 zebra/priv/legacy_repo/migrations/20251119104017_add_organization_created_index_at_jobs_table.exs create 
mode 100644 zebra/test/zebra/workers/job_deletion_policy_marker_test.exs create mode 100644 zebra/test/zebra/workers/job_deletion_policy_worker_test.exs diff --git a/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb b/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb new file mode 100644 index 000000000..e0cb8b35e --- /dev/null +++ b/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb @@ -0,0 +1,5 @@ +class AddExpiresAtToJobs < ActiveRecord::Migration[6.1] + def change + add_column :jobs, :expires_at, :datetime, if_not_exists: true + end +end diff --git a/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb b/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb new file mode 100644 index 000000000..08dbcaea7 --- /dev/null +++ b/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb @@ -0,0 +1,12 @@ +class AddExpiresCreatedIndexAtJobsTable < ActiveRecord::Migration[6.1] + disable_ddl_transaction! + + def change + add_index :jobs, + %i[expires_at created_at], + name: "index_jobs_on_expires_created_not_null", + algorithm: :concurrently, + where: "expires_at IS NOT NULL", + if_not_exists: true + end +end diff --git a/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb b/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb new file mode 100644 index 000000000..022e500bd --- /dev/null +++ b/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb @@ -0,0 +1,12 @@ +class AddOrganizationCreatedIndexAtJobsTable < ActiveRecord::Migration[6.1] + disable_ddl_transaction! 
+ + def change + add_index :jobs, + %i[organization_id created_at], + name: "index_jobs_on_organization_created_expires_is_null", + algorithm: :concurrently, + where: "expires_at IS NULL", + if_not_exists: true + end +end diff --git a/zebra/Makefile b/zebra/Makefile index 31e7fcaa3..41b515d5d 100644 --- a/zebra/Makefile +++ b/zebra/Makefile @@ -36,6 +36,8 @@ ZEBRA_CALLBACK_TOKEN_KEYS?="testing" CONTAINER_ENV_VARS= \ -e CI=$(CI) \ -e MIX_ENV=$(MIX_ENV) \ + -e START_JOB_DELETION_POLICY_MARKER=$(START_JOB_DELETION_POLICY_MARKER) \ + -e START_JOB_DELETION_POLICY_WORKER=$(START_JOB_DELETION_POLICY_WORKER) \ -e START_PUBLIC_JOB_API=$(START_PUBLIC_JOB_API) \ -e START_INTERNAL_JOB_API=$(START_INTERNAL_JOB_API) \ -e START_INTERNAL_TASK_API=$(START_INTERNAL_TASK_API) \ diff --git a/zebra/config/config.exs b/zebra/config/config.exs index 675756cdb..a36ad8e0d 100644 --- a/zebra/config/config.exs +++ b/zebra/config/config.exs @@ -24,6 +24,14 @@ config :zebra, Zebra.Workers.TaskFinisher, timeout: 10_000 config :zebra, Zebra.Workers.Dispatcher, timeout: 1_000 config :zebra, Zebra.Workers.Monitor, timeout: 60_000 +config :zebra, Zebra.Workers.JobDeletionPolicyWorker, + naptime: 1_000, # 1 second + longnaptime: 3_600_000, # 1 hour + limit: 100 + +config :zebra, Zebra.Workers.JobDeletionPolicyMarker, + days: 14 + config :zebra, Zebra.Workers.Scheduler, cooldown_period: 1_000, batch_size: 3 diff --git a/zebra/lib/protos/internal_api/artifacthub.pb.ex b/zebra/lib/protos/internal_api/artifacthub.pb.ex index d952ea7c3..46c16957b 100644 --- a/zebra/lib/protos/internal_api/artifacthub.pb.ex +++ b/zebra/lib/protos/internal_api/artifacthub.pb.ex @@ -388,12 +388,14 @@ defmodule InternalApi.Artifacthub.ListItem do @type t :: %__MODULE__{ name: String.t(), - is_directory: boolean + is_directory: boolean, + size: integer } - defstruct [:name, :is_directory] + defstruct [:name, :is_directory, :size] field(:name, 1, type: :string) field(:is_directory, 2, type: :bool) + field(:size, 3, type: 
:int64) end defmodule InternalApi.Artifacthub.Artifact do diff --git a/zebra/lib/protos/internal_api/rbac.pb.ex b/zebra/lib/protos/internal_api/rbac.pb.ex index 29fb21c76..2aa5958d1 100644 --- a/zebra/lib/protos/internal_api/rbac.pb.ex +++ b/zebra/lib/protos/internal_api/rbac.pb.ex @@ -508,12 +508,39 @@ defmodule InternalApi.RBAC.Permission do field(:scope, 4, type: InternalApi.RBAC.Scope, enum: true) end +defmodule InternalApi.RBAC.ListSubjectsRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + subject_ids: [String.t()] + } + defstruct [:org_id, :subject_ids] + + field(:org_id, 1, type: :string) + field(:subject_ids, 2, repeated: true, type: :string) +end + +defmodule InternalApi.RBAC.ListSubjectsResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + subjects: [InternalApi.RBAC.Subject.t()] + } + defstruct [:subjects] + + field(:subjects, 1, repeated: true, type: InternalApi.RBAC.Subject) +end + defmodule InternalApi.RBAC.SubjectType do @moduledoc false use Protobuf, enum: true, syntax: :proto3 field(:USER, 0) field(:GROUP, 1) + field(:SERVICE_ACCOUNT, 2) end defmodule InternalApi.RBAC.Scope do @@ -588,6 +615,8 @@ defmodule InternalApi.RBAC.RBAC.Service do InternalApi.RBAC.RefreshCollaboratorsRequest, InternalApi.RBAC.RefreshCollaboratorsResponse ) + + rpc(:ListSubjects, InternalApi.RBAC.ListSubjectsRequest, InternalApi.RBAC.ListSubjectsResponse) end defmodule InternalApi.RBAC.RBAC.Stub do diff --git a/zebra/lib/protos/internal_api/usage.pb.ex b/zebra/lib/protos/internal_api/usage.pb.ex new file mode 100644 index 000000000..b72ab1d74 --- /dev/null +++ b/zebra/lib/protos/internal_api/usage.pb.ex @@ -0,0 +1,358 @@ +defmodule InternalApi.Usage.ListDailyUsageRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + period_started_at: Google.Protobuf.Timestamp.t(), + period_finished_at: Google.Protobuf.Timestamp.t() 
+ } + defstruct [:org_id, :period_started_at, :period_finished_at] + + field(:org_id, 1, type: :string) + field(:period_started_at, 2, type: Google.Protobuf.Timestamp) + field(:period_finished_at, 3, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.ListDailyUsageResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + status: Google.Rpc.Status.t(), + daily_usages: [InternalApi.Usage.DailyUsage.t()] + } + defstruct [:status, :daily_usages] + + field(:status, 1, type: Google.Rpc.Status) + field(:daily_usages, 2, repeated: true, type: InternalApi.Usage.DailyUsage) +end + +defmodule InternalApi.Usage.DailyUsage do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + resource_usages: [InternalApi.Usage.DailyResourceUsage.t()], + date: Google.Protobuf.Timestamp.t() + } + defstruct [:resource_usages, :date] + + field(:resource_usages, 1, repeated: true, type: InternalApi.Usage.DailyResourceUsage) + field(:date, 2, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.DailyResourceUsage do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + machine_type: String.t(), + minutes_used: integer, + seconds_used: integer + } + defstruct [:machine_type, :minutes_used, :seconds_used] + + field(:machine_type, 1, type: :string) + field(:minutes_used, 2, type: :int32) + field(:seconds_used, 3, type: :int32) +end + +defmodule InternalApi.Usage.ProjectsUsageRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + period_started_at: Google.Protobuf.Timestamp.t(), + period_finished_at: Google.Protobuf.Timestamp.t() + } + defstruct [:org_id, :period_started_at, :period_finished_at] + + field(:org_id, 1, type: :string) + field(:period_started_at, 2, type: Google.Protobuf.Timestamp) + field(:period_finished_at, 3, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.ProjectsUsageResponse do + 
@moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + status: Google.Rpc.Status.t(), + project_usages: [InternalApi.Usage.ProjectUsage.t()] + } + defstruct [:status, :project_usages] + + field(:status, 1, type: Google.Rpc.Status) + field(:project_usages, 2, repeated: true, type: InternalApi.Usage.ProjectUsage) +end + +defmodule InternalApi.Usage.ProjectUsage do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + project_id: String.t(), + resource_usages: [InternalApi.Usage.ResourceUsage.t()] + } + defstruct [:project_id, :resource_usages] + + field(:project_id, 1, type: :string) + field(:resource_usages, 2, repeated: true, type: InternalApi.Usage.ResourceUsage) +end + +defmodule InternalApi.Usage.TotalUsageRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + period_started_at: Google.Protobuf.Timestamp.t(), + period_finished_at: Google.Protobuf.Timestamp.t(), + org_id: String.t() + } + defstruct [:period_started_at, :period_finished_at, :org_id] + + field(:period_started_at, 1, type: Google.Protobuf.Timestamp) + field(:period_finished_at, 2, type: Google.Protobuf.Timestamp) + field(:org_id, 3, type: :string) +end + +defmodule InternalApi.Usage.TotalUsageResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + status: Google.Rpc.Status.t(), + resource_usages: [InternalApi.Usage.ResourceUsage.t()] + } + defstruct [:status, :resource_usages] + + field(:status, 1, type: Google.Rpc.Status) + field(:resource_usages, 2, repeated: true, type: InternalApi.Usage.ResourceUsage) +end + +defmodule InternalApi.Usage.TotalMembersUsageRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + period_started_at: Google.Protobuf.Timestamp.t(), + period_finished_at: Google.Protobuf.Timestamp.t() + } + defstruct [:org_id, :period_started_at, :period_finished_at] + + field(:org_id, 1, type: :string) + 
field(:period_started_at, 3, type: Google.Protobuf.Timestamp) + field(:period_finished_at, 4, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.TotalMembersUsageResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + members: integer, + requesters: integer + } + defstruct [:members, :requesters] + + field(:members, 1, type: :int32) + field(:requesters, 2, type: :int32) +end + +defmodule InternalApi.Usage.ResourceUsage do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + machine_type: String.t(), + seconds_used: integer + } + defstruct [:machine_type, :seconds_used] + + field(:machine_type, 1, type: :string) + field(:seconds_used, 2, type: :int32) +end + +defmodule InternalApi.Usage.ListQuotaUsageRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + points: integer, + period_started_at: Google.Protobuf.Timestamp.t(), + period_finished_at: Google.Protobuf.Timestamp.t() + } + defstruct [:org_id, :points, :period_started_at, :period_finished_at] + + field(:org_id, 1, type: :string) + field(:points, 2, type: :int32) + field(:period_started_at, 3, type: Google.Protobuf.Timestamp) + field(:period_finished_at, 4, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.ListQuotaUsageResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + status: Google.Rpc.Status.t(), + usages: [InternalApi.Usage.QuotaUsage.t()] + } + defstruct [:status, :usages] + + field(:status, 1, type: Google.Rpc.Status) + field(:usages, 2, repeated: true, type: InternalApi.Usage.QuotaUsage) +end + +defmodule InternalApi.Usage.QuotaUsage do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + machine_type: String.t(), + points: [InternalApi.Usage.QuotaUsage.Point.t()] + } + defstruct [:machine_type, :points] + + field(:machine_type, 1, type: :string) + field(:points, 2, repeated: 
true, type: InternalApi.Usage.QuotaUsage.Point) +end + +defmodule InternalApi.Usage.QuotaUsage.Point do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + usage: integer, + date: Google.Protobuf.Timestamp.t() + } + defstruct [:usage, :date] + + field(:usage, 1, type: :int32) + field(:date, 2, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.ListSeatsRequest do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + from_gte: Google.Protobuf.Timestamp.t(), + to_lt: Google.Protobuf.Timestamp.t() + } + defstruct [:org_id, :from_gte, :to_lt] + + field(:org_id, 1, type: :string) + field(:from_gte, 2, type: Google.Protobuf.Timestamp) + field(:to_lt, 3, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.ListSeatsResponse do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + seats: [InternalApi.Usage.Seat.t()] + } + defstruct [:seats] + + field(:seats, 1, repeated: true, type: InternalApi.Usage.Seat) +end + +defmodule InternalApi.Usage.Seat do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + user_id: String.t(), + display_name: String.t(), + origin: integer, + status: integer, + date: Google.Protobuf.Timestamp.t() + } + defstruct [:user_id, :display_name, :origin, :status, :date] + + field(:user_id, 1, type: :string) + field(:display_name, 2, type: :string) + field(:origin, 3, type: InternalApi.Usage.SeatOrigin, enum: true) + field(:status, 4, type: InternalApi.Usage.SeatStatus, enum: true) + field(:date, 5, type: Google.Protobuf.Timestamp) +end + +defmodule InternalApi.Usage.OrganizationPolicyApply do + @moduledoc false + use Protobuf, syntax: :proto3 + + @type t :: %__MODULE__{ + org_id: String.t(), + cutoff_date: Google.Protobuf.Timestamp.t() + } + defstruct [:org_id, :cutoff_date] + + field(:org_id, 1, type: :string) + field(:cutoff_date, 2, type: Google.Protobuf.Timestamp) +end + +defmodule 
InternalApi.Usage.SeatOrigin do + @moduledoc false + use Protobuf, enum: true, syntax: :proto3 + + field(:SEAT_ORIGIN_UNSPECIFIED, 0) + field(:SEAT_ORIGIN_SEMAPHORE, 1) + field(:SEAT_ORIGIN_GITHUB, 2) + field(:SEAT_ORIGIN_BITBUCKET, 3) + field(:SEAT_ORIGIN_GITLAB, 4) +end + +defmodule InternalApi.Usage.SeatStatus do + @moduledoc false + use Protobuf, enum: true, syntax: :proto3 + + field(:SEAT_TYPE_UNSPECIFIED, 0) + field(:SEAT_TYPE_ACTIVE_MEMBER, 1) + field(:SEAT_TYPE_NON_ACTIVE_MEMBER, 2) + field(:SEAT_TYPE_NON_MEMBER, 3) +end + +defmodule InternalApi.Usage.UsageService.Service do + @moduledoc false + use GRPC.Service, name: "InternalApi.Usage.UsageService" + + rpc( + :ListDailyUsage, + InternalApi.Usage.ListDailyUsageRequest, + InternalApi.Usage.ListDailyUsageResponse + ) + + rpc( + :ProjectsUsage, + InternalApi.Usage.ProjectsUsageRequest, + InternalApi.Usage.ProjectsUsageResponse + ) + + rpc(:TotalUsage, InternalApi.Usage.TotalUsageRequest, InternalApi.Usage.TotalUsageResponse) + + rpc( + :ListQuotaUsage, + InternalApi.Usage.ListQuotaUsageRequest, + InternalApi.Usage.ListQuotaUsageResponse + ) + + rpc( + :TotalMembersUsage, + InternalApi.Usage.TotalMembersUsageRequest, + InternalApi.Usage.TotalMembersUsageResponse + ) + + rpc(:ListSeats, InternalApi.Usage.ListSeatsRequest, InternalApi.Usage.ListSeatsResponse) +end + +defmodule InternalApi.Usage.UsageService.Stub do + @moduledoc false + use GRPC.Stub, service: InternalApi.Usage.UsageService.Service +end diff --git a/zebra/lib/zebra/models/job.ex b/zebra/lib/zebra/models/job.ex index 8a30ff760..d9a0656a9 100644 --- a/zebra/lib/zebra/models/job.ex +++ b/zebra/lib/zebra/models/job.ex @@ -43,7 +43,7 @@ defmodule Zebra.Models.Job do @primary_key {:id, :binary_id, autogenerate: true} @foreign_key_type :binary_id @required_fields ~w(name organization_id project_id aasm_state created_at updated_at machine_type spec)a - @optional_fields ~w(build_id priority execution_time_limit deployment_target_id repository_id 
@doc """
Applies a retention cutoff to the jobs of a single organization.

Two updates are issued, in this order and as separate statements:

  1. jobs of `org_id` created at or before `cutoff_date` that have no
     `expires_at` get it set to the database's `CURRENT_TIMESTAMP` plus
     `deletion_days` days;
  2. jobs of `org_id` created after `cutoff_date` that already carry an
     `expires_at` have it reset to `nil`.

Returns `{marked_count, unmarked_count}`.
"""
def mark_jobs_for_deletion(org_id, cutoff_date, deletion_days) do
  import Ecto.Query, only: [from: 2]

  mark_query =
    from(j in Zebra.Models.Job,
      where:
        is_nil(j.expires_at) and
          j.organization_id == ^org_id and
          j.created_at <= ^cutoff_date,
      update: [
        set: [
          expires_at: fragment("CURRENT_TIMESTAMP + (? * INTERVAL '1 day')", ^deletion_days)
        ]
      ]
    )

  unmark_query =
    from(j in Zebra.Models.Job,
      where:
        not is_nil(j.expires_at) and
          j.organization_id == ^org_id and
          j.created_at > ^cutoff_date,
      update: [set: [expires_at: nil]]
    )

  {marked_count, _} = Zebra.LegacyRepo.update_all(mark_query, [])
  {unmarked_count, _} = Zebra.LegacyRepo.update_all(unmark_query, [])

  {marked_count, unmarked_count}
end

@doc """
Deletes the stop requests that belong to the oldest `limit` expired jobs.

The job selection is the same one `delete_old_jobs/1` uses, so calling this
first and `delete_old_jobs/1` second with the same `limit` targets the same
jobs as long as no other job expires in between (the two calls are separate
statements, not one transaction).

Returns `{:ok, deleted_count}`.
"""
def delete_old_job_stop_requests(limit) do
  # Only `from/2` and `subquery/1` are called here; the keyword form of
  # `from` does not need `where/3`, `limit/2` or `order_by/2` imported.
  import Ecto.Query, only: [from: 2, subquery: 1]

  expired_ids = expired_job_ids_for_deletion(limit)

  query =
    from(jsr in Zebra.Models.JobStopRequest,
      where: jsr.job_id in subquery(expired_ids)
    )

  {deleted_count, _} = Zebra.LegacyRepo.delete_all(query)

  {:ok, deleted_count}
end

@doc """
Deletes the oldest `limit` jobs whose `expires_at` is set and has passed.

Returns `{:ok, deleted_count}`.
"""
def delete_old_jobs(limit) do
  import Ecto.Query, only: [from: 2, subquery: 1]

  expired_ids = expired_job_ids_for_deletion(limit)

  query =
    from(j in Zebra.Models.Job,
      where: j.id in subquery(expired_ids)
    )

  {deleted_count, _} = Zebra.LegacyRepo.delete_all(query)

  {:ok, deleted_count}
end

# Single definition of "which jobs are due for deletion": ids of at most
# `limit` jobs with a non-nil `expires_at` that is not later than the
# database's CURRENT_TIMESTAMP, ordered by `created_at` ascending.
# Shared by both delete functions so their selections cannot drift apart.
defp expired_job_ids_for_deletion(limit) do
  import Ecto.Query, only: [from: 2]

  from(j in Zebra.Models.Job,
    where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
    order_by: [asc: j.created_at],
    limit: ^limit,
    select: j.id
  )
end
defmodule Zebra.Workers.JobDeletionPolicyMarker do
  @moduledoc """
  Consumer for `usage.apply_organization_policy` messages.

  A message is decoded as `InternalApi.Usage.OrganizationPolicyApply`. Once the
  organization id, the cutoff date and the configured grace period have been
  validated, `Zebra.Models.Job.mark_jobs_for_deletion/3` is called and the
  marked/unmarked counts are submitted to Watchman and logged.

  Any validation failure is logged and re-raised as `ArgumentError`.
  """

  require Logger

  alias Zebra.Models.Job
  alias Google.Protobuf.Timestamp

  @default_grace_period_days 14
  @min_grace_period_days 7

  use Tackle.Consumer,
    url: Application.get_env(:zebra, :amqp_url),
    service: "zebra",
    exchange: "usage_internal_api",
    routing_key: "usage.apply_organization_policy",
    retry_limit: 10,
    retry_delay: 10

  # Entry point invoked with the raw message payload.
  def handle_message(message) do
    case build_policy(message) do
      {:ok, {org_id, cutoff_date, grace_days}} ->
        {marked, unmarked} = Job.mark_jobs_for_deletion(org_id, cutoff_date, grace_days)

        Watchman.submit({"retention.marked", [org_id]}, marked, :count)
        Watchman.submit({"retention.unmarked", [org_id]}, unmarked, :count)

        Logger.info(
          "Marked #{marked} jobs for deletion, unmarked #{unmarked} jobs for org #{org_id}."
        )

      {:error, reason} ->
        Logger.error("Failed to process policy message: #{reason}")
        raise ArgumentError, reason
    end
  end

  # Runs the validation steps in order and stops at the first `{:error, _}`.
  # The grace period is resolved last, so its log lines only appear for
  # messages that are otherwise valid.
  defp build_policy(message) do
    with {:ok, decoded} <- decode_message(message),
         {:ok, org_id} <- validate_org_id(decoded.org_id),
         {:ok, cutoff_date} <- parse_cutoff_date(decoded.cutoff_date),
         {:ok, grace_days} <- validate_policy_days() do
      {:ok, {org_id, cutoff_date, grace_days}}
    end
  end

  # Decoding is the boundary with external input, hence the broad rescue.
  defp decode_message(message) do
    {:ok, InternalApi.Usage.OrganizationPolicyApply.decode(message)}
  rescue
    error ->
      {:error, "Failed to decode message: #{inspect(error)}"}
  end

  # `nil` and the empty string are both reported as a missing org_id.
  defp validate_org_id(missing) when missing in [nil, ""],
    do: {:error, "org_id is missing in policy payload"}

  defp validate_org_id(org_id) when is_binary(org_id) do
    with :error <- Ecto.UUID.cast(org_id) do
      {:error, "Invalid org_id format: expected UUID, got #{inspect(org_id)}"}
    end
  end

  defp validate_org_id(invalid), do: {:error, "Invalid org_id: #{inspect(invalid)}"}

  # Converts the protobuf timestamp into a second-precision `DateTime`.
  defp parse_cutoff_date(nil), do: {:error, "cutoff_date is missing in policy payload"}

  defp parse_cutoff_date(%Timestamp{seconds: seconds, nanos: nanos})
       when is_integer(seconds) and is_integer(nanos) and seconds >= 0 and nanos >= 0 do
    unix_nanos = seconds * 1_000_000_000 + nanos
    as_datetime = DateTime.from_unix!(unix_nanos, :nanosecond)

    {:ok, DateTime.truncate(as_datetime, :second)}
  rescue
    error ->
      {:error, "Failed to parse cutoff_date: #{inspect(error)}"}
  end

  defp parse_cutoff_date(invalid),
    do: {:error, "Invalid cutoff_date format: #{inspect(invalid)}"}

  # Resolves the grace period (in days) from this module's application config.
  # Always returns `{:ok, days}`; bad or missing values fall back with a log line.
  defp validate_policy_days do
    case Application.fetch_env(:zebra, __MODULE__) do
      {:ok, config} ->
        config
        |> Keyword.fetch(:days)
        |> grace_period_from()

      :error ->
        Logger.info(
          "Worker configuration missing, using default grace period #{@default_grace_period_days} days"
        )

        {:ok, @default_grace_period_days}
    end
  end

  # Value at or above the minimum: used as configured.
  defp grace_period_from({:ok, days})
       when is_integer(days) and days >= @min_grace_period_days,
       do: {:ok, days}

  # Positive but below the minimum: clamped up to the minimum.
  defp grace_period_from({:ok, days}) when is_integer(days) and days > 0 do
    Logger.warning(
      "Configured grace period #{days} days is below minimum #{@min_grace_period_days} days, using minimum"
    )

    {:ok, @min_grace_period_days}
  end

  # Anything else that is present (non-integer, zero, negative): default.
  defp grace_period_from({:ok, invalid}) do
    Logger.warning(
      "Invalid grace period configuration: expected positive integer >= #{@min_grace_period_days}, got #{inspect(invalid)}, using default #{@default_grace_period_days} days"
    )

    {:ok, @default_grace_period_days}
  end

  # `:days` key absent from the config.
  defp grace_period_from(:error) do
    Logger.info(
      "Grace period configuration missing, using default #{@default_grace_period_days} days"
    )

    {:ok, @default_grace_period_days}
  end
end
defmodule Zebra.Workers.JobDeletionPolicyWorker do
  @moduledoc """
  Background loop that deletes expired jobs in batches.

  On every tick the worker first deletes the stop requests of the oldest
  `limit` expired jobs (`Zebra.Models.Job.delete_old_job_stop_requests/1`) and
  then those jobs (`Zebra.Models.Job.delete_old_jobs/1`). After a tick that
  deleted something it sleeps `naptime` milliseconds; otherwise it sleeps
  `longnaptime` milliseconds (falling back to `naptime` when `longnaptime`
  is `nil`).

  Configuration is read from `Application.fetch_env(:zebra, __MODULE__)` in
  `start_link/0`; `naptime` and `limit` are required positive integers,
  `longnaptime` is optional.
  """

  require Logger

  alias Zebra.Models.Job

  defstruct [
    # period of sleep between worker ticks when jobs are deleted
    :naptime,
    # longer period of sleep when there is nothing to delete
    :longnaptime,
    # limit for deletions per batch
    :limit
  ]

  @doc """
  Validates the configuration and spawns the linked worker loop.

  Returns `{:ok, pid}`, or `{:error, reason}` (a string) when the
  configuration is missing or invalid.
  """
  def start_link do
    with {:ok, worker_config} <- fetch_config(),
         {:ok, worker} <- validate_and_build_worker(worker_config) do
      pid = spawn_link(fn -> loop(worker) end)
      Logger.info("JobDeletionPolicyWorker started with config: #{inspect(worker)}")
      {:ok, pid}
    else
      {:error, reason} ->
        Logger.error("Failed to start JobDeletionPolicyWorker: #{reason}")
        {:error, reason}
    end
  end

  @doc """
  Runs `tick/1` forever, sleeping between ticks.

  Never returns.
  """
  def loop(worker) do
    deleted_any? = tick(worker)

    sleep_for =
      if deleted_any? do
        worker.naptime
      else
        worker.longnaptime || worker.naptime
      end

    Logger.debug("Sleeping for #{sleep_for}ms...")
    :timer.sleep(sleep_for)

    loop(worker)
  end

  @doc """
  Performs one cleanup batch.

  Returns `true` when at least one stop request or job was deleted, `false`
  when nothing was deleted or the batch reported an error.
  """
  def tick(worker) do
    Logger.info("Starting cleanup tick (limit: #{worker.limit})...")

    case delete_expired_data(worker.limit) do
      {:ok, 0, 0} ->
        Logger.info("No expired jobs found for deletion.")
        false

      {:ok, deleted_stop_requests, deleted_jobs} ->
        total_deleted = deleted_stop_requests + deleted_jobs

        # Plain metric name: the previous `{"retention.deleted"}` 1-tuple was
        # neither a bare name nor the `{name, tags}` pair used by the marker.
        # NOTE(review): confirm against the Watchman version in use.
        Watchman.submit("retention.deleted", deleted_jobs, :count)

        Logger.info(
          "Cleanup complete: deleted #{deleted_stop_requests} job stop requests and #{deleted_jobs} jobs (total: #{total_deleted})."
        )

        true

      {:error, reason} ->
        Logger.error("Cleanup tick failed: #{reason}")
        false
    end
  end

  # Stop requests go first, then the jobs they belong to.
  # Both calls receive the same `limit`.
  defp delete_expired_data(limit) do
    with {:ok, deleted_stop_requests} <- Job.delete_old_job_stop_requests(limit),
         {:ok, deleted_jobs} <- Job.delete_old_jobs(limit) do
      {:ok, deleted_stop_requests, deleted_jobs}
    else
      {:error, reason} -> {:error, reason}
      error -> {:error, "Unexpected error: #{inspect(error)}"}
    end
  end

  defp fetch_config do
    case Application.fetch_env(:zebra, __MODULE__) do
      {:ok, config} -> {:ok, config}
      :error -> {:error, "Worker configuration is missing"}
    end
  end

  # Builds the worker struct; the first failing validation is returned as-is.
  defp validate_and_build_worker(config) do
    with {:ok, naptime} <- validate_naptime(config),
         {:ok, longnaptime} <- validate_longnaptime(config),
         {:ok, limit} <- validate_limit(config) do
      worker = %__MODULE__{
        naptime: naptime,
        longnaptime: longnaptime,
        limit: limit
      }

      {:ok, worker}
    end
  end

  defp validate_naptime(config) do
    case Keyword.fetch(config, :naptime) do
      {:ok, naptime} when is_integer(naptime) and naptime > 0 ->
        {:ok, naptime}

      {:ok, invalid} ->
        {:error, "Invalid naptime: expected positive integer, got #{inspect(invalid)}"}

      :error ->
        {:error, "naptime configuration is missing"}
    end
  end

  # `longnaptime` is optional: both an absent key and an explicit `nil` yield
  # `{:ok, nil}`, which makes `loop/1` fall back to `naptime`.
  defp validate_longnaptime(config) do
    case Keyword.fetch(config, :longnaptime) do
      {:ok, longnaptime} when is_integer(longnaptime) and longnaptime > 0 ->
        {:ok, longnaptime}

      {:ok, nil} ->
        {:ok, nil}

      {:ok, invalid} ->
        {:error, "Invalid longnaptime: expected positive integer or nil, got #{inspect(invalid)}"}

      :error ->
        {:ok, nil}
    end
  end

  defp validate_limit(config) do
    case Keyword.fetch(config, :limit) do
      {:ok, limit} when is_integer(limit) and limit > 0 ->
        {:ok, limit}

      {:ok, invalid} ->
        {:error, "Invalid limit: expected positive integer, got #{inspect(invalid)}"}

      :error ->
        {:error, "limit configuration is missing"}
    end
  end
end
defmodule Zebra.LegacyRepo.Migrations.AddExpiresAtToJobs do
  @moduledoc """
  Adds the nullable `expires_at` column to `jobs`, skipping it when the column
  is already there.

  `add/3` has no `:if_not_exists` option, so the guard has to be expressed
  with `add_if_not_exists/3`. The same column is also added by the
  github_hooks Rails migration in this change — presumably against the same
  database, which is why the guard matters (confirm).
  """
  use Ecto.Migration

  # `add_if_not_exists/3` is not reversible inside `change/0`, so the
  # rollback is spelled out explicitly.
  def up do
    alter table(:jobs) do
      add_if_not_exists :expires_at, :utc_datetime
    end
  end

  def down do
    alter table(:jobs) do
      remove_if_exists :expires_at, :utc_datetime
    end
  end
end

defmodule Zebra.LegacyRepo.Migrations.AddExpiresCreatedIndexAtJobsTable do
  @moduledoc """
  Creates the partial index on `(expires_at, created_at)` covering rows whose
  `expires_at` is set, skipping creation when the index already exists.
  """
  use Ecto.Migration

  # Both flags are needed because the index is built with `concurrently: true`.
  @disable_migration_lock true
  @disable_ddl_transaction true

  def change do
    # `index/3` has no `:if_not_exists` option; the guard is
    # `create_if_not_exists/1`.
    create_if_not_exists index(:jobs, [:expires_at, :created_at],
                           name: "index_jobs_on_expires_created_not_null",
                           concurrently: true,
                           where: "expires_at IS NOT NULL"
                         )
  end
end
defmodule Zebra.LegacyRepo.Migrations.AddOrganizationCreatedIndexAtJobsTable do
  @moduledoc """
  Creates the partial index on `(organization_id, created_at)` covering rows
  whose `expires_at` is NULL, skipping creation when the index already exists.
  """
  use Ecto.Migration

  # Both flags are needed because the index is built with `concurrently: true`.
  @disable_migration_lock true
  @disable_ddl_transaction true

  def change do
    # `index/3` has no `:if_not_exists` option; the guard is
    # `create_if_not_exists/1`.
    create_if_not_exists index(:jobs, [:organization_id, :created_at],
                           name: "index_jobs_on_organization_created_expires_is_null",
                           concurrently: true,
                           where: "expires_at IS NULL"
                         )
  end
end
Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, days)) + + org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + newer_created_at = DateTime.add(cutoff_date, 3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + {:ok, newer_job} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: newer_created_at, + updated_at: newer_created_at + }) + + {:ok, other_org_job} = + Support.Factories.Job.create(:finished, %{ + organization_id: Ecto.UUID.generate(), + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + + assert updated_job.expires_at + assert DateTime.diff(updated_job.expires_at, DateTime.utc_now()) > 0 + + assert {:ok, newer_job} = Job.find(newer_job.id) + assert is_nil(newer_job.expires_at) + + assert {:ok, other_org_job} = Job.find(other_org_job.id) + assert is_nil(other_org_job.expires_at) + end + + test "raises when cutoff date is missing", %{original_config: original_config} do + Application.put_env(:zebra, Worker, original_config) + + message = + %OrganizationPolicyApply{org_id: Ecto.UUID.generate(), cutoff_date: nil} + |> OrganizationPolicyApply.encode() + + assert_raise ArgumentError, "cutoff_date is missing in policy payload", fn -> + Worker.handle_message(message) + end + end + + test "raises when org_id is empty string (proto3 default)", %{ + original_config: original_config + } do + Application.put_env(:zebra, Worker, 
Keyword.put(original_config, :days, 3)) + + cutoff_date = DateTime.utc_now() |> DateTime.truncate(:second) + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: "", cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + assert_raise ArgumentError, "org_id is missing in policy payload", fn -> + Worker.handle_message(message) + end + end + + test "raises when org_id is not a valid UUID", %{original_config: original_config} do + Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, 3)) + + cutoff_date = DateTime.utc_now() |> DateTime.truncate(:second) + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: "invalid-uuid", cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + assert_raise ArgumentError, ~r/Invalid org_id format/, fn -> + Worker.handle_message(message) + end + end + + test "uses default grace period (14 days) when configuration is missing" do + Application.delete_env(:zebra, Worker) + + org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + # Should not raise, should use default grace period (14 days) + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + assert updated_job.expires_at + end + + test "uses default grace period (14 days) when days value is missing", %{ + original_config: original_config + } do + 
Application.put_env(:zebra, Worker, Keyword.delete(original_config, :days)) + + org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + # Should not raise, should use default grace period (14 days) + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + assert updated_job.expires_at + end + + test "uses minimum grace period (7 days) when configured below minimum", %{ + original_config: original_config + } do + Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, 5)) + + org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + # Should not raise, should use minimum grace period (7 days) + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + assert updated_job.expires_at + end + + test "uses default grace period (14 days) when days is invalid", %{ + original_config: original_config + } do + Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, "invalid")) + + 
org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + # Should not raise, should use default grace period (14 days) + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + assert updated_job.expires_at + end + + test "uses minimum grace period (7 days) when days is zero", %{ + original_config: original_config + } do + Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, 0)) + + org_id = Ecto.UUID.generate() + + cutoff_date = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + |> DateTime.truncate(:second) + + older_created_at = DateTime.add(cutoff_date, -3600, :second) + + {:ok, job_to_mark} = + Support.Factories.Job.create(:finished, %{ + organization_id: org_id, + created_at: older_created_at, + updated_at: older_created_at + }) + + cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date)) + + message = + %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp} + |> OrganizationPolicyApply.encode() + + # Should not raise. NOTE(review): the test name expects the 7-day minimum for 0, but this comment used to claim the 14-day default; only the presence of expires_at is asserted here, so confirm against JobDeletionPolicyMarker + Worker.handle_message(message) + + {:ok, updated_job} = Job.find(job_to_mark.id) + assert updated_job.expires_at + end + end +end diff --git a/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs b/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs new file mode 100644 index 000000000..82377033f --- /dev/null +++ b/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs
@@ -0,0 +1,201 @@ +defmodule Zebra.Workers.JobDeletionPolicyWorkerTest do + use Zebra.DataCase + + alias Zebra.Models.{Job, JobStopRequest} + alias Zebra.Workers.JobDeletionPolicyWorker, as: Worker + + describe ".start_link" do + setup do + original_config = Application.get_env(:zebra, Worker) + + on_exit(fn -> + if original_config do + Application.put_env(:zebra, Worker, original_config) + else + Application.delete_env(:zebra, Worker) + end + end) + + {:ok, original_config: original_config || []} + end + + test "starts successfully with valid configuration", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000, longnaptime: 5000, limit: 100) + + assert {:ok, pid} = Worker.start_link() + assert Process.alive?(pid) + + Process.exit(pid, :kill) + end + + test "starts successfully with nil longnaptime", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000, longnaptime: nil, limit: 100) + + assert {:ok, pid} = Worker.start_link() + assert Process.alive?(pid) + + Process.exit(pid, :kill) + end + + test "starts successfully without longnaptime key", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000, limit: 100) + + assert {:ok, pid} = Worker.start_link() + assert Process.alive?(pid) + + Process.exit(pid, :kill) + end + + test "returns error when configuration is missing" do + Application.delete_env(:zebra, Worker) + + assert {:error, "Worker configuration is missing"} = Worker.start_link() + end + + test "returns error when naptime is missing", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, limit: 100) + + assert {:error, "naptime configuration is missing"} = Worker.start_link() + end + + test "returns error when naptime is not a positive integer", %{ + original_config: _original_config + } do + Application.put_env(:zebra, Worker, naptime: 0, limit: 100) + + assert {:error, error} = Worker.start_link() + assert error =~ 
"Invalid naptime" + end + + test "returns error when naptime is negative", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: -100, limit: 100) + + assert {:error, error} = Worker.start_link() + assert error =~ "Invalid naptime" + end + + test "returns error when limit is missing", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000) + + assert {:error, "limit configuration is missing"} = Worker.start_link() + end + + test "returns error when limit is not a positive integer", %{ + original_config: _original_config + } do + Application.put_env(:zebra, Worker, naptime: 1000, limit: 0) + + assert {:error, error} = Worker.start_link() + assert error =~ "Invalid limit" + end + + test "returns error when limit is negative", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000, limit: -10) + + assert {:error, error} = Worker.start_link() + assert error =~ "Invalid limit" + end + + test "returns error when longnaptime is invalid (not integer or nil)", %{ + original_config: _original_config + } do + Application.put_env(:zebra, Worker, naptime: 1000, longnaptime: "invalid", limit: 100) + + assert {:error, error} = Worker.start_link() + assert error =~ "Invalid longnaptime" + end + + test "returns error when longnaptime is zero", %{original_config: _original_config} do + Application.put_env(:zebra, Worker, naptime: 1000, longnaptime: 0, limit: 100) + + assert {:error, error} = Worker.start_link() + assert error =~ "Invalid longnaptime" + end + end + + describe ".tick" do + test "deletes expired jobs and related stop requests" do + worker = %Worker{limit: 10, naptime: 1000, longnaptime: 5000} + + {:ok, job} = Support.Factories.Job.create(:finished) + {:ok, _} = JobStopRequest.create(job.build_id, job.id) + + expired_at = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + + {:ok, _} = Job.update(job, %{expires_at: expired_at}) + + assert Worker.tick(worker) + 
+ assert {:error, :not_found} = Job.find(job.id) + assert {:error, :not_found} = JobStopRequest.find_by_job_id(job.id) + end + + test "returns false when nothing is eligible for deletion" do + worker = %Worker{limit: 10, naptime: 1000, longnaptime: 5000} + + {:ok, job} = Support.Factories.Job.create(:finished) + + future_expiration = + DateTime.utc_now() + |> DateTime.add(3600, :second) + + {:ok, _} = Job.update(job, %{expires_at: future_expiration}) + + refute Worker.tick(worker) + + assert {:ok, _} = Job.find(job.id) + end + + test "respects the batch limit" do + worker = %Worker{limit: 2, naptime: 1000, longnaptime: 5000} + + expired_at = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + + {:ok, job1} = Support.Factories.Job.create(:finished) + {:ok, _} = Job.update(job1, %{expires_at: expired_at}) + + {:ok, job2} = Support.Factories.Job.create(:finished) + {:ok, _} = Job.update(job2, %{expires_at: expired_at}) + + {:ok, job3} = Support.Factories.Job.create(:finished) + {:ok, _} = Job.update(job3, %{expires_at: expired_at}) + + assert Worker.tick(worker) + + deleted_count = + [job1, job2, job3] + |> Enum.count(fn job -> + match?({:error, :not_found}, Job.find(job.id)) + end) + + assert deleted_count == 2 + end + + test "returns true when only stop requests are deleted" do + worker = %Worker{limit: 10, naptime: 1000, longnaptime: 5000} + + {:ok, job} = Support.Factories.Job.create(:finished) + {:ok, _} = JobStopRequest.create(job.build_id, job.id) + + expired_at = + DateTime.utc_now() + |> DateTime.add(-3600, :second) + + {:ok, _} = Job.update(job, %{expires_at: expired_at}) + + assert Worker.tick(worker) + end + + test "returns false when no jobs have expired" do + worker = %Worker{limit: 10, naptime: 1000, longnaptime: 5000} + + {:ok, _job} = Support.Factories.Job.create(:finished) + + refute Worker.tick(worker) + end + end +end diff --git a/zebra/test/zebra/workers_test.exs b/zebra/test/zebra/workers_test.exs index e728ab6db..420c5daaf 100644 --- 
a/zebra/test/zebra/workers_test.exs +++ b/zebra/test/zebra/workers_test.exs @@ -1,8 +1,10 @@ defmodule Zebra.Workers.Test do use ExUnit.Case, async: false - test "no environment variables set => only feature provider invalidator starts" do - assert Zebra.Workers.active() == [Zebra.FeatureProviderInvalidatorWorker] + test "no environment variables set => only default workers start" do + assert Zebra.Workers.active() == [ + Zebra.FeatureProviderInvalidatorWorker + ] end describe "with environment variables set" do @@ -10,16 +12,22 @@ defmodule Zebra.Workers.Test do System.put_env("START_JOB_STOPPER", "true") System.put_env("START_TASK_FAIL_FAST_WORKER", "true") System.put_env("START_TASK_FINISHER_WORKER", "true") + System.put_env("START_JOB_DELETION_POLICY_MARKER", "true") + System.put_env("START_JOB_DELETION_POLICY_WORKER", "true") on_exit(fn -> System.put_env("START_JOB_STOPPER", "false") System.put_env("START_TASK_FAIL_FAST_WORKER", "false") System.put_env("START_TASK_FINISHER_WORKER", "false") + System.put_env("START_JOB_DELETION_POLICY_MARKER", "false") + System.put_env("START_JOB_DELETION_POLICY_WORKER", "false") end) end test "active workers are returned" do assert Zebra.Workers.active() == [ + Zebra.Workers.JobDeletionPolicyMarker, + Zebra.Workers.JobDeletionPolicyWorker, Zebra.Workers.TaskFinisher, Zebra.Workers.TaskFailFast, Zebra.Workers.JobStopper,