Skip to content

Commit f138f93

Browse files
authored
Merge pull request #3 from indexdata/master
Cascade-delete logs on delete of channel or import-job
2 parents f5225be + 38bec6e commit f138f93

6 files changed

Lines changed: 55 additions & 44 deletions

File tree

src/main/java/org/folio/inventoryupdate/importing/moduledata/ImportJob.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import io.vertx.core.json.JsonObject;
77
import io.vertx.sqlclient.SqlResult;
88
import io.vertx.sqlclient.templates.RowMapper;
9+
import io.vertx.sqlclient.templates.SqlTemplate;
910
import io.vertx.sqlclient.templates.TupleMapper;
1011
import java.time.LocalDateTime;
12+
import java.util.Collections;
1113
import java.util.HashMap;
1214
import java.util.Map;
1315
import java.util.UUID;
@@ -327,8 +329,9 @@ public Future<Void> createDatabase(TenantPgPool pool) {
327329
"CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table()
328330
+ "("
329331
+ dbColumnNameAndType(ID) + " PRIMARY KEY, "
330-
+ dbColumnNameAndType(CHANNEL_ID) + " NOT NULL "
331-
+ " REFERENCES " + pool.getSchema() + "." + Tables.CHANNEL + " (" + new Channel().dbColumnName(ID) + "), "
332+
+ dbColumnNameAndType(CHANNEL_ID) + " NOT NULL CONSTRAINT import_job_channel_id_fkey "
333+
+ "REFERENCES " + pool.getSchema() + "." + Tables.CHANNEL + " (" + new Channel().dbColumnName(ID) + ") "
334+
+ " ON DELETE CASCADE, "
332335
+ dbColumnNameAndType(CHANNEL_NAME) + ", "
333336
+ dbColumnNameAndType(IMPORT_TYPE) + ", "
334337
+ dbColumnNameAndType(TRANSFORMATION) + ", "
@@ -343,6 +346,14 @@ public Future<Void> createDatabase(TenantPgPool pool) {
343346
).mapEmpty();
344347
}
345348

349+
public Future<Integer> countImportJobsByChannelId(TenantPgPool pool, String channelId) {
350+
return SqlTemplate.forQuery(pool.getPool(),
351+
"SELECT COUNT(*) AS import_jobs_count FROM " + pool.getSchema() + "." + table()
352+
+ " WHERE " + dbColumnName(CHANNEL_ID) + " = #{channelId}")
353+
.execute(Collections.singletonMap("channelId", channelId))
354+
.map(rows -> rows.iterator().next().getInteger("import_jobs_count"));
355+
}
356+
346357
public record ImportJobRecord(UUID id,
347358
UUID channelId,
348359
String channelName,

src/main/java/org/folio/inventoryupdate/importing/moduledata/LogLine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,14 @@ public Future<Void> createDatabase(TenantPgPool pool) {
182182
"CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table()
183183
+ "("
184184
+ dbColumnName(ID) + " UUID PRIMARY KEY, "
185-
+ dbColumnName(IMPORT_JOB_ID) + " UUID NOT NULL REFERENCES "
186-
+ pool.getSchema() + "." + Tables.IMPORT_JOB + " (" + new ImportJob().dbColumnName(ID) + "), "
185+
+ dbColumnName(IMPORT_JOB_ID) + " UUID NOT NULL CONSTRAINT log_statement_import_job_id_fkey REFERENCES "
186+
+ pool.getSchema() + "." + Tables.IMPORT_JOB + " (" + new ImportJob().dbColumnName(ID) + ") "
187+
+ "ON DELETE CASCADE, "
187188
+ dbColumnName(TIME_STAMP) + " TIMESTAMP NOT NULL, "
188189
+ dbColumnName(JOB_LABEL) + " TEXT NOT NULL, "
189190
+ dbColumnName(LOG_STATEMENT) + " TEXT NOT NULL, "
190191
+ metadata.columnsDdl()
191192
+ ")",
192-
193193
"CREATE INDEX IF NOT EXISTS log_statement_import_job_id_idx "
194194
+ " ON " + pool.getSchema() + "." + table() + "(" + dbColumnName(IMPORT_JOB_ID) + ")"
195195
).mapEmpty();

src/main/java/org/folio/inventoryupdate/importing/moduledata/RecordFailure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,9 @@ public Future<Void> createDatabase(TenantPgPool pool) {
254254
"CREATE TABLE IF NOT EXISTS " + pool.getSchema() + "." + table()
255255
+ "("
256256
+ dbColumnNameAndType(ID) + " PRIMARY KEY, "
257-
+ dbColumnNameAndType(IMPORT_JOB_ID) + " NOT NULL REFERENCES "
258-
+ pool.getSchema() + "." + Tables.IMPORT_JOB + "(" + new ImportJob().dbColumnName(ID) + "), "
257+
+ dbColumnNameAndType(IMPORT_JOB_ID) + " NOT NULL CONSTRAINT record_failure_import_job_id_fkey "
258+
+ "REFERENCES " + pool.getSchema() + "." + Tables.IMPORT_JOB
259+
+ "(" + new ImportJob().dbColumnName(ID) + ") ON DELETE CASCADE, "
259260
+ dbColumnNameAndType(RECORD_NUMBER) + ", "
260261
+ dbColumnNameAndType(TIME_STAMP) + ", "
261262
+ dbColumnNameAndType(SOURCE_FILE_NAME) + ", "
@@ -264,7 +265,6 @@ public Future<Void> createDatabase(TenantPgPool pool) {
264265
+ dbColumnNameAndType(TRANSFORMED_RECORD) + " NOT NULL, "
265266
+ metadata.columnsDdl()
266267
+ ")",
267-
268268
"CREATE INDEX IF NOT EXISTS record_failure_import_job_id_idx "
269269
+ " ON " + pool.getSchema() + "." + table() + "(" + dbColumnName(IMPORT_JOB_ID) + ")"
270270
).mapEmpty();

src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.UUID;
1010
import java.util.concurrent.atomic.AtomicBoolean;
1111
import org.folio.inventoryupdate.importing.moduledata.Channel;
12+
import org.folio.inventoryupdate.importing.moduledata.ImportJob;
1213
import org.folio.inventoryupdate.importing.moduledata.database.EntityStorage;
1314
import org.folio.inventoryupdate.importing.moduledata.database.SqlQuery;
1415
import org.folio.inventoryupdate.importing.moduledata.database.Tables;
@@ -91,12 +92,25 @@ public static Future<Void> putChannel(ServiceRequest request) {
9192

9293
public static Future<Void> deleteChannel(ServiceRequest request) {
9394
String channelId = request.requestParam("id");
95+
boolean force = "true".equalsIgnoreCase(request.requestParam("force"));
9496
return getChannelByTagOrUuid(request, channelId).compose(channel -> {
9597
if (channel == null) {
9698
return responseText(request.routingContext(), 404)
9799
.end("Found no channel with tag or id " + channelId + " to delete.").mapEmpty();
98100
} else {
99-
return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty();
101+
if (force) {
102+
return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty();
103+
} else {
104+
return new ImportJob().countImportJobsByChannelId(request.entityStorage().getTenantPool(), channelId)
105+
.compose(jobsCount -> {
106+
if (jobsCount > 0) {
107+
return responseText(request.routingContext(), 400).end("Channel not deleted because it has " + jobsCount
108+
+ " logged import jobs. To delete all logs together with the channel, use parameter ?force=true");
109+
} else {
110+
return deleteEntityAndRespond(request, new Channel()).compose(na -> decommission(request)).mapEmpty();
111+
}
112+
});
113+
}
100114
}
101115
});
102116
}

src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/LogPurging.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.folio.inventoryupdate.importing.foliodata.SettingsClient;
1313
import org.folio.inventoryupdate.importing.moduledata.ImportJob;
14-
import org.folio.inventoryupdate.importing.moduledata.LogLine;
15-
import org.folio.inventoryupdate.importing.moduledata.RecordFailure;
1614
import org.folio.inventoryupdate.importing.moduledata.database.Tables;
1715
import org.folio.inventoryupdate.importing.service.ServiceRequest;
1816
import org.folio.inventoryupdate.importing.utils.Miscellaneous;
@@ -49,38 +47,6 @@ private Future<Void> purgePastJobsBySetting(ServiceRequest request, String purge
4947
}
5048

5149
private Future<Void> purgePreviousJobsByAge(LocalDateTime untilDate) {
52-
return deleteLogs(untilDate)
53-
.compose(deletedLogs -> deleteFailedRecords(untilDate))
54-
.compose(deletedFailedRecords -> deleteImportJobs(untilDate));
55-
}
56-
57-
private Future<Void> deleteLogs(LocalDateTime untilDate) {
58-
return SqlTemplate.forUpdate(pool.getPool(),
59-
"DELETE FROM " + pool.getSchema() + "." + Tables.LOG_STATEMENT
60-
+ " WHERE " + new LogLine().field(LogLine.IMPORT_JOB_ID).columnName()
61-
+ " IN (SELECT " + new ImportJob().field(ImportJob.ID).columnName()
62-
+ " FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB
63-
+ " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " < #{untilDate} )")
64-
.execute(Collections.singletonMap("untilDate", untilDate))
65-
.onSuccess(result -> logger.info("{} log lines deleted", result.rowCount()))
66-
.onFailure(error -> logger.error("{} (occurred when attempting to delete logs)", error.getMessage()))
67-
.mapEmpty();
68-
}
69-
70-
private Future<Void> deleteFailedRecords(LocalDateTime untilDate) {
71-
return SqlTemplate.forUpdate(pool.getPool(),
72-
"DELETE FROM " + pool.getSchema() + "." + Tables.RECORD_FAILURE
73-
+ " WHERE " + new RecordFailure().field(LogLine.IMPORT_JOB_ID).columnName()
74-
+ " IN (SELECT " + new ImportJob().field(ImportJob.ID).columnName()
75-
+ " FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB
76-
+ " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " < #{untilDate} )")
77-
.execute(Collections.singletonMap("untilDate", untilDate))
78-
.onSuccess(result -> logger.info("{} failed records deleted", result.rowCount()))
79-
.onFailure(error -> logger.error("{} (occurred when attempting to delete failed records)", error.getMessage()))
80-
.mapEmpty();
81-
}
82-
83-
private Future<Void> deleteImportJobs(LocalDateTime untilDate) {
8450
return SqlTemplate.forUpdate(pool.getPool(),
8551
"DELETE FROM " + pool.getSchema() + "." + Tables.IMPORT_JOB
8652
+ " WHERE " + new ImportJob().field(ImportJob.STARTED).columnName() + " <#{untilDate} ")

src/test/java/org/folio/inventoryupdate/unittests/ImportTests.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,11 +683,21 @@ public void canPostGetPutDeleteChannel() {
683683
putJsonObject(Service.PATH_CHANNELS + "/" + Files.JSON_CHANNEL.getString("id"), update, 200);
684684
putJsonObject(Service.PATH_CHANNELS + "/" + UUID.randomUUID(), update, 404);
685685
getRecords(Service.PATH_CHANNELS).body("totalRecords", is(1));
686+
// Can delete channel with no logged jobs
686687
deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 200);
687688
getRecords(Service.PATH_CHANNELS).body("totalRecords", is(0));
688-
deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 404);
689+
689690
// Can create disabled channel
690691
postJsonObject(PATH_CHANNELS, Files.JSON_CHANNEL.copy().put("enabled", false));
692+
// Can only delete channel with logged jobs if `force` set to `true`
693+
postJsonObject(Service.PATH_IMPORT_JOBS, Files.JSON_IMPORT_JOB);
694+
deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 400);
695+
getRecords(Service.PATH_CHANNELS).body("totalRecords", is(1));
696+
deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), "force=true", 200);
697+
getRecords(Service.PATH_CHANNELS).body("totalRecords", is(0));
698+
deleteRecord(Service.PATH_CHANNELS, Files.JSON_CHANNEL.getString("id"), 404);
699+
700+
691701
}
692702

693703
@Test
@@ -1545,6 +1555,16 @@ ValidatableResponse deleteRecord(String api, String id, int statusCode) {
15451555
.statusCode(statusCode);
15461556
}
15471557

1558+
ValidatableResponse deleteRecord(String api, String id, String argument, int statusCode) {
1559+
return given()
1560+
.baseUri(BASE_URI_INVENTORY_UPDATE)
1561+
.header(Service.OKAPI_TENANT)
1562+
.header(Service.OKAPI_URL)
1563+
.delete(api + "/" + id + "?" + argument)
1564+
.then()
1565+
.statusCode(statusCode);
1566+
}
1567+
15481568
ValidatableResponse getRecords(String api) {
15491569
return given()
15501570
.baseUri(BASE_URI_INVENTORY_UPDATE)

0 commit comments

Comments
 (0)