diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 726b391b7..c42917db4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -461,6 +461,19 @@ jobs: FLAMINGOCK_JRELEASER_GPG_SECRET_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_SECRET_KEY }} FLAMINGOCK_JRELEASER_GPG_PASSPHRASE: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PASSPHRASE }} + sql-test-kit: + needs: [ build ] + uses: ./.github/workflows/module-release-graalvm.yml + with: + module: sql-test-kit + secrets: + FLAMINGOCK_JRELEASER_GITHUB_TOKEN: ${{ secrets.FLAMINGOCK_JRELEASER_GITHUB_TOKEN }} + FLAMINGOCK_JRELEASER_MAVENCENTRAL_USERNAME: ${{ secrets.FLAMINGOCK_JRELEASER_MAVENCENTRAL_USERNAME }} + FLAMINGOCK_JRELEASER_MAVENCENTRAL_PASSWORD: ${{ secrets.FLAMINGOCK_JRELEASER_MAVENCENTRAL_PASSWORD }} + FLAMINGOCK_JRELEASER_GPG_PUBLIC_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PUBLIC_KEY }} + FLAMINGOCK_JRELEASER_GPG_SECRET_KEY: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_SECRET_KEY }} + FLAMINGOCK_JRELEASER_GPG_PASSPHRASE: ${{ secrets.FLAMINGOCK_JRELEASER_GPG_PASSPHRASE }} + mongock-support: needs: [ build ] uses: ./.github/workflows/module-release-graalvm.yml @@ -545,6 +558,7 @@ jobs: dynamodb-test-kit, couchbase-util, sql-util, + sql-test-kit, mongock-support, mongock-importer-mongodb, mongock-importer-dynamodb, diff --git a/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts b/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts index ca15eaae0..dbdd50e62 100644 --- a/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts +++ b/buildSrc/src/main/kotlin/flamingock.project-structure.gradle.kts @@ -65,7 +65,8 @@ val legacyProjects = setOf( val testKitsProjects = setOf( "mongodb-test-kit", - "dynamodb-test-kit" + "dynamodb-test-kit", + "sql-test-kit" ) val allProjects = coreProjects + cloudProjects + communityProjects + pluginProjects + targetSystemProjects + externalSystemProjects + utilProjects + legacyProjects + testKitsProjects diff --git a/community/flamingock-auditstore-sql/build.gradle.kts b/community/flamingock-auditstore-sql/build.gradle.kts index 4b1c51ae4..e0d0ff827 100644 --- a/community/flamingock-auditstore-sql/build.gradle.kts +++ b/community/flamingock-auditstore-sql/build.gradle.kts @@ -20,6 +20,7 @@ dependencies { testImplementation("org.testcontainers:testcontainers-postgresql:2.0.2") testImplementation("org.testcontainers:testcontainers-mariadb:2.0.2") testImplementation(project(":utils:test-util")) + testImplementation(project(":utils:sql-test-kit")) testImplementation("com.zaxxer:HikariCP:3.4.5") testImplementation("org.testcontainers:testcontainers-junit-jupiter:2.0.2") testImplementation("com.h2database:h2:2.2.224") diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java index ed716c199..e709b2d0b 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java +++ b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditor.java @@ -19,6 +19,7 @@ import io.flamingock.internal.common.core.audit.AuditReader; import io.flamingock.internal.common.core.audit.AuditTxType; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlAuditorDialectHelper; import io.flamingock.internal.core.external.store.audit.LifecycleAuditWriter; import io.flamingock.internal.util.Result; diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java index 52ce8a36c..5ec2196e1 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java +++ b/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockService.java @@ -16,6 +16,7 @@ package io.flamingock.store.sql.internal; import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlLockDialectHelper; import io.flamingock.internal.core.external.store.lock.LockAcquisition; import io.flamingock.internal.core.external.store.lock.LockKey; import io.flamingock.internal.core.external.store.lock.LockServiceException; diff --git a/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java b/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java index 192ea6f11..024d9e251 100644 --- a/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java +++ b/community/flamingock-auditstore-sql/src/test/java/io/flamingock/store/sql/SqlAuditStoreTest.java @@ -17,23 +17,21 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; -import io.flamingock.internal.common.core.util.Deserializer; +import io.flamingock.common.test.pipeline.CodeChangeTestDefinition; +import io.flamingock.core.kit.TestKit; +import io.flamingock.core.kit.audit.AuditTestSupport; import io.flamingock.internal.common.sql.SqlDialect; -import io.flamingock.internal.core.builder.FlamingockFactory; import io.flamingock.internal.core.operation.OperationException; -import io.flamingock.internal.util.Trio; -import io.flamingock.internal.util.constants.CommunityPersistenceConstants; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._001__create_index; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._002__insert_document; import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._003__execution_with_exception; import io.flamingock.store.sql.changes.postgresql.happyPath._003__insert_another_document; import io.flamingock.targetsystem.sql.SqlTargetSystem; +import io.flamingock.sql.kit.SqlTestKit; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.sqlite.SQLiteDataSource; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.junit.jupiter.Testcontainers; @@ -44,6 +42,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static io.flamingock.core.kit.audit.AuditEntryExpectation.*; import static org.junit.jupiter.api.Assertions.*; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -282,73 +281,37 @@ private Class[] getChangeClasses(String dialectName, String scenario) { @DisplayName("When standalone runs the AuditStore should persist the audit logs and the test data") void happyPathWithMockedPipeline(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "happyPath"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), null) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - } - - // Verify audit logs - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - String[] expectedTaskIds = {"create-index", "insert-document", "insert-another-document"}; - int recordCount = 0; - int startedCount = 0; - int appliedCount = 0; - - while (rs.next()) { - String taskId = rs.getString("change_id"); - String state = rs.getString("state"); - assertTrue( - java.util.Arrays.asList(expectedTaskIds).contains(taskId), - "Unexpected change_id: " + taskId - ); - assertTrue( - state.equals("STARTED") || state.equals("APPLIED"), - "Unexpected state: " + state - ); - if (state.equals("STARTED")) startedCount++; - if (state.equals("APPLIED")) appliedCount++; - recordCount++; - } - - assertEquals(6, recordCount, "Audit log should have 6 records"); - assertEquals(3, startedCount, "Should have 3 STARTED records"); - assertEquals(3, appliedCount, "Should have 3 APPLIED records"); - } - - // Verify test data - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Federico"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Federico", rs.getString("name")); - } - ps.setString(1, "test-client-Jorge"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Jorge", rs.getString("name")); - } - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "happyPath"); + String[] expectedTaskIds = {"create-index", "insert-document", "insert-another-document"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class)) + ) + .WHEN(() -> testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run()) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + APPLIED(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, false); } @ParameterizedTest @@ -356,86 +319,40 @@ void happyPathWithMockedPipeline(SqlDialect sqlDialect, String dialectName) thro @DisplayName("When standalone runs the AuditStore and execution fails (with rollback method) should persist all the audit logs up to the failed one (ROLLED_BACK)") void failedWithRollback(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "failedWithRollback"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - assertThrows(OperationException.class, () -> { - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - }); - - // Verify audit sequence - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("FAILED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("ROLLED_BACK", rs.getString("state")); - - assertFalse(rs.next()); - } - - // Verify index exists - SqlAuditTestHelper.verifyIndexExists(context); - - // Verify partial data - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Federico"); - try (ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Federico", rs.getString("name")); - } - } - - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { - ps.setString(1, "test-client-Jorge"); - try (ResultSet rs = ps.executeQuery()) { - assertFalse(rs.next()); - } - } - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "failedWithRollback"); + String[] expectedTaskIds = {"create-index", "insert-document", "execution-with-exception"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class), Collections.singletonList(Connection.class)) + ) + .WHEN(() -> assertThrows(OperationException.class, () -> { + testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run(); + })) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + FAILED(expectedTaskIds[2]), + ROLLED_BACK(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, true); } @ParameterizedTest @@ -443,72 +360,43 @@ void failedWithRollback(SqlDialect sqlDialect, String dialectName) throws Except @DisplayName("When standalone runs the AuditStore and execution fails (without rollback method) should persist all the audit logs up to the failed one (FAILED)") void failedWithoutRollback(SqlDialect sqlDialect, String dialectName) throws Exception { context = setupTest(sqlDialect, dialectName); - - try (MockedStatic mocked = Mockito.mockStatic(Deserializer.class)) { - Class[] changeClasses = getChangeClasses(dialectName, "failedWithoutRollback"); - - mocked.when(Deserializer::readMetadataFromFile).thenReturn(PipelineTestHelper.getPreviewPipeline( - new Trio<>(changeClasses[0], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[1], Collections.singletonList(Connection.class), null), - new Trio<>(changeClasses[2], Collections.singletonList(Connection.class), null) - )); - - SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource); - SqlAuditStore auditStore = SqlAuditStore.from(targetSystem); - - assertThrows(OperationException.class, () -> { - FlamingockFactory.getCommunityBuilder() - .setAuditStore(auditStore) - .addTargetSystem(targetSystem) - .build() - .run(); - }); - - // Verify audit sequence - try (Connection conn = context.dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement( - "SELECT change_id, state FROM " + CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME + " ORDER BY id ASC"); - ResultSet rs = ps.executeQuery()) { - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("create-index", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("insert-document", rs.getString("change_id")); - assertEquals("APPLIED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("STARTED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("FAILED", rs.getString("state")); - - assertTrue(rs.next()); - assertEquals("execution-with-exception", rs.getString("change_id")); - assertEquals("ROLLED_BACK", rs.getString("state")); - - assertFalse(rs.next()); - } - - // Verify index exists and data state - SqlAuditTestHelper.verifyIndexExists(context); - verifyPartialDataState(context); - } - + SqlTargetSystem sqlTargetSystem = new SqlTargetSystem("sql", context.dataSource); + SqlAuditStore sqlAuditStore = SqlAuditStore.from(sqlTargetSystem); + TestKit testKit = SqlTestKit.create(sqlAuditStore, context.dataSource); + + Class[] changeClasses = getChangeClasses(dialectName, "failedWithoutRollback"); + String[] expectedTaskIds = {"create-index", "insert-document", "execution-with-exception"}; + //Given-When-Then + AuditTestSupport.withTestKit(testKit) + .GIVEN_Changes( + new CodeChangeTestDefinition(changeClasses[0], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[1], Collections.singletonList(Connection.class), null), + new CodeChangeTestDefinition(changeClasses[2], Collections.singletonList(Connection.class), null) + ) + .WHEN(() -> assertThrows(OperationException.class, () -> { + testKit.createBuilder() + .setAuditStore(sqlAuditStore) + .addTargetSystem(sqlTargetSystem) + .build() + .run(); + })) + .THEN_VerifyAuditSequenceStrict( + STARTED(expectedTaskIds[0]), + APPLIED(expectedTaskIds[0]), + STARTED(expectedTaskIds[1]), + APPLIED(expectedTaskIds[1]), + STARTED(expectedTaskIds[2]), + FAILED(expectedTaskIds[2]), + ROLLED_BACK(expectedTaskIds[2]) + ) + .run(); + + // Verify index exists and data state + SqlAuditTestHelper.verifyIndexExists(context); + verifyDataState(context, true); } - private void verifyPartialDataState(TestContext context) throws SQLException { + private void verifyDataState(TestContext context, Boolean partial) throws SQLException { try (Connection conn = context.dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { ps.setString(1, "test-client-Federico"); @@ -522,7 +410,12 @@ private void verifyPartialDataState(TestContext context) throws SQLException { PreparedStatement ps = conn.prepareStatement("SELECT name FROM test_table WHERE id = ?")) { ps.setString(1, "test-client-Jorge"); try (ResultSet rs = ps.executeQuery()) { - assertFalse(rs.next()); + if (partial) { + assertFalse(rs.next()); + } else { + assertTrue(rs.next()); + assertEquals("Jorge", rs.getString("name")); + } } } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 6888f68b0..b1e25a385 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -162,6 +162,10 @@ include("utils:sql-util") project(":utils:sql-util").name = "sql-util" project(":utils:sql-util").projectDir = file("utils/sql-util") +include("utils:sql-test-kit") +project(":utils:sql-test-kit").name = "sql-test-kit" +project(":utils:sql-test-kit").projectDir = file("utils/sql-test-kit") + ////////////////////////////////////// // LEGACY ////////////////////////////////////// diff --git a/utils/sql-test-kit/build.gradle.kts b/utils/sql-test-kit/build.gradle.kts new file mode 100644 index 000000000..beb364614 --- /dev/null +++ b/utils/sql-test-kit/build.gradle.kts @@ -0,0 +1,13 @@ +dependencies { + implementation(project(":core:flamingock-core")) + implementation(project(":utils:sql-util")) + implementation(project(":utils:test-util")) +} + +description = "SQL TestKit for Flamingock testing" + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(8)) + } +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java new file mode 100644 index 000000000..3c6b896c7 --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlAuditStorage.java @@ -0,0 +1,181 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.audit.AuditStorage; +import io.flamingock.internal.common.core.audit.AuditEntry; +import io.flamingock.internal.common.core.audit.AuditTxType; +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlAuditorDialectHelper; + +import javax.sql.DataSource; +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME; + +/** + * SQL implementation of AuditStorage for real database testing. + * Only depends on SQL client/database and core Flamingock classes. + * Does not depend on SQL-specific Flamingock components like SqlTargetSystem. + */ +public class SqlAuditStorage implements AuditStorage { + + private final DataSource dataSource; + private final SqlAuditorDialectHelper dialectHelper; + private final String auditTableName; + + public SqlAuditStorage(DataSource dataSource) throws SQLException { + this(dataSource, DEFAULT_AUDIT_STORE_NAME); + } + + public SqlAuditStorage(DataSource dataSource, String tableName) throws SQLException { + this.auditTableName = tableName; + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlAuditorDialectHelper(conn); + } + } + + @Override + public void addAuditEntry(AuditEntry auditEntry) { + try (Connection connection = dataSource.getConnection()) { + // For Informix, ensure autoCommit is enabled for audit writes + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + connection.setAutoCommit(true); + } + + try (PreparedStatement ps = connection.prepareStatement( + dialectHelper.getInsertSqlString(auditTableName))) { + ps.setString(1, auditEntry.getExecutionId()); + ps.setString(2, auditEntry.getStageId()); + ps.setString(3, auditEntry.getTaskId()); + ps.setString(4, auditEntry.getAuthor()); + ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt())); + ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null); + ps.setString(7, auditEntry.getClassName()); + ps.setString(8, auditEntry.getMethodName()); + ps.setString(9, auditEntry.getSourceFile()); + ps.setString(10, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null); + ps.setLong(11, auditEntry.getExecutionMillis()); + ps.setString(12, auditEntry.getExecutionHostname()); + ps.setString(13, auditEntry.getErrorTrace()); + ps.setString(14, auditEntry.getType() != null ? auditEntry.getType().name() : null); + ps.setString(15, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null); + ps.setString(16, auditEntry.getTargetSystemId()); + ps.setString(17, auditEntry.getOrder()); + ps.setString(18, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null); + ps.setObject(19, auditEntry.getTransactionFlag()); + ps.setObject(20, auditEntry.getSystemChange()); + ps.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to add audit entry", e); + } + // Log but don't throw + } + + @Override + public List getAuditEntries() { + List entries = new ArrayList<>(); + try (Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(dialectHelper.getSelectHistorySqlString(auditTableName))) { + while (rs.next()) { + AuditEntry entry = toAuditEntry(rs); + entries.add(entry); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read audit history", e); + } + return entries; + } + + @Override + public List getAuditEntriesForChange(String changeId) { + List entries = new ArrayList<>(); + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement(dialectHelper.getSelectHistoryByChangeIdSqlString(auditTableName))) { + ps.setString(1, changeId); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + AuditEntry entry = toAuditEntry(rs); + entries.add(entry); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read audit history", e); + } + return entries; + } + + @Override + public long countAuditEntriesWithStatus(AuditEntry.Status status) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement(dialectHelper.getCountByStatusSqlString(auditTableName))) { + ps.setString(1, status.toString()); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getLong(1); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to count audit entries with status: " + status, e); + } + return 0; + } + + @Override + public boolean hasAuditEntries() { + return !this.getAuditEntries().isEmpty(); + } + + @Override + public void clear() { + try(Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement()) { + stmt.executeUpdate(String.format("DELETE FROM %s", auditTableName)); + } catch (SQLException e) { + throw new RuntimeException("Failed to clear audit entries", e); + } + } + + private AuditEntry toAuditEntry(ResultSet rs) throws SQLException { + return new AuditEntry( + rs.getString("execution_id"), + rs.getString("stage_id"), + rs.getString("change_id"), + rs.getString("author"), + rs.getTimestamp("created_at").toLocalDateTime(), + rs.getString("state") != null ? AuditEntry.Status.valueOf(rs.getString("state")) : null, + rs.getString("type") != null ? AuditEntry.ChangeType.valueOf(rs.getString("type")) : null, + rs.getString("invoked_class"), + rs.getString("invoked_method"), + rs.getString("source_file"), + rs.getLong("execution_millis"), + rs.getString("execution_hostname"), + rs.getString("metadata"), + rs.getBoolean("system_change"), + rs.getString("error_trace"), + AuditTxType.fromString(rs.getString("tx_strategy")), + rs.getString("target_system_id"), + rs.getString("change_order"), + rs.getString("recovery_strategy") != null ? io.flamingock.api.RecoveryStrategy.valueOf(rs.getString("recovery_strategy")) : null, + rs.getObject("transaction_flag") != null ? rs.getBoolean("transaction_flag") : null + ); + } +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java new file mode 100644 index 000000000..e2ff5fa80 --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlLockStorage.java @@ -0,0 +1,262 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.lock.LockStorage; +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlLockDialectHelper; +import io.flamingock.internal.core.external.store.lock.LockAcquisition; +import io.flamingock.internal.core.external.store.lock.LockKey; +import io.flamingock.internal.core.external.store.lock.LockServiceException; +import io.flamingock.internal.core.external.store.lock.LockStatus; +import io.flamingock.internal.core.external.store.lock.community.CommunityLockEntry; +import io.flamingock.internal.util.id.RunnerId; + +import javax.sql.DataSource; +import java.sql.*; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_LOCK_STORE_NAME; + +/** + * SQL implementation of LockStorage for real database testing. + * Only depends on SQL client/database and core Flamingock classes. + * Does not depend on SQL-specific Flamingock components like SqlTargetSystem. + */ +public class SqlLockStorage implements LockStorage { + + private final String lockTableName; + private final Map metadata = new ConcurrentHashMap<>(); + private final DataSource dataSource; + private final SqlLockDialectHelper dialectHelper; + + public SqlLockStorage(DataSource dataSource) throws SQLException { + this(dataSource, DEFAULT_LOCK_STORE_NAME); + } + + public SqlLockStorage(DataSource dataSource, String lockTableName) throws SQLException { + this.lockTableName = lockTableName; + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlLockDialectHelper(conn); + } + } + + @Override + public void storeLock(LockKey key, LockAcquisition acquisition) { + String keyStr = key.toString(); + RunnerId owner = acquisition.getOwner(); + LocalDateTime expiresAt = LocalDateTime.now().plusNanos(acquisition.getAcquiredForMillis() * 1_000_000); + + Connection connection = null; + try { + connection = dataSource.getConnection(); + // For Informix, use shorter timeout and simpler transaction handling + // For Sybase we MUST disable auto-commit so HOLDLOCK works as intended + boolean isInformix = dialectHelper.getSqlDialect() == SqlDialect.INFORMIX; + boolean isSybase = dialectHelper.getSqlDialect() == SqlDialect.SYBASE; + connection.setAutoCommit(isInformix); // Informix uses autocommit + + try { + + if (isSybase) { + // For Sybase, use HOLDLOCK to prevent race conditions during lock check + String selectSql = "SELECT lock_key, status, owner, expires_at " + + "FROM " + lockTableName + " HOLDLOCK " + + "WHERE lock_key = ?"; + + CommunityLockEntry existing = null; + try (PreparedStatement ps = connection.prepareStatement(selectSql)) { + ps.setString(1, keyStr); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + existing = new CommunityLockEntry( + rs.getString("lock_key"), + LockStatus.valueOf(rs.getString("status")), + rs.getString("owner"), + rs.getTimestamp("expires_at").toLocalDateTime() + ); + } + } + } + + if (existing == null || + owner.toString().equals(existing.getOwner()) || + LocalDateTime.now().isAfter(existing.getExpiresAt())) { + // Delete existing lock first, then insert new one + try (PreparedStatement delete = connection.prepareStatement( + "DELETE FROM " + lockTableName + " WHERE lock_key = ?")) { + delete.setString(1, keyStr); + delete.executeUpdate(); + } + dialectHelper.upsertLockEntry(connection, lockTableName, keyStr, owner.toString(), expiresAt); + connection.commit(); + return; + } else { + connection.rollback(); + throw new LockServiceException("upsert", keyStr, + "Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt()); + } + } + + CommunityLockEntry existing = getLockEntry(connection, keyStr); + if (existing == null || + owner.toString().equals(existing.getOwner()) || + LocalDateTime.now().isAfter(existing.getExpiresAt())) { + dialectHelper.upsertLockEntry(connection, lockTableName, keyStr, owner.toString(), expiresAt); + // Commit for all dialects except Informix (which uses auto-commit above) + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.commit(); + } + } else { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.rollback(); + } + throw new LockServiceException("upsert", keyStr, + "Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt()); + } + } catch (Exception e) { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.rollback(); + } + throw e; + } finally { + if (dialectHelper.getSqlDialect() != SqlDialect.INFORMIX) { + connection.setAutoCommit(true); + } + } + } catch (SQLException e) { + throw new LockServiceException("upsert", keyStr, e.getMessage()); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + // Log but don't throw + } + } + } + } + + @Override + public LockAcquisition getLock(LockKey key) { + try (Connection connection = dataSource.getConnection()) { + CommunityLockEntry entry = getLockEntry(connection, key.toString()); + if (entry != null) { + return new LockAcquisition(RunnerId.fromString(entry.getOwner()), + Timestamp.valueOf(entry.getExpiresAt()).getTime() - System.currentTimeMillis()); + } + } catch (SQLException e) { + // ignore + } + return null; + } + + private CommunityLockEntry getLockEntry(Connection conn, String key) throws SQLException { + try (PreparedStatement ps = conn.prepareStatement( + dialectHelper.getSelectLockSqlString(lockTableName))) { + + // Set query timeout for Informix to prevent long waits + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + ps.setQueryTimeout(5); + } + + ps.setString(1, key); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return new CommunityLockEntry( + rs.getString(1), + LockStatus.valueOf(rs.getString("status")), + rs.getString("owner"), + rs.getTimestamp("expires_at").toLocalDateTime() + ); + } + } + } + return null; + } + + @Override + public Map getAllLocks() { + Map locks = new HashMap<>(); + + try (Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(dialectHelper.getSelectAllLocksSqlString(lockTableName))) { + while (rs.next()) { + LockKey key = LockKey.fromString(rs.getString(1)); + LockAcquisition acquisition = new LockAcquisition( + RunnerId.fromString(rs.getString("owner")), + Timestamp.valueOf(rs.getTimestamp("expires_at").toLocalDateTime()).getTime() - System.currentTimeMillis() + ); + locks.put(key, acquisition); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to read locks from database", e); + } + + return locks; + } + + @Override + public void removeLock(LockKey key) { + try { + try (Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement( + dialectHelper.getDeleteLockSqlString(lockTableName))) { + + // Set query timeout for Informix to prevent long waits + if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) { + ps.setQueryTimeout(5); + } + ps.setString(1, key.toString()); + ps.executeUpdate(); + } + } catch (SQLException e) { + // ignore + } + } + + @Override + public boolean hasLocks() { + return !this.getAllLocks().isEmpty(); + } + + @Override + public void clear() { + try(Connection connection = dataSource.getConnection(); + Statement stmt = connection.createStatement()) { + stmt.executeUpdate(String.format("DELETE FROM %s", lockTableName)); + } catch (SQLException e) { + throw new RuntimeException("Failed to clear lock entries", e); + } + metadata.clear(); + } + + @Override + public void setMetadata(String key, Object value) { + metadata.put(key, value); + } + + @Override + public Object getMetadata(String key) { + return metadata.get(key); + } + +} diff --git a/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java new file mode 100644 index 000000000..9b6020e14 --- /dev/null +++ b/utils/sql-test-kit/src/main/java/io/flamingock/sql/kit/SqlTestKit.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.sql.kit; + +import io.flamingock.core.kit.AbstractTestKit; +import io.flamingock.core.kit.audit.AuditStorage; +import io.flamingock.core.kit.lock.LockStorage; +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.dialectHelpers.SqlTestKitDialectHelper; +import io.flamingock.internal.core.external.store.CommunityAuditStore; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +public class SqlTestKit extends AbstractTestKit { + + private final DataSource dataSource; + private final SqlTestKitDialectHelper dialectHelper; + + public SqlTestKit(AuditStorage auditStorage, LockStorage lockStorage, CommunityAuditStore auditStore, DataSource dataSource) throws SQLException { + super(auditStorage, lockStorage, auditStore); + this.dataSource = dataSource; + try (Connection conn = dataSource.getConnection()) { + this.dialectHelper = new SqlTestKitDialectHelper(conn); + } + } + + @Override + public void cleanUp() { + try { + try (Connection connection = dataSource.getConnection()) { + if (dialectHelper.getSqlDialect() == SqlDialect.H2) { + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate("DROP ALL OBJECTS"); + } + return; + } + List tables = dialectHelper.getUserTables(connection); + + if (tables.isEmpty()) { + return; + } + + dialectHelper.disableForeignKeyChecks(connection); + try { + for (String tableName : tables) { + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate(dialectHelper.getDropTableSql(tableName)); + } catch (SQLException e) { + if (dialectHelper.getSqlDialect() != SqlDialect.SYBASE) { + throw e; + } + } + } + } finally { + dialectHelper.enableForeignKeyChecks(connection); + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Create a new SqlTestKit with client and database + */ + public static SqlTestKit create(CommunityAuditStore auditStore, DataSource dataSource) throws SQLException { + SqlAuditStorage auditStorage = new SqlAuditStorage(dataSource); + SqlLockStorage lockStorage = new SqlLockStorage(dataSource); + return new SqlTestKit(auditStorage, lockStorage, auditStore, dataSource); + } +} diff --git a/utils/sql-util/build.gradle.kts b/utils/sql-util/build.gradle.kts index d86ec391c..d468198c5 100644 --- a/utils/sql-util/build.gradle.kts +++ b/utils/sql-util/build.gradle.kts @@ -7,6 +7,7 @@ java { } dependencies { + implementation(project(":core:flamingock-core")) implementation("com.zaxxer:HikariCP:3.4.5") implementation("org.testcontainers:testcontainers-junit-jupiter:2.0.2") // SQL Testcontainers diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java similarity index 91% rename from community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java rename to utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java index 924584a33..31a5ea4b5 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlAuditorDialectHelper.java +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlAuditorDialectHelper.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.flamingock.store.sql.internal; +package io.flamingock.internal.common.sql.dialectHelpers; import io.flamingock.internal.common.sql.SqlDialectFactory; import io.flamingock.internal.common.sql.SqlDialect; @@ -279,19 +279,36 @@ public String getCreateTableSqlString(String tableName) { public String getInsertSqlString(String tableName) { return String.format( - "INSERT INTO %s (" + - "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + - "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + - ") VALUES (" + - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + - ")", tableName); + "INSERT INTO %s (" + + "execution_id, stage_id, change_id, author, created_at, state, invoked_class, invoked_method, source_file, metadata, " + + "execution_millis, execution_hostname, error_trace, type, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag, system_change" + + ") VALUES (" + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?" + + ")", tableName); } public String getSelectHistorySqlString(String tableName) { return String.format( - "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + - "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + - "FROM %s ORDER BY id ASC", tableName); + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "ORDER BY id ASC", tableName); + } + + public String getSelectHistoryByChangeIdSqlString(String tableName) { + return String.format( + "SELECT execution_id, stage_id, change_id, author, created_at, state, type, invoked_class, invoked_method, source_file, " + + "execution_millis, execution_hostname, metadata, system_change, error_trace, tx_strategy, target_system_id, change_order, recovery_strategy, transaction_flag " + + "FROM %s " + + "WHERE change_id = ? " + + "ORDER BY id ASC", tableName); + } + + public String getCountByStatusSqlString(String tableName) { + return String.format( + "SELECT COUNT(change_id) " + + "FROM %s " + + "WHERE state = ?", tableName); } private String getAutoIncrementType() { diff --git a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java similarity index 65% rename from community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java rename to utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java index be4950e0c..a4672f885 100644 --- a/community/flamingock-auditstore-sql/src/main/java/io/flamingock/store/sql/internal/SqlLockDialectHelper.java +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlLockDialectHelper.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.flamingock.store.sql.internal; +package io.flamingock.internal.common.sql.dialectHelpers; import io.flamingock.internal.common.sql.SqlDialectFactory; import io.flamingock.internal.common.sql.SqlDialect; @@ -21,7 +21,6 @@ import java.sql.*; import java.time.LocalDateTime; -import java.util.Objects; public final class SqlLockDialectHelper { @@ -117,6 +116,7 @@ public String getCreateTableSqlString(String tableName) { public String getSelectLockSqlString(String tableName) { switch (sqlDialect) { case POSTGRESQL: + case H2: return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName); case DB2: // Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key) @@ -125,10 +125,10 @@ public String getSelectLockSqlString(String tableName) { return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName); case SYBASE: return String.format( - "SELECT lock_key, status, owner, expires_at " + - "FROM %s HOLDLOCK " + - "WHERE lock_key = ?", - tableName + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK " + + "WHERE lock_key = ?", + tableName ); case ORACLE: return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName); case INFORMIX: @@ -140,90 +140,118 @@ public String getSelectLockSqlString(String tableName) { } } + public String getSelectAllLocksSqlString(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + case H2: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s", tableName); + case SQLSERVER: + return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK)", tableName); + case SYBASE: + return String.format( + "SELECT lock_key, status, owner, expires_at " + + "FROM %s HOLDLOCK", + tableName + ); + case ORACLE: + return String.format("SELECT \"key\", status, owner, expires_at FROM %s FOR UPDATE", tableName); + case DB2: + case INFORMIX: + case FIREBIRD: + return String.format("SELECT lock_key, status, owner, expires_at FROM %s", tableName); + default: + return String.format("SELECT `key`, status, owner, expires_at FROM %s", tableName); + } + } + public String getInsertOrUpdateLockSqlString(String tableName) { switch (sqlDialect) { case MYSQL: case MARIADB: return String.format( - "INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)", - tableName); + "INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)", + tableName); case POSTGRESQL: return String.format( - "INSERT INTO %s (\"key\", status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "ON CONFLICT (\"key\") DO UPDATE SET status = EXCLUDED.status, owner = EXCLUDED.owner, expires_at = EXCLUDED.expires_at", - tableName); + "INSERT INTO %s (\"key\", status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "ON CONFLICT (\"key\") DO UPDATE SET status = EXCLUDED.status, owner = EXCLUDED.owner, expires_at = EXCLUDED.expires_at", + tableName); case SQLITE: return String.format( - "INSERT OR REPLACE INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?)", - tableName); + "INSERT OR REPLACE INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?)", + tableName); case SQLSERVER: return String.format( - "BEGIN TRANSACTION; " + - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE [key] = ?; " + - "IF @@ROWCOUNT = 0 " + - "BEGIN " + - "INSERT INTO %s ([key], status, owner, expires_at) VALUES (?, ?, ?, ?) " + - "END; " + - "COMMIT TRANSACTION;", - tableName, tableName); + "BEGIN TRANSACTION; " + + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE [key] = ?; " + + "IF @@ROWCOUNT = 0 " + + "BEGIN " + + "INSERT INTO %s ([key], status, owner, expires_at) VALUES (?, ?, ?, ?) " + + "END; " + + "COMMIT TRANSACTION;", + tableName, tableName); case SYBASE: return String.format( - "BEGIN TRAN " + - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + - "IF @@ROWCOUNT = 0 " + - "BEGIN " + - " INSERT INTO %s (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?); " + - "END " + - "COMMIT TRAN", - tableName, tableName + "BEGIN TRAN " + + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + + "IF @@ROWCOUNT = 0 " + + "BEGIN " + + " INSERT INTO %s (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?); " + + "END " + + "COMMIT TRAN", + tableName, tableName ); case ORACLE: return String.format( - "MERGE INTO %s t USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM dual) s " + - "ON (t.\"key\" = s.\"key\") " + - "WHEN MATCHED THEN UPDATE SET t.status = s.status, t.owner = s.owner, t.expires_at = s.expires_at " + - "WHEN NOT MATCHED THEN INSERT (\"key\", status, owner, expires_at) VALUES (s.\"key\", s.status, s.owner, s.expires_at)", - tableName); + "MERGE INTO %s t USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM dual) s " + + "ON (t.\"key\" = s.\"key\") " + + "WHEN MATCHED THEN UPDATE SET t.status = s.status, t.owner = s.owner, t.expires_at = s.expires_at " + + "WHEN NOT MATCHED THEN INSERT (\"key\", status, owner, expires_at) VALUES (s.\"key\", s.status, s.owner, s.expires_at)", + tableName); case H2: return String.format( - "MERGE INTO %s (`key`, status, owner, expires_at) KEY (`key`) VALUES (?, ?, ?, ?)", - tableName); + "MERGE INTO %s (\"key\", status, owner, expires_at) KEY (\"key\") VALUES (?, ?, ?, ?)", + tableName); case DB2: // Use a VALUES-derived table and a target alias for DB2 to avoid parsing issues return String.format( - "MERGE INTO %s tgt USING (VALUES (?, ?, ?, ?)) src(lock_key, status, owner, expires_at) " + - "ON (tgt.lock_key = src.lock_key) " + - "WHEN MATCHED THEN UPDATE SET status = src.status, owner = src.owner, expires_at = src.expires_at " + - "WHEN NOT MATCHED THEN INSERT (lock_key, status, owner, expires_at) VALUES (src.lock_key, src.status, src.owner, src.expires_at)", - tableName); + "MERGE INTO %s tgt USING (VALUES (?, ?, ?, ?)) src(lock_key, status, owner, expires_at) " + + "ON (tgt.lock_key = src.lock_key) " + + "WHEN MATCHED THEN UPDATE SET status = src.status, owner = src.owner, expires_at = src.expires_at " + + "WHEN NOT MATCHED THEN INSERT (lock_key, status, owner, expires_at) VALUES (src.lock_key, src.status, src.owner, src.expires_at)", + tableName); case FIREBIRD: - return String.format("UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?", tableName); + return String.format("UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?", tableName); case INFORMIX: // Informix doesn't support ON DUPLICATE KEY UPDATE // Use a procedural approach similar to SQL Server return String.format( - "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + - "INSERT INTO %s (lock_key, status, owner, expires_at) " + - "SELECT ?, ?, ?, ? FROM sysmaster:sysdual " + - "WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)", - tableName, tableName, tableName); + "UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " + + "INSERT INTO %s (lock_key, status, owner, expires_at) " + + "SELECT ?, ?, ?, ? FROM sysmaster:sysdual " + + "WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)", + tableName, tableName, tableName); default: throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name()); } } public String getDeleteLockSqlString(String tableName) { - if (Objects.requireNonNull(sqlDialect) == SqlDialect.POSTGRESQL) { - return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); - } - if (sqlDialect == SqlDialect.INFORMIX || sqlDialect == SqlDialect.DB2) { - return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); - } - if (sqlDialect == SqlDialect.FIREBIRD) { - return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); + switch (sqlDialect) { + case POSTGRESQL: + case ORACLE: + return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName); + case INFORMIX: + case DB2: + case FIREBIRD: + case SYBASE: + return String.format("DELETE FROM %s WHERE lock_key = ?", tableName); + case SQLSERVER: + return String.format("DELETE FROM %s WHERE [key] = ?", tableName); + default: // MYSQL, MARIADB, SQLITE, H2 + return String.format("DELETE FROM %s WHERE `key` = ?", tableName); } - return String.format("DELETE FROM %s WHERE `key` = ?", tableName); } public void upsertLockEntry(Connection conn, String tableName, String key, String owner, LocalDateTime expiresAt) throws SQLException { @@ -232,10 +260,11 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin if (sqlDialect == SqlDialect.DB2) { // UPDATE first try (PreparedStatement update = conn.prepareStatement( - "UPDATE " + tableName + " SET owner = ?, expires_at = ? WHERE lock_key = ?")) { - update.setString(1, owner); - update.setTimestamp(2, Timestamp.valueOf(expiresAt)); - update.setString(3, key); + "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { + update.setString(1, LockStatus.LOCK_HELD.name()); + update.setString(2, owner); + update.setTimestamp(3, Timestamp.valueOf(expiresAt)); + update.setString(4, key); int updated = update.executeUpdate(); if (updated > 0) { return; @@ -244,7 +273,7 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin // If no row updated, try INSERT try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { insert.setString(1, key); insert.setString(2, LockStatus.LOCK_HELD.name()); insert.setString(3, owner); @@ -257,7 +286,7 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin if (getSqlDialect() == SqlDialect.INFORMIX) { // Try UPDATE first try (PreparedStatement update = conn.prepareStatement( - "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { + "UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) { update.setString(1, LockStatus.LOCK_HELD.name()); update.setString(2, owner); update.setTimestamp(3, Timestamp.valueOf(expiresAt)); @@ -270,7 +299,7 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin // If no row updated, try INSERT try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { insert.setString(1, key); insert.setString(2, LockStatus.LOCK_HELD.name()); insert.setString(3, owner); @@ -284,22 +313,21 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin // For SQL Server/Sybase, use Statement and format SQL try (Statement stmt = conn.createStatement()) { String formattedSql = sql - .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") - .replaceFirst("\\?", "'" + owner + "'") - .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'") - .replaceFirst("\\?", "'" + key + "'") - .replaceFirst("\\?", "'" + key + "'") - .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") - .replaceFirst("\\?", "'" + owner + "'") - .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'"); + .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") + .replaceFirst("\\?", "'" + owner + "'") + .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'") + .replaceFirst("\\?", "'" + key + "'") + .replaceFirst("\\?", "'" + key + "'") + .replaceFirst("\\?", "'" + LockStatus.LOCK_HELD.name() + "'") + .replaceFirst("\\?", "'" + owner + "'") + .replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'"); stmt.execute(formattedSql); } return; } if (sqlDialect == SqlDialect.FIREBIRD) { - String updateSql = getInsertOrUpdateLockSqlString(tableName); - try (PreparedStatement ps = conn.prepareStatement(updateSql)) { + try (PreparedStatement ps = conn.prepareStatement(sql)) { ps.setString(1, LockStatus.LOCK_HELD.name()); ps.setString(2, owner); ps.setTimestamp(3, Timestamp.valueOf(expiresAt)); @@ -322,7 +350,7 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin if (sqlDialect == SqlDialect.SYBASE) { // The lock was already deleted in acquireLockQuery for Sybase try (PreparedStatement insert = conn.prepareStatement( - "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { + "INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) { insert.setString(1, key); insert.setString(2, LockStatus.LOCK_HELD.name()); insert.setString(3, owner); diff --git a/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java new file mode 100644 index 000000000..61076b687 --- /dev/null +++ b/utils/sql-util/src/main/java/io/flamingock/internal/common/sql/dialectHelpers/SqlTestKitDialectHelper.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.common.sql.dialectHelpers; + +import io.flamingock.internal.common.sql.SqlDialect; +import io.flamingock.internal.common.sql.SqlDialectFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +public final class SqlTestKitDialectHelper { + + final private SqlDialect sqlDialect; + + public SqlTestKitDialectHelper(Connection connection) { + this.sqlDialect = SqlDialectFactory.getSqlDialect(connection); + } + + public void disableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=0"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = OFF"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY FALSE"); + } + break; + case SQLSERVER: + case SYBASE: + case FIREBIRD: + dropAllForeignKeys(conn); + break; + default: + break; + } + } + + public void enableForeignKeyChecks(Connection conn) throws SQLException { + switch (sqlDialect) { + case MYSQL: + case MARIADB: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET FOREIGN_KEY_CHECKS=1"); + } + break; + case SQLITE: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("PRAGMA foreign_keys = ON"); + } + break; + case H2: + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("SET REFERENTIAL_INTEGRITY TRUE"); + } + break; + default: + break; + } + } + + private void dropAllForeignKeys(Connection conn) throws SQLException { + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + try (ResultSet tables = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + try (ResultSet fks = meta.getExportedKeys(catalog, schema, tableName)) { + while (fks.next()) { + String fkName = fks.getString("FK_NAME"); + String fkTable = fks.getString("FKTABLE_NAME"); + if (fkName != null) { + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "ALTER TABLE " + fkTable + " DROP CONSTRAINT " + fkName); + } catch (SQLException e) { + } + } + } + } + } + } + } + + public String getDropTableSql(String tableName) { + switch (sqlDialect) { + case POSTGRESQL: + return "DROP TABLE IF EXISTS " + tableName + " CASCADE"; + case ORACLE: + return "DROP TABLE " + tableName + " CASCADE CONSTRAINTS PURGE"; + case SQLSERVER: + return "IF OBJECT_ID('" + tableName + "', 'U') IS NOT NULL DROP TABLE " + tableName; + case SYBASE: + return "DROP TABLE " + tableName; + default: + // MYSQL, MARIADB, SQLITE, H2, DB2, INFORMIX, FIREBIRD + return "DROP TABLE IF EXISTS " + tableName; + } + } + + public List getUserTables(Connection conn) throws SQLException { + List tables = new ArrayList<>(); + DatabaseMetaData meta = conn.getMetaData(); + String schema = conn.getSchema(); + String catalog = conn.getCatalog(); + + if (sqlDialect == SqlDialect.INFORMIX && schema == null) { + schema = meta.getUserName(); + } + + try (ResultSet rs = meta.getTables(catalog, schema, "%", new String[]{"TABLE"})) { + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + String upperName = tableName.toUpperCase(); + if (sqlDialect == SqlDialect.FIREBIRD && (upperName.startsWith("RDB$") || upperName.startsWith("MON$") || upperName.startsWith("SEC$"))) { + continue; + } + tables.add(tableName); + } + } + return tables; + } + + public SqlDialect getSqlDialect() { + return sqlDialect; + } +}