From 0151fe4e4bd0883ba46798e59c32642e5633701b Mon Sep 17 00:00:00 2001 From: bercianor Date: Fri, 20 Mar 2026 23:03:14 +0000 Subject: [PATCH 1/4] feat: add SQL Test Kit --- .github/workflows/release.yml | 14 + .../flamingock.project-structure.gradle.kts | 3 +- .../build.gradle.kts | 1 + .../store/sql/SqlAuditStoreTest.java | 329 +++++-------- settings.gradle.kts | 4 + utils/sql-test-kit/build.gradle.kts | 13 + .../flamingock/sql/kit/SqlAuditStorage.java | 180 +++++++ .../flamingock/sql/kit/SqlDialectHelper.java | 459 ++++++++++++++++++ .../io/flamingock/sql/kit/SqlLockStorage.java | 261 ++++++++++ .../io/flamingock/sql/kit/SqlTestKit.java | 88 ++++ 10 files changed, 1133 insertions(+), 219 deletions(-) create mode 100644 utils/sql-test-kit/build.gradle.kts create mode 100644 utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java create mode 100644 utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java create mode 100644 utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java create mode 100644 utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b357d0066..4b5bf6231 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -500,6 +500,19 @@ jobs: FLAMINGOCK_JRELEASER_GPG_SECRET_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_SECRET_KEY }} FLAMINGOCK_JRELEASER_GPG_PASSPHRASE: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PASSPHRASE }} + sql-test-kit: + needs: [ build ] + uses: ./.github/workflows/module-release-graalvm.yml + with: + module: sql-test-kit + secrets: + FLAMINGOCK_JRELEASER_GITHUB_TOKEN: ${{ secrets.FLAMINGOCK_JRELEASER_GITHUB_TOKEN }} + FLAMINGOCK_JRELEASER_MAVENCENTRAL_USERNAME: ${{ secrets.FLAMINGOCK_JRELEASER_MAVENCENTRAL_USERNAME }} + FLAMINGOCK_JRELEASER_MAVENCENTRAL_PASSWORD: ${{ secrets.FLAMINGOCK_JRELEASER_MAVENCENTRAL_PASSWORD }} + FLAMINGOCK_JRELEASER_GPG_PUBLIC_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PUBLIC_KEY }} + FLAMINGOCK_JRELEASER_GPG_SECRET_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_SECRET_KEY }} + FLAMINGOCK_JRELEASER_GPG_PASSPHRASE: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PASSPHRASE }} + mongock-support: needs: [ build ] uses: ./.github/workflows/module-release-graalvm.yml @@ -587,6 +600,7 @@ jobs: dynamodb-test-kit, couchbase-util, sql-util, + sql-test-kit, mongock-support, mongock-importer-mongodb, mongock-importer-dynamodb, diff --git a/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts b/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts index 3bb00f701..f70a34315 100644 --- a/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts +++ b/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts @@ -68,7 +68,8 @@ val legacyProjects = setOf( val testKitsProjects = setOf( "mongodb-test-kit", - "dynamodb-test-kit" + "dynamodb-test-kit", + "sql-test-kit" ) val allProjects = coreProjects + cloudProjects + communityProjects + pluginProjects + targetSystemProjects + externalSystemProjects + utilProjects + legacyProjects + testKitsProjects diff --git a/community/flamingock-auditstore-sql/build.gradle.kts b/community/flamingock-auditstore-sql/build.gradle.kts index 4b1c51ae4..e0d0ff827 100644 --- a/community/flamingock-auditstore-sql/build.gradle.kts +++ b/community/flamingock-auditstore-sql/build.gradle.kts @@ -20,6 +20,7 @@ dependencies { testImplementation("org.testcontainers:testcontainers-postgresql:2.0.2") testImplementation("org.testcontainers:testcontainers-mariadb:2.0.2") testImplementation(project(":utils:test-util")) + testImplementation(project(":utils:sql-test-kit")) testImplementation("com.zaxxer:HikariCP:3.4.5") testImplementation("org.testcontainers:testcontainers-junit-jupiter:2.0.2") testImplementation("com.h2database:h2:2.2.224") diff --git a/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java b/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java index 192ea6f11..024d9e251 100644 --- a/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java +++ b/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java @@ -17,23 +17,21 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; -import io.flamingock.internal.common.core.util.Deserializer; +import io.flamingock.common.test.pipeline.CodeChangeTestDefinition; +import io.flamingock.core.kit.TestKit; +import io.flamingock.core.kit.audit.AuditTestSupport; import io.flamingock.internal.common.sql.SqlDialect; -import io.flamingock.internal.core.builder.FlamingockFactory; import io.flamingock.internal.core.operation.OperationException; -import io.flamingock.internal.util.Trio; -import io.flamingock.internal.util.constants.CommunityPersistenceConstants; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._001__create_index; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._002__insert_document; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._003__execution_with_exception; import io.flamingock.store.sql.changes.postgresql.happyPath._003__insert_another_document; import io.flamingock.targetsystem.sql.SqlTargetSystem; +import io.flamingock.sql.kit.SqlTestKit; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.sqlite.SQLiteDataSource; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.junit.jupiter.Testcontainers; @@ -44,6 +42,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static io.flamingock.core.kit.audit.AuditEntryExpectation.*; import static org.junit.jupiter.api.Assertions.*; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -282,73 +281,37 @@ private Class[] getChangeClasses(String dialectName, String scenario) { @DisplayName("When standalone runs the AuditStore should persist the audit logs and the test data") void happyPathWithMockedPipeline(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "happyPath"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), null) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - } - - // Verify audit logs - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - String[] expectedTaskIds = {"create-index", "insert-document", "insert-another-document"}; - int recordCount = 0; - int startedCount = 0; - int appliedCount = 0; - - while (rs.next()) { - String taskId = rs.getString("change_id"); - String state = rs.getString("state"); - assertTrue( - java.util.Arrays.asList(expectedTaskIds).contains(taskId), - "Unexpected change_id: " + taskId - ); - assertTrue( - state.equals("STARTED") || state.equals("APPLIED"), - "Unexpected state: " + state - ); - if (state.equals("STARTED")) startedCount++; - if (state.equals("APPLIED")) appliedCount++; - recordCount++; - } - - assertEquals(6, recordCount, "Audit log should have 6 records"); - assertEquals(3, startedCount, "Should have 3 STARTED records"); - assertEquals(3, appliedCount, "Should have 3 APPLIED records"); - } - - // Verify test data - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Federico"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Federico", rs.getString("name")); - } - ps.setString(1, "test-client-Jorge"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Jorge", rs.getString("name")); - } - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "happyPath"); + String[] expectedTaskIds = {"create-index", "insert-document", "insert-another-document"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class)) + ) + .WHEN(() -> testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run()) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + APPLIED(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, false); } @ParameterizedTest @@ -356,86 +319,40 @@ void happyPathWithMockedPipeline(SqlDialect sqlDialect, String dialectName) thro @DisplayName("When standalone runs the AuditStore and execution fails (with rollback method) should persist all the audit logs up to the failed one (ROLLED_BACK)") void failedWithRollback(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "failedWithRollback"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - assertThrows(OperationException.class, () -> { - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - }); - - // Verify audit sequence - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("FAILED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("ROLLED_BACK", rs.getString("state")); - - assertFalse(rs.next()); - } - - // Verify index exists - SqlAuditTestHelper.verifyIndexExists(context); - - // Verify partial data - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Federico"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Federico", rs.getString("name")); - } - } - - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Jorge"); - try (ResultSet rs = ps.executeQuery()) { - assertFalse(rs.next()); - } - } - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "failedWithRollback"); + String[] expectedTaskIds = {"create-index", "insert-document", "execution-with-exception"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)) + ) + .WHEN(() -> assertThrows(OperationException.class, () -> { + testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run(); + })) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + FAILED(expectedTaskIds[2]), + ROLLED_BACK(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, true); } @ParameterizedTest @@ -443,72 +360,43 @@ void failedWithRollback(SqlDialect sqlDialect, String dialectName) throws Except @DisplayName("When standalone runs the AuditStore and execution fails (without rollback method) should persist all the audit logs up to the failed one (FAILED)") void failedWithoutRollback(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "failedWithoutRollback"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), null) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - assertThrows(OperationException.class, () -> { - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - }); - - // Verify audit sequence - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("FAILED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("ROLLED_BACK", rs.getString("state")); - - assertFalse(rs.next()); - } - - // Verify index exists and data state - SqlAuditTestHelper.verifyIndexExists(context); - verifyPartialDataState(context); - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "failedWithoutRollback"); + String[] expectedTaskIds = {"create-index", "insert-document", "execution-with-exception"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class), null) + ) + .WHEN(() -> assertThrows(OperationException.class, () -> { + testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run(); + })) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + FAILED(expectedTaskIds[2]), + ROLLED_BACK(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, true); } - private void verifyPartialDataState(TestContext context) throws SQLException { + private void verifyDataState(TestContext context, Boolean partial) throws SQLException { try (Connection conn = context.dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { ps.setString(1, "test-client-Federico"); @@ -522,7 +410,12 @@ private void verifyPartialDataState(TestContext context) throws SQLException { PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { ps.setString(1, "test-client-Jorge"); try (ResultSet rs = ps.executeQuery()) { - assertFalse(rs.next()); + if (partial) { + assertFalse(rs.next()); + } else { + assertTrue(rs.next()); + assertEquals("Jorge", rs.getString("name")); + } } } } diff --git a/settings.gradle.kts b/settings.gradle.kts index c02225b6f..43a1e0001 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -173,6 +173,10 @@ include("utils:sql-util") project(":utils:sql-util").name = "sql-util" project(":utils:sql-util").projectDir = file("utils/sql-util") +include("utils:sql-test-kit") +project(":utils:sql-test-kit").name = "sql-test-kit" +project(":utils:sql-test-kit").projectDir = file("utils/sql-test-kit") + ////////////////////////////////////// // LEGACY ////////////////////////////////////// diff --git a/utils/sql-test-kit/build.gradle.kts b/utils/sql-test-kit/build.gradle.kts new file mode 100644 index 000000000..beb364614 --- /dev/null +++ b/utils/sql-test-kit/build.gradle.kts @@ -0,0 +1,13 @@ +dependencies { + implementation(project(":core:flamingock-core")) + implementation(project(":utils:sql-util")) + implementation(project(":utils:test-util")) +} + +description = "SQL TestKit for Flamingock testing" + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(8)) + } +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java new file mode 100644 index 000000000..a0692cb3c --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java @@ -0,0 +1,180 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.audit.AuditStorage; +import io.flamingock.internal.common.core.audit.AuditEntry; +import io.flamingock.internal.common.core.audit.AuditTxType; +import io.flamingock.internal.common.sql.SqlDialect; + +import javax.sql.DataSource; +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME; + +/** + * SQL implementation of AuditStorage for real database testing. + * Only depends on SQL client/database and core Flamingock classes. + * Does not depend on SQL-specific Flamingock components like SqlTargetSystem. + */ +public class SqlAuditStorage implements AuditStorage { + + private final DataSource dataSource; + private final SqlDialectHelper dialectHelper; + private final String auditTableName; + + public SqlAuditStorage(DataSource dataSource) throws SQLException { + this(dataSource, DEFAULT_AUDIT_STORE_NAME); + } + + public SqlAuditStorage(DataSource dataSource, String tableName) throws SQLException { + this.auditTableName = tableName; + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlDialectHelper(conn); + } + } + + @Override + public void addAuditEntry(AuditEntry auditEntry) { + try (Connection connection = dataSource.getConnection()) { + // For Informix, ensure autoCommit is enabled for audit writes + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + connection.setAutoCommit(true); + } + + try (PreparedStatement ps = connection.prepareStatement( + dialectHelper.getInsertSqlString(auditTableName))) { + ps.setString(1, auditEntry.getExecutionId()); + ps.setString(2, auditEntry.getStageId()); + ps.setString(3, auditEntry.getTaskId()); + ps.setString(4, auditEntry.getAuthor()); + ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt())); + ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null); + ps.setString(7, auditEntry.getClassName()); + ps.setString(8, auditEntry.getMethodName()); + ps.setString(9, auditEntry.getSourceFile()); + ps.setString(10, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null); + ps.setLong(11, auditEntry.getExecutionMillis()); + ps.setString(12, auditEntry.getExecutionHostname()); + ps.setString(13, auditEntry.getErrorTrace()); + ps.setString(14, auditEntry.getType() != null ? auditEntry.getType().name() : null); + ps.setString(15, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null); + ps.setString(16, auditEntry.getTargetSystemId()); + ps.setString(17, auditEntry.getOrder()); + ps.setString(18, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null); + ps.setObject(19, auditEntry.getTransactionFlag()); + ps.setObject(20, auditEntry.getSystemChange()); + ps.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to add audit entry", e); + } + // Log but don't throw + } + + @Override + public List getAuditEntries() { + List entries = new ArrayList<>(); + try (Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(dialectHelper.getSelectHistorySqlString(auditTableName))) { + while (rs.next()) { + AuditEntry entry = toAuditEntry(rs); + entries.add(entry); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read audit history", e); + } + return entries; + } + + @Override + public List getAuditEntriesForChange(String changeId) { + List entries = new ArrayList<>(); + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement(dialectHelper.getSelectHistoryByChangeIdSqlString(auditTableName))) { + ps.setString(1, changeId); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + AuditEntry entry = toAuditEntry(rs); + entries.add(entry); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read audit history", e); + } + return entries; + } + + @Override + public long countAuditEntriesWithStatus(AuditEntry.Status status) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement(dialectHelper.getCountByStatusSqlString(auditTableName))) { + ps.setString(1, status.toString()); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getLong(1); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to count audit entries with status: " + status, e); + } + return 0; + } + + @Override + public boolean hasAuditEntries() { + return !this.getAuditEntries().isEmpty(); + } + + @Override + public void clear() { + try(Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement()) { + stmt.executeUpdate(dialectHelper.getDeleteAllSqlString(auditTableName)); + } catch (SQLException e) { + throw new RuntimeException("Failed to clear audit entries", e); + } + } + + private AuditEntry toAuditEntry(ResultSet rs) throws SQLException { + return new AuditEntry( + rs.getString("execution_id"), + rs.getString("stage_id"), + rs.getString("change_id"), + rs.getString("author"), + rs.getTimestamp("created_at").toLocalDateTime(), + rs.getString("state") != null ? AuditEntry.Status.valueOf(rs.getString("state")) : null, + rs.getString("type") != null ? AuditEntry.ChangeType.valueOf(rs.getString("type")) : null, + rs.getString("invoked_class"), + rs.getString("invoked_method"), + rs.getString("source_file"), + rs.getLong("execution_millis"), + rs.getString("execution_hostname"), + rs.getString("metadata"), + rs.getBoolean("system_change"), + rs.getString("error_trace"), + AuditTxType.fromString(rs.getString("tx_strategy")), + rs.getString("target_system_id"), + rs.getString("change_order"), + rs.getString("recovery_strategy") != null ? io.flamingock.api.RecoveryStrategy.valueOf(rs.getString("recovery_strategy")) : null, + rs.getObject("transaction_flag") != null ? rs.getBoolean("transaction_flag") : null + ); + } +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java new file mode 100644 index 000000000..93b7322ee --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java @@ -0,0 +1,459 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.SqlDialectFactory; +import io.flamingock.internal.core.external.store.lock.LockStatus; + +import java.sql.*; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +public final class SqlDialectHelper { + + final private SqlDialect sqlDialect; + + public SqlDialectHelper(Connection connection) { + this.sqlDialect = SqlDialectFactory.getSqlDialect(connection); + } + + public String getInsertSqlString(String tableName) { + return String.format( + "INSERT INTO %s (" + + "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + + "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + + ") VALUES (" + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + + ")", tableName); + } + + public String getSelectHistorySqlString(String tableName) { + return String.format( + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "ORDER BY id ASC", tableName); + } + public String getSelectHistoryByChangeIdSqlString(String tableName) { + return String.format( + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "WHERE change_id = ? " + + "ORDER BY id ASC", tableName); + } + + public String getCountByStatusSqlString(String tableName) { + return String.format( + "SELECT COUNT(change_id) " + + "FROM %s " + + "WHERE state = ?", tableName); + } + + public String getDeleteAllSqlString(String tableName) { + return String.format("DELETE FROM %s", tableName); + } + + + + public String getInsertOrUpdateLockSqlString(String tableName) { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + return String.format( + "INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)", + tableName); + case POSTGRESQL: + return String.format( + "INSERT INTO %s (\"key\", status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "ON CONFLICT (\"key\") DO UPDATE SET status = EXCLUDED.status, owner = EXCLUDED.owner, expires_at = EXCLUDED.expires_at", + tableName); + case SQLITE: + return String.format( + "INSERT OR REPLACE INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?)", + tableName); + case SQLSERVER: + return String.format( + "BEGIN TRANSACTION; " + + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE [key] = ?; " + + "IF @@ROWCOUNT = 0 " + + "BEGIN " + + "INSERT INTO %s ([key], status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "END; " + + "COMMIT TRANSACTION;", + tableName, tableName); + case SYBASE: + return String.format( + "BEGIN TRAN " + + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + + "IF @@ROWCOUNT = 0 " + + "BEGIN " + + " INSERT INTO %s (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?); " + + "END " + + "COMMIT TRAN", + tableName, tableName + ); + case ORACLE: + return String.format( + "MERGE INTO %s t USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM dual) s " + + "ON (t.\"key\" = s.\"key\") " + + "WHEN MATCHED THEN UPDATE SET t.status = s.status, t.owner = s.owner, t.expires_at = s.expires_at " + + "WHEN NOT MATCHED THEN INSERT (\"key\", status, owner, expires_at) VALUES (s.\"key\", s.status, s.owner, s.expires_at)", + tableName); + case H2: + return String.format( + "MERGE INTO %s (\"key\", status, owner, expires_at) KEY (\"key\") VALUES (?, ?, ?, ?)", + tableName); + case DB2: + // Use a VALUES-derived table and a target alias for DB2 to avoid parsing issues + return String.format( + "MERGE INTO %s tgt USING (VALUES (?, ?, ?, ?)) src(lock_key, status, owner, expires_at) " + + "ON (tgt.lock_key = src.lock_key) " + + "WHEN MATCHED THEN UPDATE SET status = src.status, owner = src.owner, expires_at = src.expires_at " + + "WHEN NOT MATCHED THEN INSERT (lock_key, status, owner, expires_at) VALUES (src.lock_key, src.status, src.owner, src.expires_at)", + tableName); + case FIREBIRD: + return String.format("UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?", tableName); + case INFORMIX: + // Informix doesn't support ON DUPLICATE KEY UPDATE + // Use a procedural approach similar to SQL Server + return String.format( + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + + "INSERT INTO %s (lock_key, status, owner, expires_at) " + + "SELECT ?, ?, ?, ? FROM sysmaster:sysdual " + + "WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)", + tableName, tableName, tableName); + default: + throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name()); + } + } + + public void upsertLockEntry(Connection conn, String tableName, String key, String owner, LocalDateTime expiresAt) throws SQLException { + String sql = getInsertOrUpdateLockSqlString(tableName); + + if (sqlDialect == SqlDialect.DB2) { + // UPDATE first + try (PreparedStatement update = conn.prepareStatement( + "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { + update.setString(1, LockStatus.LOCK_HELD.name()); + update.setString(2, owner); + update.setTimestamp(3, Timestamp.valueOf(expiresAt)); + update.setString(4, key); + int updated = update.executeUpdate(); + if (updated > 0) { + return; + } + } + + // If no row updated, try INSERT + try (PreparedStatement insert = conn.prepareStatement( + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + insert.setString(1, key); + insert.setString(2, LockStatus.LOCK_HELD.name()); + insert.setString(3, owner); + insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); + insert.executeUpdate(); + } + return; + } + + if (getSqlDialect() == SqlDialect.INFORMIX) { + // Try UPDATE first + try (PreparedStatement update = conn.prepareStatement( + "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { + update.setString(1, LockStatus.LOCK_HELD.name()); + update.setString(2, owner); + update.setTimestamp(3, Timestamp.valueOf(expiresAt)); + update.setString(4, key); + int updated = update.executeUpdate(); + if (updated > 0) { + return; + } + } + + // If no row updated, try INSERT + try (PreparedStatement insert = conn.prepareStatement( + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + insert.setString(1, key); + insert.setString(2, LockStatus.LOCK_HELD.name()); + insert.setString(3, owner); + insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); + insert.executeUpdate(); + } + return; + } + + if (getSqlDialect() == SqlDialect.SQLSERVER) { + // For SQL Server/Sybase, use Statement and format SQL + try (Statement stmt = conn.createStatement()) { + String formattedSql = sql + .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") + .replaceFirst("\\?", "'" + owner + "'") + .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'") + .replaceFirst("\\?", "'" + key + "'") + .replaceFirst("\\?", "'" + key + "'") + .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") + .replaceFirst("\\?", "'" + owner + "'") + .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'"); + stmt.execute(formattedSql); + } + return; + } + + if (sqlDialect == SqlDialect.FIREBIRD) { + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setString(1, LockStatus.LOCK_HELD.name()); + ps.setString(2, owner); + ps.setTimestamp(3, Timestamp.valueOf(expiresAt)); + ps.setString(4, key); + int updated = ps.executeUpdate(); + if (updated == 0) { + String insertSql = "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)"; + try (PreparedStatement ins = conn.prepareStatement(insertSql)) { + ins.setString(1, key); + ins.setString(2, LockStatus.LOCK_HELD.name()); + ins.setString(3, owner); + ins.setTimestamp(4, Timestamp.valueOf(expiresAt)); + ins.executeUpdate(); + } + } + } + return; + } + + if (sqlDialect == SqlDialect.SYBASE) { + // The lock was already deleted in acquireLockQuery for Sybase + try (PreparedStatement insert = conn.prepareStatement( + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + insert.setString(1, key); + insert.setString(2, LockStatus.LOCK_HELD.name()); + insert.setString(3, owner); + insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); + insert.executeUpdate(); + } + return; + } + + + // Default case for other dialects + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setString(1, key); + ps.setString(2, LockStatus.LOCK_HELD.name()); + ps.setString(3, owner); + ps.setTimestamp(4, Timestamp.valueOf(expiresAt)); + ps.executeUpdate(); + } + } + + public String getSelectLockSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case H2: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName); + case DB2: + // Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key) + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + case SQLSERVER: + return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName); + case SYBASE: + return String.format( + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK " + + "WHERE lock_key = ?", + tableName + ); case ORACLE: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName); + case INFORMIX: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + case FIREBIRD: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + default: + return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName); + } + } + + public String getSelectAllLocksSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case H2: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s", tableName); + case SQLSERVER: + return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK)", tableName); + case SYBASE: + return String.format( + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK", + tableName + ); + case ORACLE: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s FOR UPDATE", tableName); + case DB2: + case INFORMIX: + case FIREBIRD: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s", tableName); + default: + return String.format("SELECT `key`, status, owner, expires_at FROM %s", tableName); + } + } + + public String getDeleteLockSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); + case INFORMIX: + case DB2: + case FIREBIRD: + case SYBASE: + return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); + case SQLSERVER: + return String.format("DELETE FROM %s WHERE [key] = ?", tableName); + case ORACLE: + return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); + default: // MYSQL, MARIADB, SQLITE, H2 + return String.format("DELETE FROM %s WHERE `key` = ?", tableName); + } + } + + public void disableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=0"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = OFF"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY FALSE"); + } + break; + case SQLSERVER: + case SYBASE: + case FIREBIRD: + // No hay un comando global; hay que eliminar las FK constraints antes de dropear tablas + dropAllForeignKeys(conn); + break; + default: + // POSTGRESQL, ORACLE, DB2, INFORMIX: el DROP maneja dependencias por sí solo + break; + } + } + + public void enableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=1"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = ON"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY TRUE"); + } + break; + default: + break; + } + } + + private void dropAllForeignKeys(Connection conn) throws SQLException { + // Para SQL Server, Sybase y Firebird: eliminar FK constraints de todas las tablas de usuario + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + try (ResultSet tables = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + try (ResultSet fks = meta.getExportedKeys(catalog, schema, tableName)) { + while (fks.next()) { + String fkName = fks.getString("FK_NAME"); + String fkTable = fks.getString("FKTABLE_NAME"); + if (fkName != null) { + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "ALTER TABLE " + fkTable + " DROP CONSTRAINT " + fkName); + } catch (SQLException e) { + // Ignorar si ya fue eliminado + } + } + } + } + } + } + } + + public String getDropTableSql(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + return "DROP TABLE IF EXISTS " + tableName + " CASCADE"; + case ORACLE: + return "DROP TABLE " + tableName + " CASCADE CONSTRAINTS PURGE"; + case SQLSERVER: + // IF OBJECT_ID funciona desde SQL Server 2005, más compatible que IF EXISTS (2016+) + return "IF OBJECT_ID('" + tableName + "', 'U') IS NOT NULL DROP TABLE " + tableName; + case SYBASE: + // Sybase no tiene IF EXISTS en DROP TABLE + return "DROP TABLE " + tableName; + default: + // MYSQL, MARIADB, SQLITE, H2, DB2, INFORMIX, FIREBIRD + return "DROP TABLE IF EXISTS " + tableName; + } + } + + public List getUserTables(Connection conn) throws SQLException { + List tables = new ArrayList<>(); + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + // Para Informix: getSchema() devuelve null, usar el nombre de usuario como schema + if (sqlDialect == SqlDialect.INFORMIX && schema == null) { + schema = meta.getUserName(); + } + + try (ResultSet rs = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + // Para Firebird: filtrar tablas del sistema (RDB$, MON$, SEC$) + String upperName = tableName.toUpperCase(); + if (sqlDialect == SqlDialect.FIREBIRD && (upperName.startsWith("RDB$") || upperName.startsWith("MON$") || upperName.startsWith("SEC$"))) { + continue; + } + tables.add(tableName); + } + } + return tables; + } + + public SqlDialect getSqlDialect() { + return sqlDialect; + } +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java new file mode 100644 index 000000000..a495d6f29 --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java @@ -0,0 +1,261 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.lock.LockStorage; +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.core.external.store.lock.LockAcquisition; +import io.flamingock.internal.core.external.store.lock.LockKey; +import io.flamingock.internal.core.external.store.lock.LockServiceException; +import io.flamingock.internal.core.external.store.lock.LockStatus; +import io.flamingock.internal.core.external.store.lock.community.CommunityLockEntry; +import io.flamingock.internal.util.id.RunnerId; + +import javax.sql.DataSource; +import java.sql.*; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_LOCK_STORE_NAME; + +/** + * SQL implementation of LockStorage for real database testing. + * Only depends on SQL client/database and core Flamingock classes. + * Does not depend on SQL-specific Flamingock components like SqlTargetSystem. + */ +public class SqlLockStorage implements LockStorage { + + private final String lockTableName; + private final Map metadata = new ConcurrentHashMap<>(); + private final DataSource dataSource; + private final SqlDialectHelper dialectHelper; + + public SqlLockStorage(DataSource dataSource) throws SQLException { + this(dataSource, DEFAULT_LOCK_STORE_NAME); + } + + public SqlLockStorage(DataSource dataSource, String lockTableName) throws SQLException { + this.lockTableName = lockTableName; + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlDialectHelper(conn); + } + } + + @Override + public void storeLock(LockKey key, LockAcquisition acquisition) { + String keyStr = key.toString(); + RunnerId owner = acquisition.getOwner(); + LocalDateTime expiresAt = LocalDateTime.now().plusNanos(acquisition.getAcquiredForMillis() * 1_000_000); + + Connection connection = null; + try { + connection = dataSource.getConnection(); + // For Informix, use shorter timeout and simpler transaction handling + // For Sybase we MUST disable auto-commit so HOLDLOCK works as intended + boolean isInformix = dialectHelper.getSqlDialect() == SqlDialect.INFORMIX; + boolean isSybase = dialectHelper.getSqlDialect() == SqlDialect.SYBASE; + connection.setAutoCommit(isInformix); // Informix uses autocommit + + try { + + if (isSybase) { + // For Sybase, use HOLDLOCK to prevent race conditions during lock check + String selectSql = "SELECT lock_key, status, owner, expires_at " + + "FROM " + lockTableName + " HOLDLOCK " + + "WHERE lock_key = ?"; + + CommunityLockEntry existing = null; + try (PreparedStatement ps = connection.prepareStatement(selectSql)) { + ps.setString(1, keyStr); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + existing = new CommunityLockEntry( + rs.getString("lock_key"), + LockStatus.valueOf(rs.getString("status")), + rs.getString("owner"), + rs.getTimestamp("expires_at").toLocalDateTime() + ); + } + } + } + + if (existing == null || + owner.toString().equals(existing.getOwner()) || + LocalDateTime.now().isAfter(existing.getExpiresAt())) { + // Delete existing lock first, then insert new one + try (PreparedStatement delete = connection.prepareStatement( + "DELETE FROM " + lockTableName + " WHERE lock_key = ?")) { + delete.setString(1, keyStr); + delete.executeUpdate(); + } + dialectHelper.upsertLockEntry(connection, lockTableName, keyStr, owner.toString(), expiresAt); + connection.commit(); + return; + } else { + connection.rollback(); + throw new LockServiceException("upsert", keyStr, + "Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt()); + } + } + + CommunityLockEntry existing = getLockEntry(connection, keyStr); + if (existing == null || + owner.toString().equals(existing.getOwner()) || + LocalDateTime.now().isAfter(existing.getExpiresAt())) { + dialectHelper.upsertLockEntry(connection, lockTableName, keyStr, owner.toString(), expiresAt); + // Commit for all dialects except Informix (which uses auto-commit above) + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.commit(); + } + } else { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.rollback(); + } + throw new LockServiceException("upsert", keyStr, + "Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt()); + } + } catch (Exception e) { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.rollback(); + } + throw e; + } finally { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.setAutoCommit(true); + } + } + } catch (SQLException e) { + throw new LockServiceException("upsert", keyStr, e.getMessage()); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + // Log but don't throw + } + } + } + } + + @Override + public LockAcquisition getLock(LockKey key) { + try (Connection connection = dataSource.getConnection()) { + CommunityLockEntry entry = getLockEntry(connection, key.toString()); + if (entry != null) { + return new LockAcquisition(RunnerId.fromString(entry.getOwner()), + Timestamp.valueOf(entry.getExpiresAt()).getTime() - System.currentTimeMillis()); + } + } catch (SQLException e) { + // ignore + } + return null; + } + + private CommunityLockEntry getLockEntry(Connection conn, String key) throws SQLException { + try (PreparedStatement ps = conn.prepareStatement( + dialectHelper.getSelectLockSqlString(lockTableName))) { + + // Set query timeout for Informix to prevent long waits + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + ps.setQueryTimeout(5); + } + + ps.setString(1, key); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return new CommunityLockEntry( + rs.getString(1), + LockStatus.valueOf(rs.getString("status")), + rs.getString("owner"), + rs.getTimestamp("expires_at").toLocalDateTime() + ); + } + } + } + return null; + } + + @Override + public Map getAllLocks() { + Map locks = new HashMap<>(); + + try (Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(dialectHelper.getSelectAllLocksSqlString(lockTableName))) { + while (rs.next()) { + LockKey key = LockKey.fromString(rs.getString(1)); + LockAcquisition acquisition = new LockAcquisition( + RunnerId.fromString(rs.getString("owner")), + Timestamp.valueOf(rs.getTimestamp("expires_at").toLocalDateTime()).getTime() - System.currentTimeMillis() + ); + locks.put(key, acquisition); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read locks from database", e); + } + + return locks; + } + + @Override + public void removeLock(LockKey key) { + try { + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement( + dialectHelper.getDeleteLockSqlString(lockTableName))) { + + // Set query timeout for Informix to prevent long waits + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + ps.setQueryTimeout(5); + } + ps.setString(1, key.toString()); + ps.executeUpdate(); + } + } catch (SQLException e) { + // ignore + } + } + + @Override + public boolean hasLocks() { + return !this.getAllLocks().isEmpty(); + } + + @Override + public void clear() { + try(Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement()) { + stmt.executeUpdate(dialectHelper.getDeleteAllSqlString(lockTableName)); + } catch (SQLException e) { + throw new RuntimeException("Failed to clear lock entries", e); + } + metadata.clear(); + } + + @Override + public void setMetadata(String key, Object value) { + metadata.put(key, value); + } + + @Override + public Object getMetadata(String key) { + return metadata.get(key); + } + +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java new file mode 100644 index 000000000..a53773f8a --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.AbstractTestKit; +import io.flamingock.core.kit.audit.AuditStorage; +import io.flamingock.core.kit.lock.LockStorage; +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.core.external.store.CommunityAuditStore; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +public class SqlTestKit extends AbstractTestKit { + + private final DataSource dataSource; + private final SqlDialectHelper dialectHelper; + + public SqlTestKit(AuditStorage auditStorage, LockStorage lockStorage, CommunityAuditStore auditStore, DataSource dataSource) throws SQLException { + super(auditStorage, lockStorage, auditStore); + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlDialectHelper(conn); + } + } + + @Override + public void cleanUp() { + try { + try (Connection connection = dataSource.getConnection()) { + if (dialectHelper.getSqlDialect() == SqlDialect.H2) { + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate("DROP ALL OBJECTS"); + } + return; + } + List tables = dialectHelper.getUserTables(connection); + + if (tables.isEmpty()) { + return; + } + + dialectHelper.disableForeignKeyChecks(connection); + try { + for (String tableName : tables) { + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate(dialectHelper.getDropTableSql(tableName)); + } catch (SQLException e) { + // Para Sybase, ignorar si la tabla ya no existe + if (dialectHelper.getSqlDialect() != SqlDialect.SYBASE) { + throw e; + } + } + } + } finally { + dialectHelper.enableForeignKeyChecks(connection); + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Create a new SqlTestKit with client and database + */ + public static SqlTestKit create(CommunityAuditStore auditStore, DataSource dataSource) throws SQLException { + SqlAuditStorage auditStorage = new SqlAuditStorage(dataSource); + SqlLockStorage lockStorage = new SqlLockStorage(dataSource); + return new SqlTestKit(auditStorage, lockStorage, auditStore, dataSource); + } +} From 871afd5d104acb5d4dd379ab557e3ab239444508 Mon Sep 17 00:00:00 2001 From: bercianor Date: Sat, 21 Mar 2026 16:21:35 +0000 Subject: [PATCH 2/4] fix: move all DialectHelpers to sql-util --- .../store/sql/internal/SqlAuditor.java | 1 + .../sql/internal/SqlLockDialectHelper.java | 350 ---------------- .../store/sql/internal/SqlLockService.java | 1 + .../flamingock/sql/kit/SqlAuditStorage.java | 7 +- .../io/flamingock/sql/kit/SqlLockStorage.java | 7 +- .../io/flamingock/sql/kit/SqlTestKit.java | 5 +- .../SqlAuditorDialectHelper.java | 37 +- .../dialectHelpers/SqlLockDialectHelper.java} | 379 +++++++----------- .../SqlTestKitDialectHelper.java | 151 +++++++ 9 files changed, 340 insertions(+), 598 deletions(-) delete mode 100644 community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java rename {community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal => utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers}/SqlAuditorDialectHelper.java (91%) rename utils/{sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java => sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java} (66%) create mode 100644 utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java index ed716c199..e709b2d0b 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java +++ b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java @@ -19,6 +19,7 @@ import io.flamingock.internal.common.core.audit.AuditReader; import io.flamingock.internal.common.core.audit.AuditTxType; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlAuditorDialectHelper; import io.flamingock.internal.core.external.store.audit.LifecycleAuditWriter; import io.flamingock.internal.util.Result; diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java deleted file mode 100644 index be4950e0c..000000000 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Copyright 2025 Flamingock (https://www.flamingock.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.flamingock.store.sql.internal; - -import io.flamingock.internal.common.sql.SqlDialectFactory; -import io.flamingock.internal.common.sql.SqlDialect; -import io.flamingock.internal.core.external.store.lock.LockStatus; - -import java.sql.*; -import java.time.LocalDateTime; -import java.util.Objects; - -public final class SqlLockDialectHelper { - - final private SqlDialect sqlDialect; - - public SqlLockDialectHelper(Connection connection) throws SQLException { - this.sqlDialect = SqlDialectFactory.getSqlDialect(connection); - } - - public String getCreateTableSqlString(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - return String.format( - "CREATE TABLE IF NOT EXISTS %s (" + - "\"key\" VARCHAR(255) PRIMARY KEY," + - "status VARCHAR(32)," + - "owner VARCHAR(255)," + - "expires_at TIMESTAMP" + - ")", tableName); - case FIREBIRD: - return String.format( - "CREATE TABLE %s (" + - "lock_key VARCHAR(255) PRIMARY KEY, " + - "status VARCHAR(32), " + - "owner VARCHAR(255), " + - "expires_at TIMESTAMP" + - ")", - tableName); - case MYSQL: - case MARIADB: - case SQLITE: - case H2: - return String.format( - "CREATE TABLE IF NOT EXISTS %s (" + - "`key` VARCHAR(255) PRIMARY KEY," + - "status VARCHAR(32)," + - "owner VARCHAR(255)," + - "expires_at TIMESTAMP" + - ")", tableName); - case SQLSERVER: - return String.format( - "IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='%s' AND xtype='U') " + - "CREATE TABLE %s (" + - "[key] VARCHAR(255) PRIMARY KEY," + - "status VARCHAR(32)," + - "owner VARCHAR(255)," + - "expires_at DATETIME" + - ")", tableName, tableName); - case SYBASE: - return String.format( - "IF NOT EXISTS (SELECT 1 FROM sysobjects WHERE name='%s' AND type='U') " + - "BEGIN " + - " EXEC('CREATE TABLE %s (" + - " lock_key VARCHAR(255) NOT NULL PRIMARY KEY, " + - " status VARCHAR(32), " + - " owner VARCHAR(255), " + - " expires_at DATETIME" + - " )') " + - "END", - tableName, tableName - ); - case ORACLE: - return String.format( - "BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %s (" + - "\"key\" VARCHAR2(255) PRIMARY KEY," + - "status VARCHAR2(32)," + - "owner VARCHAR2(255)," + - "expires_at TIMESTAMP" + - ")'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;", tableName); - case DB2: - return String.format( - "BEGIN " + - "DECLARE CONTINUE HANDLER FOR SQLSTATE '42710' BEGIN END; " + - "EXECUTE IMMEDIATE 'CREATE TABLE %s (" + - "lock_key VARCHAR(255) NOT NULL PRIMARY KEY, " + - "status VARCHAR(32), " + - "owner VARCHAR(255), " + - "expires_at TIMESTAMP)'; " + - "END", tableName); - case INFORMIX: - return String.format( - "CREATE TABLE %s (" + - "lock_key VARCHAR(255) PRIMARY KEY, " + - "status VARCHAR(32), " + - "owner VARCHAR(255), " + - "expires_at DATETIME YEAR TO FRACTION(3)" + - ")", tableName); - default: - throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name()); - } - } - - public String getSelectLockSqlString(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName); - case DB2: - // Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key) - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - case SQLSERVER: - return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName); - case SYBASE: - return String.format( - "SELECT lock_key, status, owner, expires_at " + - "FROM %s HOLDLOCK " + - "WHERE lock_key = ?", - tableName - ); case ORACLE: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName); - case INFORMIX: - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - case FIREBIRD: - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - default: - return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName); - } - } - - public String getInsertOrUpdateLockSqlString(String tableName) { - switch (sqlDialect) { - case MYSQL: - case MARIADB: - return String.format( - "INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)", - tableName); - case POSTGRESQL: - return String.format( - "INSERT INTO %s (\"key\", status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "ON CONFLICT (\"key\") DO UPDATE SET status = EXCLUDED.status, owner = EXCLUDED.owner, expires_at = EXCLUDED.expires_at", - tableName); - case SQLITE: - return String.format( - "INSERT OR REPLACE INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?)", - tableName); - case SQLSERVER: - return String.format( - "BEGIN TRANSACTION; " + - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE [key] = ?; " + - "IF @@ROWCOUNT = 0 " + - "BEGIN " + - "INSERT INTO %s ([key], status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "END; " + - "COMMIT TRANSACTION;", - tableName, tableName); - case SYBASE: - return String.format( - "BEGIN TRAN " + - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + - "IF @@ROWCOUNT = 0 " + - "BEGIN " + - " INSERT INTO %s (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?); " + - "END " + - "COMMIT TRAN", - tableName, tableName - ); - case ORACLE: - return String.format( - "MERGE INTO %s t USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM dual) s " + - "ON (t.\"key\" = s.\"key\") " + - "WHEN MATCHED THEN UPDATE SET t.status = s.status, t.owner = s.owner, t.expires_at = s.expires_at " + - "WHEN NOT MATCHED THEN INSERT (\"key\", status, owner, expires_at) VALUES (s.\"key\", s.status, s.owner, s.expires_at)", - tableName); - case H2: - return String.format( - "MERGE INTO %s (`key`, status, owner, expires_at) KEY (`key`) VALUES (?, ?, ?, ?)", - tableName); - case DB2: - // Use a VALUES-derived table and a target alias for DB2 to avoid parsing issues - return String.format( - "MERGE INTO %s tgt USING (VALUES (?, ?, ?, ?)) src(lock_key, status, owner, expires_at) " + - "ON (tgt.lock_key = src.lock_key) " + - "WHEN MATCHED THEN UPDATE SET status = src.status, owner = src.owner, expires_at = src.expires_at " + - "WHEN NOT MATCHED THEN INSERT (lock_key, status, owner, expires_at) VALUES (src.lock_key, src.status, src.owner, src.expires_at)", - tableName); - case FIREBIRD: - return String.format("UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?", tableName); - case INFORMIX: - // Informix doesn't support ON DUPLICATE KEY UPDATE - // Use a procedural approach similar to SQL Server - return String.format( - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + - "INSERT INTO %s (lock_key, status, owner, expires_at) " + - "SELECT ?, ?, ?, ? FROM sysmaster:sysdual " + - "WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)", - tableName, tableName, tableName); - default: - throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name()); - } - } - - public String getDeleteLockSqlString(String tableName) { - if (Objects.requireNonNull(sqlDialect) == SqlDialect.POSTGRESQL) { - return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); - } - if (sqlDialect == SqlDialect.INFORMIX || sqlDialect == SqlDialect.DB2) { - return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); - } - if (sqlDialect == SqlDialect.FIREBIRD) { - return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); - } - return String.format("DELETE FROM %s WHERE `key` = ?", tableName); - } - - public void upsertLockEntry(Connection conn, String tableName, String key, String owner, LocalDateTime expiresAt) throws SQLException { - String sql = getInsertOrUpdateLockSqlString(tableName); - - if (sqlDialect == SqlDialect.DB2) { - // UPDATE first - try (PreparedStatement update = conn.prepareStatement( - "UPDATE " + tableName + " SET owner = ?, expires_at = ? WHERE lock_key = ?")) { - update.setString(1, owner); - update.setTimestamp(2, Timestamp.valueOf(expiresAt)); - update.setString(3, key); - int updated = update.executeUpdate(); - if (updated > 0) { - return; - } - } - - // If no row updated, try INSERT - try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { - insert.setString(1, key); - insert.setString(2, LockStatus.LOCK_HELD.name()); - insert.setString(3, owner); - insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); - insert.executeUpdate(); - } - return; - } - - if (getSqlDialect() == SqlDialect.INFORMIX) { - // Try UPDATE first - try (PreparedStatement update = conn.prepareStatement( - "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { - update.setString(1, LockStatus.LOCK_HELD.name()); - update.setString(2, owner); - update.setTimestamp(3, Timestamp.valueOf(expiresAt)); - update.setString(4, key); - int updated = update.executeUpdate(); - if (updated > 0) { - return; - } - } - - // If no row updated, try INSERT - try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { - insert.setString(1, key); - insert.setString(2, LockStatus.LOCK_HELD.name()); - insert.setString(3, owner); - insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); - insert.executeUpdate(); - } - return; - } - - if (getSqlDialect() == SqlDialect.SQLSERVER) { - // For SQL Server/Sybase, use Statement and format SQL - try (Statement stmt = conn.createStatement()) { - String formattedSql = sql - .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") - .replaceFirst("\\?", "'" + owner + "'") - .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'") - .replaceFirst("\\?", "'" + key + "'") - .replaceFirst("\\?", "'" + key + "'") - .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") - .replaceFirst("\\?", "'" + owner + "'") - .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'"); - stmt.execute(formattedSql); - } - return; - } - - if (sqlDialect == SqlDialect.FIREBIRD) { - String updateSql = getInsertOrUpdateLockSqlString(tableName); - try (PreparedStatement ps = conn.prepareStatement(updateSql)) { - ps.setString(1, LockStatus.LOCK_HELD.name()); - ps.setString(2, owner); - ps.setTimestamp(3, Timestamp.valueOf(expiresAt)); - ps.setString(4, key); - int updated = ps.executeUpdate(); - if (updated == 0) { - String insertSql = "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)"; - try (PreparedStatement ins = conn.prepareStatement(insertSql)) { - ins.setString(1, key); - ins.setString(2, LockStatus.LOCK_HELD.name()); - ins.setString(3, owner); - ins.setTimestamp(4, Timestamp.valueOf(expiresAt)); - ins.executeUpdate(); - } - } - } - return; - } - - if (sqlDialect == SqlDialect.SYBASE) { - // The lock was already deleted in acquireLockQuery for Sybase - try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { - insert.setString(1, key); - insert.setString(2, LockStatus.LOCK_HELD.name()); - insert.setString(3, owner); - insert.setTimestamp(4, Timestamp.valueOf(expiresAt)); - insert.executeUpdate(); - } - return; - } - - - // Default case for other dialects - try (PreparedStatement ps = conn.prepareStatement(sql)) { - ps.setString(1, key); - ps.setString(2, LockStatus.LOCK_HELD.name()); - ps.setString(3, owner); - ps.setTimestamp(4, Timestamp.valueOf(expiresAt)); - ps.executeUpdate(); - } - } - - - public SqlDialect getSqlDialect() { - return sqlDialect; - } -} diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java index 52ce8a36c..5ec2196e1 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java +++ b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java @@ -16,6 +16,7 @@ package io.flamingock.store.sql.internal; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlLockDialectHelper; import io.flamingock.internal.core.external.store.lock.LockAcquisition; import io.flamingock.internal.core.external.store.lock.LockKey; import io.flamingock.internal.core.external.store.lock.LockServiceException; diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java index a0692cb3c..3c6b896c7 100644 --- a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java @@ -19,6 +19,7 @@ import io.flamingock.internal.common.core.audit.AuditEntry; import io.flamingock.internal.common.core.audit.AuditTxType; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlAuditorDialectHelper; import javax.sql.DataSource; import java.sql.*; @@ -35,7 +36,7 @@ public class SqlAuditStorage implements AuditStorage { private final DataSource dataSource; - private final SqlDialectHelper dialectHelper; + private final SqlAuditorDialectHelper dialectHelper; private final String auditTableName; public SqlAuditStorage(DataSource dataSource) throws SQLException { @@ -46,7 +47,7 @@ public SqlAuditStorage(DataSource dataSource, String tableName) throws SQLExcept this.auditTableName = tableName; this.dataSource = dataSource; try (Connection conn = dataSource.getConnection()) { - this.dialectHelper = new SqlDialectHelper(conn); + this.dialectHelper = new SqlAuditorDialectHelper(conn); } } @@ -147,7 +148,7 @@ public boolean hasAuditEntries() { public void clear() { try(Connection connection = dataSource.getConnection(); Statement stmt = connection.createStatement()) { - stmt.executeUpdate(dialectHelper.getDeleteAllSqlString(auditTableName)); + stmt.executeUpdate(String.format("DELETE FROM %s", auditTableName)); } catch (SQLException e) { throw new RuntimeException("Failed to clear audit entries", e); } diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java index a495d6f29..e2ff5fa80 100644 --- a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java @@ -17,6 +17,7 @@ import io.flamingock.core.kit.lock.LockStorage; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlLockDialectHelper; import io.flamingock.internal.core.external.store.lock.LockAcquisition; import io.flamingock.internal.core.external.store.lock.LockKey; import io.flamingock.internal.core.external.store.lock.LockServiceException; @@ -43,7 +44,7 @@ public class SqlLockStorage implements LockStorage { private final String lockTableName; private final Map metadata = new ConcurrentHashMap<>(); private final DataSource dataSource; - private final SqlDialectHelper dialectHelper; + private final SqlLockDialectHelper dialectHelper; public SqlLockStorage(DataSource dataSource) throws SQLException { this(dataSource, DEFAULT_LOCK_STORE_NAME); @@ -53,7 +54,7 @@ public SqlLockStorage(DataSource dataSource, String lockTableName) throws SQLExc this.lockTableName = lockTableName; this.dataSource = dataSource; try (Connection conn = dataSource.getConnection()) { - this.dialectHelper = new SqlDialectHelper(conn); + this.dialectHelper = new SqlLockDialectHelper(conn); } } @@ -241,7 +242,7 @@ public boolean hasLocks() { public void clear() { try(Connection connection = dataSource.getConnection(); Statement stmt = connection.createStatement()) { - stmt.executeUpdate(dialectHelper.getDeleteAllSqlString(lockTableName)); + stmt.executeUpdate(String.format("DELETE FROM %s", lockTableName)); } catch (SQLException e) { throw new RuntimeException("Failed to clear lock entries", e); } diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java index a53773f8a..55e7bfeb3 100644 --- a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java @@ -19,6 +19,7 @@ import io.flamingock.core.kit.audit.AuditStorage; import io.flamingock.core.kit.lock.LockStorage; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlTestKitDialectHelper; import io.flamingock.internal.core.external.store.CommunityAuditStore; import javax.sql.DataSource; @@ -30,13 +31,13 @@ public class SqlTestKit extends AbstractTestKit { private final DataSource dataSource; - private final SqlDialectHelper dialectHelper; + private final SqlTestKitDialectHelper dialectHelper; public SqlTestKit(AuditStorage auditStorage, LockStorage lockStorage, CommunityAuditStore auditStore, DataSource dataSource) throws SQLException { super(auditStorage, lockStorage, auditStore); this.dataSource = dataSource; try (Connection conn = dataSource.getConnection()) { - this.dialectHelper = new SqlDialectHelper(conn); + this.dialectHelper = new SqlTestKitDialectHelper(conn); } } diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java similarity index 91% rename from community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java rename to utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java index 924584a33..31a5ea4b5 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.flamingock.store.sql.internal; +package io.flamingock.internal.common.sql.dialectHelpers; import io.flamingock.internal.common.sql.SqlDialectFactory; import io.flamingock.internal.common.sql.SqlDialect; @@ -279,19 +279,36 @@ public String getCreateTableSqlString(String tableName) { public String getInsertSqlString(String tableName) { return String.format( - "INSERT INTO %s (" + - "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + - "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + - ") VALUES (" + - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + - ")", tableName); + "INSERT INTO %s (" + + "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + + "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + + ") VALUES (" + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + + ")", tableName); } public String getSelectHistorySqlString(String tableName) { return String.format( - "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + - "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + - "FROM %s ORDER BY id ASC", tableName); + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "ORDER BY id ASC", tableName); + } + + public String getSelectHistoryByChangeIdSqlString(String tableName) { + return String.format( + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "WHERE change_id = ? " + + "ORDER BY id ASC", tableName); + } + + public String getCountByStatusSqlString(String tableName) { + return String.format( + "SELECT COUNT(change_id) " + + "FROM %s " + + "WHERE state = ?", tableName); } private String getAutoIncrementType() { diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java similarity index 66% rename from utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java rename to utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java index 93b7322ee..a4672f885 100644 --- a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlDialectHelper.java +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2026 Flamingock (https://www.flamingock.io) + * Copyright 2025 Flamingock (https://www.flamingock.io) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,64 +13,157 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.flamingock.sql.kit; +package io.flamingock.internal.common.sql.dialectHelpers; -import io.flamingock.internal.common.sql.SqlDialect; import io.flamingock.internal.common.sql.SqlDialectFactory; +import io.flamingock.internal.common.sql.SqlDialect; import io.flamingock.internal.core.external.store.lock.LockStatus; import java.sql.*; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -public final class SqlDialectHelper { +public final class SqlLockDialectHelper { final private SqlDialect sqlDialect; - public SqlDialectHelper(Connection connection) { + public SqlLockDialectHelper(Connection connection) throws SQLException { this.sqlDialect = SqlDialectFactory.getSqlDialect(connection); } - public String getInsertSqlString(String tableName) { - return String.format( - "INSERT INTO %s (" + - "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + - "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + - ") VALUES (" + - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + - ")", tableName); - } - - public String getSelectHistorySqlString(String tableName) { - return String.format( - "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + - "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + - "FROM %s " + - "ORDER BY id ASC", tableName); + public String getCreateTableSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + return String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + "\"key\" VARCHAR(255) PRIMARY KEY," + + "status VARCHAR(32)," + + "owner VARCHAR(255)," + + "expires_at TIMESTAMP" + + ")", tableName); + case FIREBIRD: + return String.format( + "CREATE TABLE %s (" + + "lock_key VARCHAR(255) PRIMARY KEY, " + + "status VARCHAR(32), " + + "owner VARCHAR(255), " + + "expires_at TIMESTAMP" + + ")", + tableName); + case MYSQL: + case MARIADB: + case SQLITE: + case H2: + return String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + "`key` VARCHAR(255) PRIMARY KEY," + + "status VARCHAR(32)," + + "owner VARCHAR(255)," + + "expires_at TIMESTAMP" + + ")", tableName); + case SQLSERVER: + return String.format( + "IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='%s' AND xtype='U') " + + "CREATE TABLE %s (" + + "[key] VARCHAR(255) PRIMARY KEY," + + "status VARCHAR(32)," + + "owner VARCHAR(255)," + + "expires_at DATETIME" + + ")", tableName, tableName); + case SYBASE: + return String.format( + "IF NOT EXISTS (SELECT 1 FROM sysobjects WHERE name='%s' AND type='U') " + + "BEGIN " + + " EXEC('CREATE TABLE %s (" + + " lock_key VARCHAR(255) NOT NULL PRIMARY KEY, " + + " status VARCHAR(32), " + + " owner VARCHAR(255), " + + " expires_at DATETIME" + + " )') " + + "END", + tableName, tableName + ); + case ORACLE: + return String.format( + "BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %s (" + + "\"key\" VARCHAR2(255) PRIMARY KEY," + + "status VARCHAR2(32)," + + "owner VARCHAR2(255)," + + "expires_at TIMESTAMP" + + ")'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;", tableName); + case DB2: + return String.format( + "BEGIN " + + "DECLARE CONTINUE HANDLER FOR SQLSTATE '42710' BEGIN END; " + + "EXECUTE IMMEDIATE 'CREATE TABLE %s (" + + "lock_key VARCHAR(255) NOT NULL PRIMARY KEY, " + + "status VARCHAR(32), " + + "owner VARCHAR(255), " + + "expires_at TIMESTAMP)'; " + + "END", tableName); + case INFORMIX: + return String.format( + "CREATE TABLE %s (" + + "lock_key VARCHAR(255) PRIMARY KEY, " + + "status VARCHAR(32), " + + "owner VARCHAR(255), " + + "expires_at DATETIME YEAR TO FRACTION(3)" + + ")", tableName); + default: + throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name()); } - public String getSelectHistoryByChangeIdSqlString(String tableName) { - return String.format( - "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + - "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + - "FROM %s " + - "WHERE change_id = ? " + - "ORDER BY id ASC", tableName); } - public String getCountByStatusSqlString(String tableName) { - return String.format( - "SELECT COUNT(change_id) " + - "FROM %s " + - "WHERE state = ?", tableName); + public String getSelectLockSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case H2: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName); + case DB2: + // Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key) + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + case SQLSERVER: + return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName); + case SYBASE: + return String.format( + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK " + + "WHERE lock_key = ?", + tableName + ); case ORACLE: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName); + case INFORMIX: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + case FIREBIRD: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); + default: + return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName); + } } - public String getDeleteAllSqlString(String tableName) { - return String.format("DELETE FROM %s", tableName); + public String getSelectAllLocksSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case H2: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s", tableName); + case SQLSERVER: + return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK)", tableName); + case SYBASE: + return String.format( + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK", + tableName + ); + case ORACLE: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s FOR UPDATE", tableName); + case DB2: + case INFORMIX: + case FIREBIRD: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s", tableName); + default: + return String.format("SELECT `key`, status, owner, expires_at FROM %s", tableName); + } } - - public String getInsertOrUpdateLockSqlString(String tableName) { switch (sqlDialect) { case MYSQL: @@ -144,6 +237,23 @@ public String getInsertOrUpdateLockSqlString(String tableName) { } } + public String getDeleteLockSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case ORACLE: + return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); + case INFORMIX: + case DB2: + case FIREBIRD: + case SYBASE: + return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); + case SQLSERVER: + return String.format("DELETE FROM %s WHERE [key] = ?", tableName); + default: // MYSQL, MARIADB, SQLITE, H2 + return String.format("DELETE FROM %s WHERE `key` = ?", tableName); + } + } + public void upsertLockEntry(Connection conn, String tableName, String key, String owner, LocalDateTime expiresAt) throws SQLException { String sql = getInsertOrUpdateLockSqlString(tableName); @@ -261,197 +371,6 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin } } - public String getSelectLockSqlString(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - case H2: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName); - case DB2: - // Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key) - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - case SQLSERVER: - return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName); - case SYBASE: - return String.format( - "SELECT lock_key, status, owner, expires_at " + - "FROM %s HOLDLOCK " + - "WHERE lock_key = ?", - tableName - ); case ORACLE: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName); - case INFORMIX: - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - case FIREBIRD: - return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName); - default: - return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName); - } - } - - public String getSelectAllLocksSqlString(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - case H2: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s", tableName); - case SQLSERVER: - return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK)", tableName); - case SYBASE: - return String.format( - "SELECT lock_key, status, owner, expires_at " + - "FROM %s HOLDLOCK", - tableName - ); - case ORACLE: - return String.format("SELECT \"key\", status, owner, expires_at FROM %s FOR UPDATE", tableName); - case DB2: - case INFORMIX: - case FIREBIRD: - return String.format("SELECT lock_key, status, owner, expires_at FROM %s", tableName); - default: - return String.format("SELECT `key`, status, owner, expires_at FROM %s", tableName); - } - } - - public String getDeleteLockSqlString(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); - case INFORMIX: - case DB2: - case FIREBIRD: - case SYBASE: - return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); - case SQLSERVER: - return String.format("DELETE FROM %s WHERE [key] = ?", tableName); - case ORACLE: - return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); - default: // MYSQL, MARIADB, SQLITE, H2 - return String.format("DELETE FROM %s WHERE `key` = ?", tableName); - } - } - - public void disableForeignKeyChecks(Connection conn) throws SQLException { - switch (sqlDialect) { - case MYSQL: - case MARIADB: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=0"); - } - break; - case SQLITE: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("PRAGMA foreign_keys = OFF"); - } - break; - case H2: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("SET REFERENTIAL_INTEGRITY FALSE"); - } - break; - case SQLSERVER: - case SYBASE: - case FIREBIRD: - // No hay un comando global; hay que eliminar las FK constraints antes de dropear tablas - dropAllForeignKeys(conn); - break; - default: - // POSTGRESQL, ORACLE, DB2, INFORMIX: el DROP maneja dependencias por sí solo - break; - } - } - - public void enableForeignKeyChecks(Connection conn) throws SQLException { - switch (sqlDialect) { - case MYSQL: - case MARIADB: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=1"); - } - break; - case SQLITE: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("PRAGMA foreign_keys = ON"); - } - break; - case H2: - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate("SET REFERENTIAL_INTEGRITY TRUE"); - } - break; - default: - break; - } - } - - private void dropAllForeignKeys(Connection conn) throws SQLException { - // Para SQL Server, Sybase y Firebird: eliminar FK constraints de todas las tablas de usuario - DatabaseMetaData meta = conn.getMetaData(); - String schema = conn.getSchema(); - String catalog = conn.getCatalog(); - - try (ResultSet tables = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { - while (tables.next()) { - String tableName = tables.getString("TABLE_NAME"); - try (ResultSet fks = meta.getExportedKeys(catalog, schema, tableName)) { - while (fks.next()) { - String fkName = fks.getString("FK_NAME"); - String fkTable = fks.getString("FKTABLE_NAME"); - if (fkName != null) { - try (Statement stmt = conn.createStatement()) { - stmt.executeUpdate( - "ALTER TABLE " + fkTable + " DROP CONSTRAINT " + fkName); - } catch (SQLException e) { - // Ignorar si ya fue eliminado - } - } - } - } - } - } - } - - public String getDropTableSql(String tableName) { - switch (sqlDialect) { - case POSTGRESQL: - return "DROP TABLE IF EXISTS " + tableName + " CASCADE"; - case ORACLE: - return "DROP TABLE " + tableName + " CASCADE CONSTRAINTS PURGE"; - case SQLSERVER: - // IF OBJECT_ID funciona desde SQL Server 2005, más compatible que IF EXISTS (2016+) - return "IF OBJECT_ID('" + tableName + "', 'U') IS NOT NULL DROP TABLE " + tableName; - case SYBASE: - // Sybase no tiene IF EXISTS en DROP TABLE - return "DROP TABLE " + tableName; - default: - // MYSQL, MARIADB, SQLITE, H2, DB2, INFORMIX, FIREBIRD - return "DROP TABLE IF EXISTS " + tableName; - } - } - - public List getUserTables(Connection conn) throws SQLException { - List tables = new ArrayList<>(); - DatabaseMetaData meta = conn.getMetaData(); - String schema = conn.getSchema(); - String catalog = conn.getCatalog(); - - // Para Informix: getSchema() devuelve null, usar el nombre de usuario como schema - if (sqlDialect == SqlDialect.INFORMIX && schema == null) { - schema = meta.getUserName(); - } - - try (ResultSet rs = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { - while (rs.next()) { - String tableName = rs.getString("TABLE_NAME"); - // Para Firebird: filtrar tablas del sistema (RDB$, MON$, SEC$) - String upperName = tableName.toUpperCase(); - if (sqlDialect == SqlDialect.FIREBIRD && (upperName.startsWith("RDB$") || upperName.startsWith("MON$") || upperName.startsWith("SEC$"))) { - continue; - } - tables.add(tableName); - } - } - return tables; - } public SqlDialect getSqlDialect() { return sqlDialect; diff --git a/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java new file mode 100644 index 000000000..61076b687 --- /dev/null +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.common.sql.dialectHelpers; + +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.SqlDialectFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +public final class SqlTestKitDialectHelper { + + final private SqlDialect sqlDialect; + + public SqlTestKitDialectHelper(Connection connection) { + this.sqlDialect = SqlDialectFactory.getSqlDialect(connection); + } + + public void disableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=0"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = OFF"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY FALSE"); + } + break; + case SQLSERVER: + case SYBASE: + case FIREBIRD: + dropAllForeignKeys(conn); + break; + default: + break; + } + } + + public void enableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=1"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = ON"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY TRUE"); + } + break; + default: + break; + } + } + + private void dropAllForeignKeys(Connection conn) throws SQLException { + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + try (ResultSet tables = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + try (ResultSet fks = meta.getExportedKeys(catalog, schema, tableName)) { + while (fks.next()) { + String fkName = fks.getString("FK_NAME"); + String fkTable = fks.getString("FKTABLE_NAME"); + if (fkName != null) { + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "ALTER TABLE " + fkTable + " DROP CONSTRAINT " + fkName); + } catch (SQLException e) { + } + } + } + } + } + } + } + + public String getDropTableSql(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + return "DROP TABLE IF EXISTS " + tableName + " CASCADE"; + case ORACLE: + return "DROP TABLE " + tableName + " CASCADE CONSTRAINTS PURGE"; + case SQLSERVER: + return "IF OBJECT_ID('" + tableName + "', 'U') IS NOT NULL DROP TABLE " + tableName; + case SYBASE: + return "DROP TABLE " + tableName; + default: + // MYSQL, MARIADB, SQLITE, H2, DB2, INFORMIX, FIREBIRD + return "DROP TABLE IF EXISTS " + tableName; + } + } + + public List getUserTables(Connection conn) throws SQLException { + List tables = new ArrayList<>(); + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + if (sqlDialect == SqlDialect.INFORMIX && schema == null) { + schema = meta.getUserName(); + } + + try (ResultSet rs = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + String upperName = tableName.toUpperCase(); + if (sqlDialect == SqlDialect.FIREBIRD && (upperName.startsWith("RDB$") || upperName.startsWith("MON$") || upperName.startsWith("SEC$"))) { + continue; + } + tables.add(tableName); + } + } + return tables; + } + + public SqlDialect getSqlDialect() { + return sqlDialect; + } +} From 0d3b3c0bd1539cb04c338805a7b733df58f9d6a5 Mon Sep 17 00:00:00 2001 From: bercianor Date: Sat, 21 Mar 2026 16:31:49 +0000 Subject: [PATCH 3/4] fix: add missing dependency --- utils/sql-util/build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/sql-util/build.gradle.kts b/utils/sql-util/build.gradle.kts index d86ec391c..d468198c5 100644 --- a/utils/sql-util/build.gradle.kts +++ b/utils/sql-util/build.gradle.kts @@ -7,6 +7,7 @@ java { } dependencies { + implementation(project(":core:flamingock-core")) implementation("com.zaxxer:HikariCP:3.4.5") implementation("org.testcontainers:testcontainers-junit-jupiter:2.0.2") // SQL Testcontainers From e2fdd86ae4fef234a19bff39200735aa2b8d8f8d Mon Sep 17 00:00:00 2001 From: bercianor Date: Sat, 21 Mar 2026 16:49:21 +0000 Subject: [PATCH 4/4] fix: remove spanish comment --- .../src/main/java/io/flamingock/sql/kit/SqlTestKit.java | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java index 55e7bfeb3..9b6020e14 100644 --- a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java @@ -63,7 +63,6 @@ public void cleanUp() { try (Statement stmt = connection.createStatement()) { stmt.executeUpdate(dialectHelper.getDropTableSql(tableName)); } catch (SQLException e) { - // Para Sybase, ignorar si la tabla ya no existe if (dialectHelper.getSqlDialect() != SqlDialect.SYBASE) { throw e; }