From c7b7299e323d7bf1e390a776dafa486f4a3bbda0 Mon Sep 17 00:00:00 2001 From: Niels Erik Date: Sun, 22 Feb 2026 22:22:08 +0100 Subject: [PATCH] MODINVUP-180 cascade delete logs (#199) * MODINVUP-180 - cascade-delete logs on delete channel, import-job --- .../importing/moduledata/ImportJob.java | 15 ++++++-- .../importing/moduledata/LogLine.java | 6 ++-- .../importing/moduledata/RecordFailure.java | 6 ++-- .../service/delivery/respond/Channels.java | 16 ++++++++- .../service/delivery/respond/LogPurging.java | 34 ------------------- .../unittests/ImportTests.java | 22 +++++++++++- 6 files changed, 55 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/ImportJob.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/ImportJob.java index f016c72a..3c649ad4 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/ImportJob.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/ImportJob.java @@ -6,8 +6,10 @@ import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.SqlTemplate; import io.vertx.sqlclient.templates.TupleMapper; import java.time.LocalDateTime; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -327,8 +329,9 @@ public Future createDatabase(TenantPgPool pool) { "CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table() + "(" + dbColumnNameAndType(ID) + " PRIMARY KEY, " - + dbColumnNameAndType(CHANNEL_ID) + " NOT NULL " - + " REFERENCES " + pool.getSchema() + "." + Tables.CHANNEL + " (" + new Channel().dbColumnName(ID) + "), " + + dbColumnNameAndType(CHANNEL_ID) + " NOT NULL CONSTRAINT import_job_channel_id_fkey " + + "REFERENCES " + pool.getSchema() + "." + Tables.CHANNEL + " (" + new Channel().dbColumnName(ID) + ") " + + " ON DELETE CASCADE, " + dbColumnNameAndType(CHANNEL_NAME) + ", " + dbColumnNameAndType(IMPORT_TYPE) + ", " + dbColumnNameAndType(TRANSFORMATION) + ", " @@ -343,6 +346,14 @@ public Future createDatabase(TenantPgPool pool) { ).mapEmpty(); } + public Future countImportJobsByChannelId(TenantPgPool pool, String channelId) { + return SqlTemplate.forQuery(pool.getPool(), + "SELECT COUNT(*) AS import_jobs_count FROM " + pool.getSchema() + "." + table() + + " WHERE " + dbColumnName(CHANNEL_ID) + " = #{channelId}") + .execute(Collections.singletonMap("channelId", channelId)) + .map(rows -> rows.iterator().next().getInteger("import_jobs_count")); + } + public record ImportJobRecord(UUID id, UUID channelId, String channelName, diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/LogLine.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/LogLine.java index 886de213..57aea281 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/LogLine.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/LogLine.java @@ -182,14 +182,14 @@ public Future createDatabase(TenantPgPool pool) { "CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table() + "(" + dbColumnName(ID) + " UUID PRIMARY KEY, " - + dbColumnName(IMPORT_JOB_ID) + " UUID NOT NULL REFERENCES " - + pool.getSchema() + "." + Tables.IMPORT_JOB + " (" + new ImportJob().dbColumnName(ID) + "), " + + dbColumnName(IMPORT_JOB_ID) + " UUID NOT NULL CONSTRAINT log_statement_import_job_id_fkey REFERENCES " + + pool.getSchema() + "." + Tables.IMPORT_JOB + " (" + new ImportJob().dbColumnName(ID) + ") " + + "ON DELETE CASCADE, " + dbColumnName(TIME_STAMP) + " TIMESTAMP NOT NULL, " + dbColumnName(JOB_LABEL) + " TEXT NOT NULL, " + dbColumnName(LOG_STATEMENT) + " TEXT NOT NULL, " + metadata.columnsDdl() + ")", - "CREATE INDEX IF NOT EXISTS log_statement_import_job_id_idx " + " ON " + pool.getSchema() + "." + table() + "(" + dbColumnName(IMPORT_JOB_ID) + ")" ).mapEmpty(); diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/RecordFailure.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/RecordFailure.java index 902f2058..a54ffc80 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/RecordFailure.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/RecordFailure.java @@ -254,8 +254,9 @@ public Future createDatabase(TenantPgPool pool) { "CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table() + "(" + dbColumnNameAndType(ID) + " PRIMARY KEY, " - + dbColumnNameAndType(IMPORT_JOB_ID) + " NOT NULL REFERENCES " - + pool.getSchema() + "." + Tables.IMPORT_JOB + "(" + new ImportJob().dbColumnName(ID) + "), " + + dbColumnNameAndType(IMPORT_JOB_ID) + " NOT NULL CONSTRAINT record_failure_import_job_id_fkey " + + "REFERENCES " + pool.getSchema() + "." + Tables.IMPORT_JOB + + "(" + new ImportJob().dbColumnName(ID) + ") ON DELETE CASCADE, " + dbColumnNameAndType(RECORD_NUMBER) + ", " + dbColumnNameAndType(TIME_STAMP) + ", " + dbColumnNameAndType(SOURCE_FILE_NAME) + ", " @@ -264,7 +265,6 @@ public Future createDatabase(TenantPgPool pool) { + dbColumnNameAndType(TRANSFORMED_RECORD) + " NOT NULL, " + metadata.columnsDdl() + ")", - "CREATE INDEX IF NOT EXISTS record_failure_import_job_id_idx " + " ON " + pool.getSchema() + "." + table() + "(" + dbColumnName(IMPORT_JOB_ID) + ")" ).mapEmpty(); diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java index 6f5089b3..c2493d37 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java @@ -9,6 +9,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.folio.inventoryupdate.importing.moduledata.Channel; +import org.folio.inventoryupdate.importing.moduledata.ImportJob; import org.folio.inventoryupdate.importing.moduledata.database.EntityStorage; import org.folio.inventoryupdate.importing.moduledata.database.SqlQuery; import org.folio.inventoryupdate.importing.moduledata.database.Tables; @@ -91,12 +92,25 @@ public static Future putChannel(ServiceRequest request) { public static Future deleteChannel(ServiceRequest request) { String channelId = request.requestParam("id"); + boolean force = "true".equalsIgnoreCase(request.requestParam("force")); return getChannelByTagOrUuid(request, channelId).compose(channel -> { if (channel == null) { return responseText(request.routingContext(), 404) .end("Found no channel with tag or id " + channelId + " to delete.").mapEmpty(); } else { - return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty(); + if (force) { + return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty(); + } else { + return new ImportJob().countImportJobsByChannelId(request.entityStorage().getTenantPool(), channelId) + .compose(jobsCount -> { + if (jobsCount > 0) { + return responseText(request.routingContext(), 400).end("Channel not deleted because it has " + jobsCount + + " logged import jobs. To delete all logs together with the channel, use parameter ?force=true"); + } else { + return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty(); + } + }); + } } }); } diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/LogPurging.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/LogPurging.java index fefb63ab..0a4076dc 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/LogPurging.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/LogPurging.java @@ -11,8 +11,6 @@ import org.apache.logging.log4j.Logger; import org.folio.inventoryupdate.importing.foliodata.SettingsClient; import org.folio.inventoryupdate.importing.moduledata.ImportJob; -import org.folio.inventoryupdate.importing.moduledata.LogLine; -import org.folio.inventoryupdate.importing.moduledata.RecordFailure; import org.folio.inventoryupdate.importing.moduledata.database.Tables; import org.folio.inventoryupdate.importing.service.ServiceRequest; import org.folio.inventoryupdate.importing.utils.Miscellaneous; @@ -49,38 +47,6 @@ private Future purgePastJobsBySetting(ServiceRequest request, String purge } private Future purgePreviousJobsByAge(LocalDateTime untilDate) { - return deleteLogs(untilDate) - .compose(deletedLogs -> deleteFailedRecords(untilDate)) - .compose(deletedFailedRecords -> deleteImportJobs(untilDate)); - } - - private Future deleteLogs(LocalDateTime untilDate) { - return SqlTemplate.forUpdate(pool.getPool(), - "DELETE FROM " + pool.getSchema() + "." + Tables.LOG_STATEMENT - + " WHERE " + new LogLine().field(LogLine.IMPORT_JOB_ID).columnName() - + " IN (SELECT " + new ImportJob().field(ImportJob.ID).columnName() - + " FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB - + " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " < #{untilDate} )") - .execute(Collections.singletonMap("untilDate", untilDate)) - .onSuccess(result -> logger.info("{} log lines deleted", result.rowCount())) - .onFailure(error -> logger.error("{} (occurred when attempting to delete logs)", error.getMessage())) - .mapEmpty(); - } - - private Future deleteFailedRecords(LocalDateTime untilDate) { - return SqlTemplate.forUpdate(pool.getPool(), - "DELETE FROM " + pool.getSchema() + "." + Tables.RECORD_FAILURE - + " WHERE " + new RecordFailure().field(LogLine.IMPORT_JOB_ID).columnName() - + " IN (SELECT " + new ImportJob().field(ImportJob.ID).columnName() - + " FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB - + " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " < #{untilDate} )") - .execute(Collections.singletonMap("untilDate", untilDate)) - .onSuccess(result -> logger.info("{} failed records deleted", result.rowCount())) - .onFailure(error -> logger.error("{} (occurred when attempting to delete failed records)", error.getMessage())) - .mapEmpty(); - } - - private Future deleteImportJobs(LocalDateTime untilDate) { return SqlTemplate.forUpdate(pool.getPool(), "DELETE FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB + " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " <#{untilDate} ") diff --git a/src/test/java/org/folio/inventoryupdate/unittests/ImportTests.java b/src/test/java/org/folio/inventoryupdate/unittests/ImportTests.java index f6c49a7a..3d295990 100644 --- a/src/test/java/org/folio/inventoryupdate/unittests/ImportTests.java +++ b/src/test/java/org/folio/inventoryupdate/unittests/ImportTests.java @@ -683,11 +683,21 @@ public void canPostGetPutDeleteChannel() { putJsonObject(Service.PATH_CHANNELS + "/" + Files.JSON_CHANNEL.getString("id"), update, 200); putJsonObject(Service.PATH_CHANNELS + "/" + UUID.randomUUID(), update, 404); getRecords(Service.PATH_CHANNELS).body("totalRecords", is(1)); + // Can delete channel with no logged jobs deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 200); getRecords(Service.PATH_CHANNELS).body("totalRecords", is(0)); - deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 404); + // Can create disabled channel postJsonObject(PATH_CHANNELS, Files.JSON_CHANNEL.copy().put("enabled", false)); + // Can only delete channel with logged jobs if `force` set to `true` + postJsonObject(Service.PATH_IMPORT_JOBS, Files.JSON_IMPORT_JOB); + deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 400); + getRecords(Service.PATH_CHANNELS).body("totalRecords", is(1)); + deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), "force=true", 200); + getRecords(Service.PATH_CHANNELS).body("totalRecords", is(0)); + deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 404); + + } @Test @@ -1545,6 +1555,16 @@ ValidatableResponse deleteRecord(String api, String id, int statusCode) { .statusCode(statusCode); } + ValidatableResponse deleteRecord(String api, String id, String argument, int statusCode) { + return given() + .baseUri(BASE_URI_INVENTORY_UPDATE) + .header(Service.OKAPI_TENANT) + .header(Service.OKAPI_URL) + .delete(api + "/" + id + "?" + argument) + .then() + .statusCode(statusCode); + } + ValidatableResponse getRecords(String api) { return given() .baseUri(BASE_URI_INVENTORY_UPDATE)