diff --git a/.github/workflows/daily-it.yml b/.github/workflows/daily-it.yml index 5f7d8cdcfd612..1510340caa2d8 100644 --- a/.github/workflows/daily-it.yml +++ b/.github/workflows/daily-it.yml @@ -85,995 +85,3 @@ jobs: name: table-standalone-log-java${{ matrix.java }}-${{ runner.os }} path: integration-test/target/cluster-logs retention-days: 3 - PipeSingle: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - ] - os: [ubuntu-latest] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT1 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeAutoBasic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeAutoEnhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - ] - os: [ubuntu-latest] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeManual: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - ] - os: [ubuntu-latest] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeManual \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeArchVerification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: - [ - ScalableSingleNodeMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTableArchVerification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ScalableSingleNodeMode] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTableArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-table-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeRegressionConsumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: - [ - ScalableSingleNodeMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeRegressionMisc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: - [ - ScalableSingleNodeMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTableManualBasic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTableManualEnhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: - [ - LightWeightStandaloneMode, - ScalableSingleNodeMode, - HighPerformanceMode, - PipeConsensusBatchMode, - PipeConsensusStreamMode, - ] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml index b1c2b75fa9f29..e69de29bb2d1d 100644 --- a/.github/workflows/pipe-it.yml +++ b/.github/workflows/pipe-it.yml @@ -1,978 +0,0 @@ -name: Multi-Cluster IT - -on: - push: - branches: - - master - - "rel/*" - - "rc/*" - paths-ignore: - - "docs/**" - - "site/**" - - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**" #queryengine - pull_request: - branches: - - master - - "rel/*" - - "rc/*" - - "force_ci/**" - paths-ignore: - - "docs/**" - - "site/**" - - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**" #queryengine - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - -jobs: - single: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT1 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-manual: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeManual \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ScalableSingleNodeMode] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-table-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ScalableSingleNodeMode] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTableArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-table-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-regression-consumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ScalableSingleNodeMode] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-regression-misc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ScalableSingleNodeMode] - cluster2: [ScalableSingleNodeMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - triple: - strategy: - fail-fast: false - max-parallel: 1 - matrix: - java: [ 17 ] - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - cluster3: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \ - -pl integration-test \ - -am -PMultiClusterIT3 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }} - path: integration-test/target/cluster-logs - retention-days: 30 diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java index 3fce11bf51f6d..8e8c61dca24ee 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java @@ -351,11 +351,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String connectorName) throws Exception { senderEnv .getConfig() @@ -398,16 +393,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { }; for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java index b5e4127e9465c..7833a0621ba52 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java @@ -345,11 +345,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String sinkName) throws Exception { senderEnv .getConfig() @@ -384,16 +379,7 @@ private void doTestUseNodeUrls(String sinkName) throws Exception { final StringBuilder nodeUrlsBuilder = new StringBuilder(); for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (sinkName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java index c7222e9a1bc97..82c394c7fb545 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink; import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ protected void initConstructors() { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf96..33143bdc27ede 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -21,18 +21,10 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor; class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { @@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(), - TumblingTimeSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), - SwingingDoorTrendingSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), - ThrowingExceptionProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(), - StandardStatisticsOperatorProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), - TumblingWindowingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), PipeConsensusProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index 536cf71cdb8db..09773d0cad52e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -23,10 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; @@ -59,13 +56,8 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( @@ -82,12 +74,8 @@ protected void initConstructors() { IoTDBDataRegionAsyncSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java index 160ecf54c0158..a7752994ee264 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ protected void initConstructors() { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java index a5598f87d84ec..1d348314d6c85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java @@ -236,7 +236,7 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) { while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) { current = iterator.next(); } - if (current.equalsInIoTConsensusV2(event)) { + if (current.equalsInPipeConsensus(event)) { iterator.remove(); } else { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index d1ceb48e8aad0..5e75c23d6a48c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePatternOperations; @@ -140,6 +141,21 @@ public class IoTDBDataRegionSource extends IoTDBSource { public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); + final boolean forwardingPipeRequests = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + if (!forwardingPipeRequests) { + throw new PipeParameterNotValidException( + String.format( + "The parameter %s cannot be set to false.", + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY)); + } + final boolean isTreeDialect = validator .getParameters() @@ -266,32 +282,6 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)); - // Validate source.realtime.mode - if (validator - .getParameters() - .getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator - .getParameters() - .hasAnyAttributes( - SOURCE_START_TIME_KEY, - EXTRACTOR_START_TIME_KEY, - SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)) { - validator.validateAttributeValueRange( - validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) - ? EXTRACTOR_REALTIME_MODE_KEY - : SOURCE_REALTIME_MODE_KEY, - true, - EXTRACTOR_REALTIME_MODE_FILE_VALUE, - EXTRACTOR_REALTIME_MODE_HYBRID_VALUE, - EXTRACTOR_REALTIME_MODE_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE, - EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); - } - checkInvalidParameters(validator); constructHistoricalExtractor(); @@ -454,6 +444,13 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { return; } + if (!(pipeName != null + && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX) + || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) { + realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + return; + } + // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index cc59f885686b7..9626f4577698a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -930,8 +930,8 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { false); // if using IoTV2, assign a replicateIndex for this historical deletion event - if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 - && IoTConsensusV2Processor.isShouldReplicate(event)) { + if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus + && PipeConsensusProcessor.isShouldReplicate(event)) { event.setReplicateIndexForIoTV2( ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName)); LOGGER.debug( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index cd00e2975a0aa..f0a395832fc5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -114,7 +114,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { private final AtomicReference> dataRegionTimePartitionIdBound = new AtomicReference<>(); - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; private boolean shouldTransferModFile; // Whether to transfer mods @@ -250,22 +250,7 @@ public void customize( ? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) : TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1; - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { shouldTransferModFile = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index ec2122b917574..2cf154053afed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -21,27 +21,16 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; -import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.IMeasurementSchema; -import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; -import java.security.SecureRandom; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; public class PipeSinkTest { @@ -103,95 +92,4 @@ public void testIoTDBThriftAsyncConnectorToOthers() { Assert.fail(); } } - - @Test - public void testOpcUaSink() { - final List schemaList = - Arrays.asList( - new MeasurementSchema("s1", TSDataType.INT64), - new MeasurementSchema("s2", TSDataType.INT64)); - - final Tablet tablet = new Tablet("root.db.d1.vector6", schemaList, 100); - - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - final int rowSize = tablet.getRowSize(); - tablet.addTimestamp(rowSize, timestamp); - for (int i = 0; i < 2; i++) { - tablet.addValue( - schemaList.get(i).getMeasurementName(), rowSize, new SecureRandom().nextLong()); - } - timestamp++; - } - - final List opcSchemaList = - Arrays.asList( - new MeasurementSchema("value1", TSDataType.INT64), - new MeasurementSchema("quality1", TSDataType.BOOLEAN)); - final Tablet qualityTablet = new Tablet("root.db.d1.vector6.s3", opcSchemaList, 100); - - timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - final int rowSize = qualityTablet.getRowSize(); - qualityTablet.addTimestamp(rowSize, timestamp); - qualityTablet.addValue( - opcSchemaList.get(0).getMeasurementName(), rowSize, new SecureRandom().nextLong()); - qualityTablet.addValue(opcSchemaList.get(1).getMeasurementName(), rowSize, true); - timestamp++; - } - - try (final OpcUaSink qualityOPC = new OpcUaSink(); - final OpcUaSink normalOPC = new OpcUaSink()) { - final PipeTaskRuntimeConfiguration configuration = - new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("temp", 0, 1)); - qualityOPC.customize( - new PipeParameters( - new HashMap() { - { - put( - PipeSinkConstant.CONNECTOR_KEY, - BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName()); - put(PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY, "true"); - put(PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY, "value1"); - put(PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY, "quality1"); - } - }), - configuration); - normalOPC.customize( - new PipeParameters( - new HashMap() { - { - put( - PipeSinkConstant.CONNECTOR_KEY, - BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName()); - } - }), - configuration); - final PipeRawTabletInsertionEvent event = - new PipeRawTabletInsertionEvent( - false, "root.db", "db", "root.db", tablet, false, "pipe", 0L, null, null, false); - event.increaseReferenceCount(""); - normalOPC.transfer(event); - // Shall not throw - qualityOPC.transfer(event); - event.decreaseReferenceCount("", false); - - qualityOPC.transfer( - new PipeRawTabletInsertionEvent( - false, - "root.db", - "db", - "root.db", - qualityTablet, - false, - "pipe", - 0L, - null, - null, - false)); - - } catch (Exception e) { - Assert.fail(); - } - } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 114f9a35ac1c2..39a4f3e01f428 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -237,7 +237,7 @@ public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() measurementSchemas)); ReadOnlyMemChunk readOnlyMemChunk = resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( - new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null); + new QueryContext(1), memTable, null, Long.MAX_VALUE, null); for (int i = 1; i <= 50; i++) { memTable.writeAlignedRow( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 74bc0d9815aa8..f9f9c0fb6e380 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -19,27 +19,16 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource; @@ -61,18 +50,8 @@ public enum BuiltinPipePlugin { // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), - TUMBLING_TIME_SAMPLING_PROCESSOR( - "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), - SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), - CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), - THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), - AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), - COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), // Hidden-processors, which are plugins of the processors - STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), - TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -83,12 +62,9 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", PipeConsensusAsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), - OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class), @@ -97,10 +73,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class), WEBSOCKET_SINK("websocket-sink", WebSocketSink.class), - OPC_UA_SINK("opc-ua-sink", OpcUaSink.class), - OPC_DA_SINK("opc-da-sink", OpcDaSink.class), WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class), @@ -148,14 +121,6 @@ public String getClassName() { // Sources DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(), // Processors - TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), - AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), - COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), - STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), - TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors @@ -165,10 +130,7 @@ public String getClassName() { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks @@ -176,8 +138,6 @@ public String getClassName() { IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), WEBSOCKET_SINK.getPipePluginName().toUpperCase(), - OPC_UA_SINK.getPipePluginName().toUpperCase(), - OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java index 91a149c9f6690..39f401eac2417 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java @@ -63,7 +63,7 @@ public abstract class IoTDBSource implements PipeExtractor { protected int regionId; protected PipeTaskMeta pipeTaskMeta; - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; // The value is always true after the first start even the extractor is closed protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false); @@ -162,22 +162,7 @@ public void customize( taskID = pipeName + "_" + regionId + "_" + creationTime; pipeTaskMeta = environment.getPipeTaskMeta(); - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; userId = parameters.getStringOrDefault(