Merged
23 changes: 23 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,28 @@
## [Unreleased]

## [0.6.0] - 2026-04-16

### Features

- New `purge_where_clause` option in `Engine#initialize`. It sets a SQL condition for the DELETE that is independent of `where_clause` (which applies to export/verify). Use case: archive a subset (`isp_id IS NOT NULL`) but purge a superset (the whole range). Values: `nil` = no purge, `""` = purge the whole range, `"x"` = range AND x. Backwards compatible via `fetch(:purge_where_clause, @where_clause)`. Fixes #3.

### Refactor

- Extracted the `date_range_sql` helper in Engine to remove duplication between `base_where_sql` and `purge_where_sql`.

### YARD

- Updated the `Engine#initialize` documentation to cover the three cases of `purge_where_clause`.
- `Engine#build_delete_sql` now documents a `String|nil` return.

### Telemetry

- New `engine.purge_skipped` event, emitted when there is no purge clause (`delete_sql.nil?`).

### Tests

- 5 new tests for `purge_where_clause`: backwards compatible, empty string purges all, integrity uses base_where_sql, independent of where_clause, and the primary use case (archive subset / purge superset).
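
The fallback noted under Features relies on `Hash#fetch`, which uses its default only when the key is missing, so an explicit `nil` stays `nil`. A minimal sketch of that resolution follows; `resolve_clauses` is a hypothetical helper (not part of DataDrain) that mirrors the two assignments in `Engine#initialize`:

```ruby
# Hypothetical helper mirroring the option resolution in Engine#initialize.
def resolve_clauses(options)
  where_clause = options[:where_clause]
  # Hash#fetch falls back to the default only when the key is absent;
  # an explicit nil is returned as-is.
  purge_where_clause = options.fetch(:purge_where_clause, where_clause)
  { where: where_clause, purge: purge_where_clause }
end

omitted  = resolve_clauses(where_clause: "isp_id IS NOT NULL")
explicit = resolve_clauses(where_clause: "isp_id IS NOT NULL", purge_where_clause: nil)
empty    = resolve_clauses(where_clause: "isp_id IS NOT NULL", purge_where_clause: "")
neither  = resolve_clauses({})

puts omitted.inspect   # purge falls back to where_clause
puts explicit.inspect  # purge stays nil (purge disabled)
puts empty.inspect     # purge is "" (whole date range)
puts neither.inspect   # both nil
```

Read this way, a caller that passes neither option ends up with a `nil` purge clause, which `purge_where_sql` treats as "no purge". That reading comes from this diff alone and is worth confirming against the full source before relying on it.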

## [0.5.2] - 2026-04-16

### Fixes
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
- data_drain (0.5.1)
+ data_drain (0.6.0)
activemodel (>= 6.0)
aws-sdk-glue (~> 1.0)
aws-sdk-s3 (~> 1.114)
20 changes: 20 additions & 0 deletions README.md
@@ -104,6 +104,26 @@ DataDrain::Engine.new(
).call
```

### Archive subset vs purge superset

Common case: archive only the valid rows (`isp_id IS NOT NULL`) but delete the superset (valid rows + trash).

```ruby
# Archives only isp_id NOT NULL and checks integrity only on those rows,
# but purges the WHOLE month (NULL + NOT NULL) with batching/throttling/vacuum
DataDrain::Engine.new(
  bucket: 'my-bucket-store',
  start_date: 6.months.ago.beginning_of_month,
  end_date: 6.months.ago.end_of_month,
  table_name: 'versions',
  partition_keys: %w[year month],
  where_clause: 'isp_id IS NOT NULL', # filters what gets archived
  purge_where_clause: ''              # purges the WHOLE month (empty = no extra filter)
).call
```

**Result:** Export/verify count and compare only `isp_id NOT NULL` rows. Purge deletes the whole month, using the batching, throttling and vacuum of `purge_loop`.
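
To make that result concrete, the two predicates can be reproduced outside the gem. This is a sketch under assumptions: the three functions are standalone copies of the private helpers shown in this PR, `strftime` stands in for ActiveSupport's `to_fs(:db)`, and the dates are fixed illustrative values rather than `6.months.ago`.

```ruby
# Standalone copy of Engine#date_range_sql (assumption: strftime replaces to_fs(:db)).
def date_range_sql(start_date, end_date)
  fmt = "%Y-%m-%d %H:%M:%S"
  "created_at >= '#{start_date.strftime(fmt)}' AND created_at < '#{end_date.strftime(fmt)}'"
end

# Predicate used by export, count and integrity check.
def base_where_sql(start_date, end_date, where_clause)
  sql = date_range_sql(start_date, end_date)
  sql += " AND #{where_clause}" if where_clause && !where_clause.empty?
  sql
end

# Predicate used by the DELETE; nil means "skip the purge".
def purge_where_sql(start_date, end_date, purge_where_clause)
  return nil if purge_where_clause.nil?

  sql = date_range_sql(start_date, end_date)
  sql += " AND #{purge_where_clause}" unless purge_where_clause.empty?
  sql
end

start_date = Time.utc(2025, 10, 1) # illustrative values
end_date   = Time.utc(2025, 11, 1)

export_where = base_where_sql(start_date, end_date, "isp_id IS NOT NULL")
purge_where  = purge_where_sql(start_date, end_date, "")

puts export_where # date range AND isp_id IS NOT NULL
puts purge_where  # date range only
```

The export predicate carries the extra filter while the purge predicate is the bare date range, which is why rows with `isp_id IS NULL` are deleted without ever being archived.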

### Orchestration with AWS Glue (1TB+ tables)

```ruby
44 changes: 40 additions & 4 deletions lib/data_drain/engine.rb
@@ -21,7 +21,16 @@ class Engine
# @option options [String] :select_sql (Optional) Custom SELECT statement.
# @option options [Array<String, Symbol>] :partition_keys Columns to partition by.
# @option options [String] :primary_key (Optional) Primary key used for deletion. Defaults to 'id'.
- # @option options [String] :where_clause (Optional) Extra SQL condition.
+ # @option options [String] :where_clause (Optional) Extra SQL condition
+ #   that filters export, count and integrity check. Defines "what gets archived".
+ # @option options [String] :purge_where_clause (Optional) SQL condition
+ #   for the DELETE. If omitted, falls back to :where_clause (backwards compatible).
+ #   Pass an explicit nil to disable the purge. Pass '' (empty) to purge
+ #   the whole date range with no extra filter (useful to archive a subset
+ #   and delete a superset).
+ #   May be broader than :where_clause; rows that match
+ #   :purge_where_clause but not :where_clause are deleted without being
+ #   archived or verified. Useful for cleaning up orphans/trash that should not be backed up.
# @option options [Boolean] :skip_export (Optional) If true, does not export
#   to Parquet; only validates and purges (for use with GlueRunner).
def initialize(options)
@@ -38,6 +47,7 @@ def initialize(options)
@primary_key = options.fetch(:primary_key, "id")
Validations.validate_identifier!(:primary_key, @primary_key)
@where_clause = options[:where_clause]
@purge_where_clause = options.fetch(:purge_where_clause, @where_clause)
@bucket = options[:bucket]
@skip_export = options.fetch(:skip_export, false)

@@ -140,11 +150,27 @@ def integrity_failed(start_time)
# @api private
# @return [String]
def base_where_sql
- sql = "created_at >= '#{@start_date.to_fs(:db)}' AND created_at < '#{@end_date.to_fs(:db)}'"
+ sql = date_range_sql
sql += " AND #{@where_clause}" if @where_clause && !@where_clause.empty?
sql
end

# @api private
# @return [String, nil] WHERE predicate for the DELETE, or nil when purge is disabled
def purge_where_sql
return nil if @purge_where_clause.nil?

sql = date_range_sql
sql += " AND #{@purge_where_clause}" unless @purge_where_clause.empty?
sql
end

# @api private
# @return [String]
def date_range_sql
"created_at >= '#{@start_date.to_fs(:db)}' AND created_at < '#{@end_date.to_fs(:db)}'"
end

# @api private
def setup_duckdb
@duckdb.query("INSTALL postgres; LOAD postgres;")
@@ -289,13 +315,19 @@ def fetch_dead_tuple_count(conn)
# @param conn [PG::Connection]
# @return [Integer] total number of rows deleted
def purge_loop(conn)
delete_sql = build_delete_sql
if delete_sql.nil?
safe_log(:info, "engine.purge_skipped", { table: @table_name, reason: "no_purge_clause" })
return 0
end

batches_processed = 0
total_deleted = 0
slow_batch_streak = 0

loop do
batch_start = monotonic
- result = conn.exec(build_delete_sql)
+ result = conn.exec(delete_sql)
batch_duration = monotonic - batch_start
count = result.cmd_tuples
break if count.zero?
@@ -349,12 +381,16 @@ def emit_heartbeat_if_due(batches_processed, total_deleted)
end

# @api private
# @return [String, nil] SQL DELETE statement or nil if no purge clause
def build_delete_sql
where = purge_where_sql
return nil if where.nil?

<<~SQL
DELETE FROM #{@table_name}
WHERE #{@primary_key} IN (
SELECT #{@primary_key} FROM #{@table_name}
- WHERE #{base_where_sql}
+ WHERE #{where}
LIMIT #{@config.batch_size}
)
SQL
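
The guard added at the top of `purge_loop` can be exercised in isolation. The sketch below is a simplified stand-in, not the gem's code: `FakeConn` is a hypothetical replacement for `PG::Connection`, the DELETE string is illustrative, and throttling, heartbeat and vacuum are left out.

```ruby
# Hypothetical stand-in for PG::Connection: each exec returns the next batch count.
class FakeConn
  Result = Struct.new(:cmd_tuples)
  attr_reader :exec_calls

  def initialize(batch_counts)
    @batch_counts = batch_counts.dup
    @exec_calls = 0
  end

  def exec(_sql)
    @exec_calls += 1
    Result.new(@batch_counts.shift || 0)
  end
end

# Simplified purge loop: the SQL is built once and reused for every batch.
def purge_loop(conn, delete_sql)
  return 0 if delete_sql.nil? # the real engine logs engine.purge_skipped here

  total_deleted = 0
  loop do
    count = conn.exec(delete_sql).cmd_tuples
    break if count.zero?

    total_deleted += count
  end
  total_deleted
end

skipped_conn = FakeConn.new([100, 50, 0])
active_conn  = FakeConn.new([100, 50, 0])

skipped_total = purge_loop(skipped_conn, nil)
active_total  = purge_loop(active_conn, "DELETE FROM versions WHERE id IN (SELECT id FROM versions LIMIT 1000)")

puts "skipped: #{skipped_total} rows, #{skipped_conn.exec_calls} exec calls"
puts "active: #{active_total} rows, #{active_conn.exec_calls} exec calls"
```

With a `nil` statement the connection is never touched, whereas the active run stops on the first batch that deletes zero rows.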
2 changes: 1 addition & 1 deletion lib/data_drain/version.rb
@@ -2,5 +2,5 @@

module DataDrain
# @return [String] semver version of the gem
- VERSION = "0.5.2"
+ VERSION = "0.6.0"
end
15 changes: 13 additions & 2 deletions skill/SKILL.md
@@ -14,6 +14,7 @@ Skill de conocimiento completo sobre DataDrain. Consultame para cualquier pregun
- **Hive Partitioning**: Folder structure `key1=val1/key2=val2/...` that DuckDB generates and consumes natively for efficient prefix scans.
- **Half-open**: Range convention `[start, end)` using `<` (not `<=`) to avoid losing microseconds at date boundaries.
- **skip_export**: Engine mode that delegates export to an external tool (Glue/EMR) and only verifies + purges.
- **purge_where_clause**: Independent SQL condition for the DELETE. Lets you archive a subset and purge a superset. nil = skip, "" = purge the whole range, "x" = range AND x.
- **ensure_job**: Idempotent GlueRunner wrapper that creates or updates a job to match the desired config. Includes config diffing to avoid unnecessary API calls.
- **changed_fields**: Private helper of ensure_job that compares the desired vs current config of a Glue Job and returns which fields differ.
- **Heartbeat**: Progress log emitted every 100 batches during massive purges (1TB tables).
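
The half-open convention in the glossary can be checked with plain `Time` values. This is an illustrative sketch: `in_half_open?` is a hypothetical predicate, and the timestamps are made up to sit on a month boundary.

```ruby
# Hypothetical predicate: true when t falls inside [start_time, end_time).
def in_half_open?(t, start_time, end_time)
  t >= start_time && t < end_time
end

march = [Time.utc(2026, 3, 1), Time.utc(2026, 4, 1)]
april = [Time.utc(2026, 4, 1), Time.utc(2026, 5, 1)]

last_usec_of_march     = Time.utc(2026, 3, 31, 23, 59, 59, 999_999)
first_instant_of_april = Time.utc(2026, 4, 1)
rows = [last_usec_of_march, first_instant_of_april]

# A closed range ending at 23:59:59 would miss the row in the final microseconds.
closed_end = Time.utc(2026, 3, 31, 23, 59, 59)
missed_by_closed = last_usec_of_march > closed_end

march_rows = rows.select { |t| in_half_open?(t, *march) }
april_rows = rows.select { |t| in_half_open?(t, *april) }

puts "march: #{march_rows.size}, april: #{april_rows.size}, missed by closed range: #{missed_by_closed}"
```

Each timestamp lands in exactly one window, so consecutive monthly runs neither overlap nor leave a gap.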
@@ -70,7 +71,7 @@ DataDrain resuelve el ciclo de vida de datos históricos en bases relacionales c

- Ruby `>= 3.2.0`
- Runtime: `activemodel >= 6.0`, `duckdb ~> 1.4`, `pg >= 1.2`, `aws-sdk-s3 ~> 1.114`, `aws-sdk-glue ~> 1.0`
- - Current version: `0.5.1`
+ - Current version: `0.6.0`

## Public API (summary)

@@ -98,12 +99,22 @@ DataDrain::Engine.new(
bucket:, start_date:, end_date:, table_name:,
partition_keys: %w[isp_id year month],
primary_key: "id", # optional
- where_clause: nil, # optional, extra SQL
+ where_clause: nil, # optional, extra SQL for export/verify
+ purge_where_clause: nil, # optional, SQL for DELETE (omitted=where_clause, nil=skip, ""=full range, "x"=range+x)
skip_export: false, # true delegates export to Glue
folder_name: nil, # default = table_name
select_sql: "*" # default
).call # => true (ok) | false (integrity fail)

# Archive subset vs purge superset (v0.6.0+)
DataDrain::Engine.new(
  bucket:, start_date:, end_date:, table_name:,
  partition_keys: %w[year month],
  where_clause: "isp_id IS NOT NULL", # filters what gets archived
  purge_where_clause: ""              # purges the WHOLE range (empty = no extra filter)
).call
# Result: export/verify on isp_id NOT NULL, purge on the whole range

# 2. Raw file ingestion
DataDrain::FileIngestor.new(
bucket:, source_path:, folder_name:,
28 changes: 24 additions & 4 deletions spec/data_drain/engine_refactor_spec.rb
@@ -27,7 +27,12 @@

describe "#purge_loop (refactor)" do
it "returns total_deleted (sum of cmd_tuples per batch)" do
- engine = described_class.new(base_options.merge(table_name: "versions"))
+ engine = described_class.new(
+   base_options.merge(
+     table_name: "versions",
+     purge_where_clause: "created_at >= '2026-03-01'"
+   )
+ )

values = [100, 50, 0]
call_count = 0
@@ -42,7 +47,12 @@
end

it "returns 0 when there are no rows to delete" do
- engine = described_class.new(base_options.merge(table_name: "versions"))
+ engine = described_class.new(
+   base_options.merge(
+     table_name: "versions",
+     purge_where_clause: "created_at >= '2026-03-01'"
+   )
+ )

allow(mock_pg_result).to receive(:cmd_tuples).and_return(0)
allow(mock_pg_conn).to receive(:exec).with(/DELETE FROM versions/).and_return(mock_pg_result)
@@ -54,7 +64,12 @@

describe "#durations hash" do
it "accumulates timings in @durations" do
- engine = described_class.new(base_options.merge(table_name: "versions"))
+ engine = described_class.new(
+   base_options.merge(
+     table_name: "versions",
+     purge_where_clause: "created_at >= '2026-03-01'"
+   )
+ )

allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/)
allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[0]])
@@ -69,7 +84,12 @@

describe "#timed helper" do
it "stores the block's duration in @durations" do
- engine = described_class.new(base_options.merge(table_name: "versions"))
+ engine = described_class.new(
+   base_options.merge(
+     table_name: "versions",
+     purge_where_clause: "created_at >= '2026-03-01'"
+   )
+ )
engine.instance_variable_set(:@durations, {})

result = engine.send(:timed, :test_step) { 42 }