diff --git a/CHANGELOG.md b/CHANGELOG.md index dec931a..62fb073 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ ## [Unreleased] +## [0.6.0] - 2026-04-16 + +### Features + +- Nueva opción `purge_where_clause` en `Engine#initialize`. Permite especificar una condición SQL independiente para el DELETE, distinta de `where_clause` (que aplica a export/verify). Caso de uso: archivar subset (`isp_id IS NOT NULL`) pero purgar superset (todo el rango). Valores: `nil` = no purge, `""` = purge todo el rango, `"x"` = rango AND x. Backwards compatible vía `fetch(:purge_where_clause, @where_clause)`. Fixes #3. + +### Refactor + +- Extraído helper `date_range_sql` en Engine para eliminar duplicación entre `base_where_sql` y `purge_where_sql`. + +### YARD + +- Documentación actualizada en `Engine#initialize` para los tres casos de `purge_where_clause`. +- `Engine#build_delete_sql` ahora documenta retorno `String|nil`. + +### Telemetry + +- Nuevo evento `engine.purge_skipped` cuando no hay cláusula de purge (`delete_sql.nil?`). + +### Tests + +- 5 nuevos tests para `purge_where_clause`: backwards compatible, empty string purge all, integrity usa base_where_sql, independiente de where_clause, y use case primario (archive subset / purge superset). + ## [0.5.2] - 2026-04-16 ### Correcciones diff --git a/Gemfile.lock b/Gemfile.lock index 0128d06..f159589 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - data_drain (0.5.1) + data_drain (0.6.0) activemodel (>= 6.0) aws-sdk-glue (~> 1.0) aws-sdk-s3 (~> 1.114) diff --git a/README.md b/README.md index c4d212c..b18e3eb 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,26 @@ DataDrain::Engine.new( ).call ``` +### Archive subset vs purge superset + +Caso común: archivar filas válidas (`isp_id IS NOT NULL`) pero borrar superset (válidas + trash). 
+ +```ruby +# Archiva solo isp_id NOT NULL, verifica integridad solo sobre esos, +# pero purga TODO el mes (NULL + NOT NULL) con batching/throttling/vacuum +DataDrain::Engine.new( + bucket: 'my-bucket-store', + start_date: 6.months.ago.beginning_of_month, + end_date: 6.months.ago.end_of_month, + table_name: 'versions', + partition_keys: %w[year month], + where_clause: 'isp_id IS NOT NULL', # filtra qué se archiva + purge_where_clause: '' # purge TODO el mes (vacío = sin filtro adicional) +).call +``` + +**Resultado:** Export/verify cuentan y comparan solo `isp_id NOT NULL`. Purge borra el mes completo con batching, throttling y vacuum del `purge_loop`. + ### Orquestación con AWS Glue (tablas 1TB+) ```ruby diff --git a/lib/data_drain/engine.rb b/lib/data_drain/engine.rb index 46eaacd..e8ab512 100644 --- a/lib/data_drain/engine.rb +++ b/lib/data_drain/engine.rb @@ -21,7 +21,16 @@ class Engine # @option options [String] :select_sql (Opcional) Sentencia SELECT personalizada. # @option options [Array] :partition_keys Columnas para particionar. # @option options [String] :primary_key (Opcional) Clave primaria para borrado. Por defecto 'id'. - # @option options [String] :where_clause (Opcional) Condición SQL extra. + # @option options [String] :where_clause (Opcional) Condición SQL extra + # que filtra export, count e integrity check. Define "qué se archiva". + # @option options [String] :purge_where_clause (Opcional) Condición SQL + # para el DELETE. Si se omite, usa :where_clause (backwards compatible). + # Pasar nil explícito para desactivar purga. Pasar '' (vacío) para purgar + # todo el rango de fechas sin filtro adicional (útil para archivar subset + # y borrar superset). + # Puede ser más amplia que :where_clause; filas que matchean + # :purge_where_clause pero no :where_clause se borran sin archivar ni + # verificar. Útil para limpieza de orphans/trash que no debe respaldarse. 
# @option options [Boolean] :skip_export (Opcional) Si true, no exporta # a Parquet — solo valida y purga (para uso con GlueRunner). def initialize(options) @@ -38,6 +47,7 @@ def initialize(options) @primary_key = options.fetch(:primary_key, "id") Validations.validate_identifier!(:primary_key, @primary_key) @where_clause = options[:where_clause] + @purge_where_clause = options.fetch(:purge_where_clause, @where_clause) @bucket = options[:bucket] @skip_export = options.fetch(:skip_export, false) @@ -140,11 +150,27 @@ def integrity_failed(start_time) # @api private # @return [String] def base_where_sql - sql = "created_at >= '#{@start_date.to_fs(:db)}' AND created_at < '#{@end_date.to_fs(:db)}'" + sql = date_range_sql sql += " AND #{@where_clause}" if @where_clause && !@where_clause.empty? sql end + # @api private + # @return [String, nil] + def purge_where_sql + return nil if @purge_where_clause.nil? + + sql = date_range_sql + sql += " AND #{@purge_where_clause}" unless @purge_where_clause.empty? + sql + end + + # @api private + # @return [String] + def date_range_sql + "created_at >= '#{@start_date.to_fs(:db)}' AND created_at < '#{@end_date.to_fs(:db)}'" + end + # @api private def setup_duckdb @duckdb.query("INSTALL postgres; LOAD postgres;") @@ -289,13 +315,19 @@ def fetch_dead_tuple_count(conn) # @param conn [PG::Connection] # @return [Integer] total de filas borradas def purge_loop(conn) + delete_sql = build_delete_sql + if delete_sql.nil? + safe_log(:info, "engine.purge_skipped", { table: @table_name, reason: "no_purge_clause" }) + return 0 + end + batches_processed = 0 total_deleted = 0 slow_batch_streak = 0 loop do batch_start = monotonic - result = conn.exec(build_delete_sql) + result = conn.exec(delete_sql) batch_duration = monotonic - batch_start count = result.cmd_tuples break if count.zero? 
@@ -349,12 +381,16 @@ def emit_heartbeat_if_due(batches_processed, total_deleted) end # @api private + # @return [String, nil] SQL DELETE statement or nil if no purge clause def build_delete_sql + where = purge_where_sql + return nil if where.nil? + <<~SQL DELETE FROM #{@table_name} WHERE #{@primary_key} IN ( SELECT #{@primary_key} FROM #{@table_name} - WHERE #{base_where_sql} + WHERE #{where} LIMIT #{@config.batch_size} ) SQL diff --git a/lib/data_drain/version.rb b/lib/data_drain/version.rb index 8c81ecd..3f1c404 100644 --- a/lib/data_drain/version.rb +++ b/lib/data_drain/version.rb @@ -2,5 +2,5 @@ module DataDrain # @return [String] versión semver de la gema - VERSION = "0.5.2" + VERSION = "0.6.0" end diff --git a/skill/SKILL.md b/skill/SKILL.md index 694288e..60f441f 100644 --- a/skill/SKILL.md +++ b/skill/SKILL.md @@ -14,6 +14,7 @@ Skill de conocimiento completo sobre DataDrain. Consultame para cualquier pregun - **Hive Partitioning** — Estructura de carpetas `key1=val1/key2=val2/...` que DuckDB genera y consume nativamente para prefix scans eficientes. - **Semi-abierto** — Convención de rangos `[start, end)` con `<` (no `<=`) para evitar pérdida de microsegundos en límites de fecha. - **skip_export** — Modo del Engine donde delega export a herramienta externa (Glue/EMR) y solo verifica + purga. +- **purge_where_clause** — Condición SQL independiente para el DELETE. Permite archivar subset y purgar superset. nil = skip, "" = purge todo el rango, "x" = rango AND x. - **ensure_job** — Wrapper idempotente de GlueRunner que crea o actualiza un job según config deseada. Incluye diffing de configuración para evitar API calls innecesarios. - **changed_fields** — Helper privado de ensure_job que compara config deseada vs actual de un Glue Job y retorna qué campos difieren. - **Heartbeat** — Log de progreso emitido cada 100 lotes en purgas masivas (tablas 1TB). 
@@ -70,7 +71,7 @@ DataDrain resuelve el ciclo de vida de datos históricos en bases relacionales c - Ruby `>= 3.2.0` - Runtime: `activemodel >= 6.0`, `duckdb ~> 1.4`, `pg >= 1.2`, `aws-sdk-s3 ~> 1.114`, `aws-sdk-glue ~> 1.0` -- Versión actual: `0.5.1` +- Versión actual: `0.6.0` ## API Pública (resumen) @@ -98,12 +99,22 @@ DataDrain::Engine.new( bucket:, start_date:, end_date:, table_name:, partition_keys: %w[isp_id year month], primary_key: "id", # opcional - where_clause: nil, # opcional, SQL extra + where_clause: nil, # opcional, SQL extra para export/verify + purge_where_clause: nil, # opcional, SQL para DELETE (nil=skip, ""=full range, "x"=range+x) skip_export: false, # true delega export a Glue folder_name: nil, # default = table_name select_sql: "*" # default ).call # => true (ok) | false (integrity fail) +# Archive subset vs purge superset (v0.6.0+) +DataDrain::Engine.new( + bucket:, start_date:, end_date:, table_name:, + partition_keys: %w[year month], + where_clause: "isp_id IS NOT NULL", # filtra qué se archiva + purge_where_clause: "" # purge TODO el rango (vacío = sin filtro adicional) +).call +# Resultado: export/verify sobre isp_id NOT NULL, purge sobre todo el rango + # 2. 
Ingesta de archivos crudos DataDrain::FileIngestor.new( bucket:, source_path:, folder_name:, diff --git a/spec/data_drain/engine_refactor_spec.rb b/spec/data_drain/engine_refactor_spec.rb index 49fee40..7d4c975 100644 --- a/spec/data_drain/engine_refactor_spec.rb +++ b/spec/data_drain/engine_refactor_spec.rb @@ -27,7 +27,12 @@ describe "#purge_loop (refactor)" do it "retorna total_deleted (suma de cmd_tuples por lote)" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new( + base_options.merge( + table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'" + ) + ) values = [100, 50, 0] call_count = 0 @@ -42,7 +47,12 @@ end it "retorna 0 cuando no hay filas para borrar" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new( + base_options.merge( + table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'" + ) + ) allow(mock_pg_result).to receive(:cmd_tuples).and_return(0) allow(mock_pg_conn).to receive(:exec).with(/DELETE FROM versions/).and_return(mock_pg_result) @@ -54,7 +64,12 @@ describe "#durations hash" do it "acumula timings en @durations" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new( + base_options.merge( + table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'" + ) + ) allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[0]]) @@ -69,7 +84,12 @@ describe "#timed helper" do it "guarda la duración del bloque en @durations" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new( + base_options.merge( + table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'" + ) + ) engine.instance_variable_set(:@durations, {}) result = 
engine.send(:timed, :test_step) { 42 } diff --git a/spec/data_drain/engine_spec.rb b/spec/data_drain/engine_spec.rb index 4080115..a75e3a6 100644 --- a/spec/data_drain/engine_spec.rb +++ b/spec/data_drain/engine_spec.rb @@ -74,7 +74,13 @@ end it "skip_export omite export_to_parquet pero ejecuta verify_integrity" do - engine = described_class.new(base_options.merge(table_name: "versions", skip_export: true)) + engine = described_class.new( + base_options.merge( + table_name: "versions", + skip_export: true, + purge_where_clause: "created_at >= '2026-03-01'" + ) + ) allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[100]]) allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[100]]) @@ -86,7 +92,8 @@ end it "setea idle_in_transaction_session_timeout = 0" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new(base_options.merge(table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'")) allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[100]]) allow(mock_duckdb).to receive(:query).with(/COPY \(/) @@ -115,7 +122,8 @@ end it "loop de purge termina cuando cmd_tuples devuelve 0" do - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new(base_options.merge(table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'")) allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[100]]) allow(mock_duckdb).to receive(:query).with(/COPY \(/) @@ -136,6 +144,9 @@ end it "ejecuta el flujo ETL 
completo si la integridad es exitosa" do + engine = described_class.new(base_options.merge(table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'")) + # 1. Setup expect(mock_duckdb).to receive(:query).with(/INSTALL postgres/).ordered allow(mock_duckdb).to receive(:query).with(/SET max_memory/) @@ -196,7 +207,8 @@ it "ejecuta VACUUM ANALYZE cuando vacuum_after_purge es true y hay deletes" do DataDrain.configure { |c| c.vacuum_after_purge = true } - engine = described_class.new(base_options.merge(table_name: "versions")) + engine = described_class.new(base_options.merge(table_name: "versions", + purge_where_clause: "created_at >= '2026-03-01'")) allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[100]]) @@ -316,4 +328,114 @@ DataDrain.reset_configuration! end end + + describe "purge_where_clause" do + let(:base_options_with_purge) do + { + bucket: bucket, + start_date: Time.new(2026, 3, 1), + end_date: Time.new(2026, 3, 31), + partition_keys: %w[year month], + table_name: "versions", + where_clause: "isp_id IS NOT NULL" + } + end + + it "purges using where_clause when purge_where_clause not provided (backwards compatible)" do + engine = described_class.new(base_options_with_purge) + + allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) + allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[10]]) + allow(mock_duckdb).to receive(:query).with(/COPY \(/) + allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[10]]) + allow(mock_pg_conn).to receive(:exec).with(/SET idle_in_transaction_session_timeout/) + + expect(mock_pg_conn).to receive(:exec).with(/DELETE FROM versions/).twice.and_return(mock_pg_result) + allow(mock_pg_result).to receive(:cmd_tuples).and_return(10, 0) + + 
expect(engine.call).to be true + end + + it "purges all when purge_where_clause is empty (no extra filter)" do + engine = described_class.new( + base_options_with_purge.merge(purge_where_clause: "") + ) + + allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) + allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[10]]) + allow(mock_duckdb).to receive(:query).with(/COPY \(/) + allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[10]]) + + allow(mock_pg_conn).to receive(:exec).with(/SET idle_in_transaction_session_timeout/) + allow(mock_pg_result).to receive(:cmd_tuples).and_return(10, 0) + expect(mock_pg_conn).to receive(:exec).with(/DELETE FROM versions/).twice.and_return(mock_pg_result) + + expect(engine.call).to be true + end + + it "integrity check uses base_where_sql, not purge_where_clause" do + engine = described_class.new( + base_options_with_purge.merge(purge_where_clause: "status = 'deleted'") + ) + + allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) + expect(mock_duckdb).to receive(:query).with(/isp_id IS NOT NULL/).at_least(:once).and_return([[10]]) + expect(mock_duckdb).not_to receive(:query).with(/status = 'deleted'/) + allow(mock_duckdb).to receive(:query).with(/COPY \(/) + allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[10]]) + + allow(mock_pg_conn).to receive(:exec).with(/SET idle_in_transaction_session_timeout/) + allow(mock_pg_result).to receive(:cmd_tuples).and_return(5, 0) + expect(mock_pg_conn).to receive(:exec) + .with(/DELETE FROM versions.*status = 'deleted'/m).twice + .and_return(mock_pg_result) + + expect(engine.call).to be true + end + + it "archives subset but purges superset (primary use case)" do + engine = described_class.new( + base_options_with_purge.merge( + where_clause: "isp_id IS NOT NULL", + purge_where_clause: "" + ) + ) + + 
allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) + expect(mock_duckdb).to receive(:query).with(/isp_id IS NOT NULL/).at_least(:once).and_return([[10]]) + allow(mock_duckdb).to receive(:query).with(/COPY \(/) + allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[10]]) + + allow(mock_pg_conn).to receive(:exec).with(/SET idle_in_transaction_session_timeout/) + allow(mock_pg_result).to receive(:cmd_tuples).and_return(10, 0) + expect(mock_pg_conn).to receive(:exec).with(/DELETE FROM versions/).twice do |sql| + expect(sql).not_to include("isp_id IS NOT NULL") + mock_pg_result + end + + expect(engine.call).to be true + end + + it "purge_where_clause independent of where_clause" do + engine = described_class.new( + base_options_with_purge.merge( + where_clause: "isp_id IS NOT NULL", + purge_where_clause: "status = 'deleted'" + ) + ) + + allow(mock_duckdb).to receive(:query).with(/INSTALL postgres|SET max_memory|SET temp_directory|ATTACH/) + allow(mock_duckdb).to receive(:query).with(/SELECT row_count FROM postgres_query/).and_return([[10]]) + allow(mock_duckdb).to receive(:query).with(/COPY \(/) + allow(mock_duckdb).to receive(:query).with(/FROM read_parquet/).and_return([[10]]) + + allow(mock_pg_conn).to receive(:exec).with(/SET idle_in_transaction_session_timeout/) + allow(mock_pg_result).to receive(:cmd_tuples).and_return(5, 0) + expect(mock_pg_conn).to receive(:exec) + .with(/DELETE FROM versions.*status = 'deleted'/m).twice + .and_return(mock_pg_result) + + expect(engine.call).to be true + end + end end
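
Review note: the way `purge_where_clause` resolves into the DELETE filter can be reproduced outside the gem. The sketch below is a standalone approximation of the `options.fetch(:purge_where_clause, @where_clause)` fallback combined with `purge_where_sql` from this diff. `resolve_purge_where`, `RANGE_SQL`, and `CASES` are hypothetical names introduced only for illustration; they are not part of DataDrain.

```ruby
# Hypothetical stand-in for the fetch fallback in Engine#initialize plus
# Engine#purge_where_sql. Not part of the DataDrain API.
def resolve_purge_where(date_range_sql, options)
  where_clause = options[:where_clause]
  # Key omitted => inherit where_clause; key present (even nil) => used as given.
  purge_clause = options.fetch(:purge_where_clause, where_clause)

  return nil if purge_clause.nil?              # no DELETE would be issued
  return date_range_sql if purge_clause.empty? # whole date range, no extra filter

  "#{date_range_sql} AND #{purge_clause}"
end

# Assumed date range, written in the half-open form the engine uses.
RANGE_SQL = "created_at >= '2026-03-01' AND created_at < '2026-04-01'"

CASES = {
  "inherits where_clause" => { where_clause: "isp_id IS NOT NULL" },
  "explicit nil skips" => { where_clause: "isp_id IS NOT NULL", purge_where_clause: nil },
  "empty purges range" => { where_clause: "isp_id IS NOT NULL", purge_where_clause: "" },
  "independent filter" => { where_clause: "isp_id IS NOT NULL", purge_where_clause: "status = 'deleted'" },
  "both omitted" => {}
}.freeze

CASES.each do |label, options|
  puts "#{label}: #{resolve_purge_where(RANGE_SQL, options).inspect}"
end
```

The last case may be worth checking against the "Backwards compatible" changelog entry. With neither option set, the fallback resolves to `nil`, so as this diff is written the purge would be skipped, whereas the previous `build_delete_sql` always filtered with `base_where_sql`. The spec changes that add an explicit `purge_where_clause` to existing tests appear consistent with that reading.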