diff --git a/.github/actions/php/pre-merge/action.yml b/.github/actions/php/pre-merge/action.yml new file mode 100644 index 0000000000..2c1717e199 --- /dev/null +++ b/.github/actions/php/pre-merge/action.yml @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: php-pre-merge +description: PHP pre-merge testing github iggy actions + +inputs: + task: + description: "Task to run (lint, test, build)" + required: true + +runs: + using: "composite" + steps: + - name: Install PHP build dependencies + shell: bash + run: | + sudo apt-get update + sudo apt-get install -y --no-install-recommends \ + clang \ + libclang-dev \ + libssl-dev \ + php-cli \ + php-dev \ + pkg-config \ + unzip + + echo "PHP=$(command -v php)" >> "$GITHUB_ENV" + echo "PHP_CONFIG=$(command -v php-config)" >> "$GITHUB_ENV" + php --version + php-config --version + + - name: Setup Rust with cache + uses: ./.github/actions/utils/setup-rust-with-cache + with: + shared-key: dev + save-cache: ${{ inputs.task == 'test' }} + + - name: Use shared Cargo target directory + shell: bash + run: echo "CARGO_TARGET_DIR=${GITHUB_WORKSPACE}/target" >> "$GITHUB_ENV" + + - name: Validate task + shell: bash + run: | + case "${{ inputs.task }}" in + lint|test|build) ;; + *) + echo "Unknown PHP SDK task: ${{ inputs.task }}" + exit 1 + ;; + esac + + - name: Lint + if: inputs.task == 'lint' + shell: bash + run: | + php -r 'json_decode(file_get_contents("foreign/php/composer.json"), true, 512, JSON_THROW_ON_ERROR);' + cargo fmt --manifest-path foreign/php/Cargo.toml -- --check + find foreign/php \ + -path foreign/php/vendor -prune -o \ + -name '*.php' -print0 \ + | xargs -0 -n1 php -l + + - name: Build PHP extension + if: inputs.task == 'build' + shell: bash + run: | + cargo build --release --manifest-path foreign/php/Cargo.toml + extension="$(find "${CARGO_TARGET_DIR}/release" -maxdepth 1 -name 'libiggy_php.so' -print -quit)" + if [ -z "$extension" ]; then + echo "PHP extension was not produced" + exit 1 + fi + ls -lh "$extension" + + - name: Build PHP extension for tests + if: inputs.task == 'test' + shell: bash + run: | + cargo build --manifest-path foreign/php/Cargo.toml + extension="$(find "${CARGO_TARGET_DIR}/debug" -maxdepth 1 -name 'libiggy_php.so' -print -quit)" + if [ -z "$extension" ]; then + echo "PHP extension was not produced" + exit 1 + fi + echo "PHP_IGGY_EXTENSION=$(realpath "$extension")" >> "$GITHUB_ENV" + ls -lh "$extension" + + - name: Start Iggy server + if: inputs.task == 'test' + id: iggy + uses: ./.github/actions/utils/server-start + + - name: Run PHP SDK tests + if: inputs.task == 'test' + shell: bash + working-directory: foreign/php + env: + IGGY_HOST: 127.0.0.1 + IGGY_PORT: 8090 + IGGY_USERNAME: iggy + IGGY_PASSWORD: iggy + run: ./scripts/test.sh + + - name: Stop Iggy server + if: always() && inputs.task == 'test' + uses: ./.github/actions/utils/server-stop + with: + pid-file: ${{ steps.iggy.outputs.pid_file }} + log-file: ${{ steps.iggy.outputs.log_file }} diff --git a/.github/config/components.yml b/.github/config/components.yml index e7b6dbbbc5..fde571c1ee 100644 --- a/.github/config/components.yml +++ b/.github/config/components.yml @@ -200,6 +200,15 @@ components: - "foreign/python/**" tasks: ["lint", "test", "build"] + sdk-php: + depends_on: + - "rust-sdk" # PHP SDK wraps the Rust SDK + - "rust-server" # For integration tests + - "ci-infrastructure" # CI changes trigger full regression + paths: + - "foreign/php/**" + tasks: ["lint", "test", "build"] + sdk-node: depends_on: - "rust-sdk" # Node SDK depends on core SDK diff --git a/.github/workflows/_detect.yml b/.github/workflows/_detect.yml index 63f7add7f4..8acde265bd 100644 --- a/.github/workflows/_detect.yml +++ b/.github/workflows/_detect.yml @@ -26,6 +26,9 @@ on: python_matrix: description: "Matrix for Python SDK" value: ${{ jobs.detect.outputs.python_matrix }} + php_matrix: + description: "Matrix for PHP SDK" + value: ${{ jobs.detect.outputs.php_matrix }} node_matrix: description: "Matrix for Node SDK" value: ${{ jobs.detect.outputs.node_matrix }} @@ -60,6 +63,7 @@ jobs: outputs: rust_matrix: ${{ steps.mk.outputs.rust_matrix }} python_matrix: ${{ steps.mk.outputs.python_matrix }} + php_matrix: ${{ steps.mk.outputs.php_matrix }} node_matrix: ${{ steps.mk.outputs.node_matrix }} go_matrix: ${{ steps.mk.outputs.go_matrix }} java_matrix: ${{ steps.mk.outputs.java_matrix }} @@ -230,7 +234,7 @@ jobs: console.log(`Total files changed: ${files.length}`); } - const groups = { rust:[], python:[], node:[], go:[], java:[], csharp:[], cpp:[], bdd:[], examples:[], other:[] }; + const groups = { rust:[], python:[], php:[], node:[], go:[], java:[], csharp:[], cpp:[], bdd:[], examples:[], other:[] }; // Process affected components and generate tasks console.log(''); @@ -253,6 +257,7 @@ jobs: if (name === 'rust') groups.rust.push(...entries); else if (name === 'sdk-python') groups.python.push(...entries); + else if (name === 'sdk-php') groups.php.push(...entries); else if (name === 'sdk-node') groups.node.push(...entries); else if (name === 'sdk-go') groups.go.push(...entries); else if (name === 'sdk-java') groups.java.push(...entries); @@ -299,6 +304,7 @@ jobs: // Clear existing groups to avoid duplicates - we'll run everything anyway groups.rust = []; groups.python = []; + groups.php = []; groups.node = []; groups.go = []; groups.java = []; @@ -313,6 +319,7 @@ jobs: const entries = cfg.tasks.map(task => ({ component: name, task })); if (name === 'rust') groups.rust.push(...entries); else if (name === 'sdk-python') groups.python.push(...entries); + else if (name === 'sdk-php') groups.php.push(...entries); else if (name === 'sdk-node') groups.node.push(...entries); else if (name === 'sdk-go') groups.go.push(...entries); else if (name === 'sdk-java') groups.java.push(...entries); @@ -349,6 +356,7 @@ jobs: const jobSummary = [ { name: 'Rust', tasks: groups.rust }, { name: 'Python SDK', tasks: groups.python }, + { name: 'PHP SDK', tasks: groups.php }, { name: 'Node SDK', tasks: groups.node }, { name: 'Go SDK', tasks: groups.go }, { name: 'Java SDK', tasks: groups.java }, @@ -381,6 +389,7 @@ jobs: setOutput('rust_matrix', JSON.stringify(matrix(groups.rust))); setOutput('python_matrix', JSON.stringify(matrix(groups.python))); + setOutput('php_matrix', JSON.stringify(matrix(groups.php))); setOutput('node_matrix', JSON.stringify(matrix(groups.node))); setOutput('go_matrix', JSON.stringify(matrix(groups.go))); setOutput('java_matrix', JSON.stringify(matrix(groups.java))); diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml index dc405f2795..61297f589a 100644 --- a/.github/workflows/_test.yml +++ b/.github/workflows/_test.yml @@ -43,6 +43,7 @@ jobs: (inputs.task == 'build-aarch64-gnu' || inputs.task == 'build-aarch64-musl') && 'ubuntu-24.04-arm' || inputs.task == 'build-macos-aarch64' && 'macos-14' || inputs.task == 'build-windows-sdk' && 'windows-latest' || + inputs.component == 'sdk-php' && 'ubuntu-24.04' || 'ubuntu-latest' }} timeout-minutes: 60 @@ -97,6 +98,13 @@ jobs: verbose: true override_pr: ${{ github.event.pull_request.number }} + # PHP SDK + - name: Run PHP SDK task + if: inputs.component == 'sdk-php' + uses: ./.github/actions/php/pre-merge + with: + task: ${{ inputs.task }} + # Node SDK - name: Run Node SDK task if: inputs.component == 'sdk-node' diff --git a/.github/workflows/pre-merge.yml b/.github/workflows/pre-merge.yml index 1e9d5312de..d08eeba935 100644 --- a/.github/workflows/pre-merge.yml +++ b/.github/workflows/pre-merge.yml @@ -78,6 +78,19 @@ jobs: secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # PHP SDK + test-php: + name: PHP • ${{ matrix.task }} + needs: detect + if: ${{ fromJson(needs.detect.outputs.php_matrix).include[0].component != 'noop' }} + strategy: + fail-fast: false + matrix: ${{ fromJson(needs.detect.outputs.php_matrix) }} + uses: ./.github/workflows/_test.yml + with: + component: ${{ matrix.component }} + task: ${{ matrix.task }} + # Node SDK test-node: name: Node • ${{ matrix.task }} @@ -194,7 +207,7 @@ jobs: status: name: CI Status runs-on: ubuntu-latest - needs: [common, detect, test-rust, test-python, test-node, test-go, test-java, test-csharp, test-cpp, test-bdd, test-examples, test-other] + needs: [common, detect, test-rust, test-python, test-php, test-node, test-go, test-java, test-csharp, test-cpp, test-bdd, test-examples, test-other] if: always() steps: - name: Get job execution times @@ -272,6 +285,7 @@ jobs: // Set outputs for each component const rust = findJobInfo('Rust •'); const python = findJobInfo('Python •'); + const php = findJobInfo('PHP •'); const node = findJobInfo('Node •'); const go = findJobInfo('Go •'); const java = findJobInfo('Java •'); @@ -292,6 +306,7 @@ jobs: // Output formatted durations core.setOutput('rust_time', formatJobDuration(rust)); core.setOutput('python_time', formatJobDuration(python)); + core.setOutput('php_time', formatJobDuration(php)); core.setOutput('node_time', formatJobDuration(node)); core.setOutput('go_time', formatJobDuration(go)); core.setOutput('java_time', formatJobDuration(java)); @@ -382,6 +397,7 @@ jobs: # Language/component tests rust_status=$(format_status "${{ needs.test-rust.result }}" "${{ steps.times.outputs.rust_time }}") python_status=$(format_status "${{ needs.test-python.result }}" "${{ steps.times.outputs.python_time }}") + php_status=$(format_status "${{ needs.test-php.result }}" "${{ steps.times.outputs.php_time }}") node_status=$(format_status "${{ needs.test-node.result }}" "${{ steps.times.outputs.node_time }}") go_status=$(format_status "${{ needs.test-go.result }}" "${{ steps.times.outputs.go_time }}") java_status=$(format_status "${{ needs.test-java.result }}" "${{ steps.times.outputs.java_time }}") @@ -393,6 +409,7 @@ jobs: echo "| 🦀 Rust | $rust_status | ${{ steps.times.outputs.rust_time }} |" >> $GITHUB_STEP_SUMMARY echo "| 🐍 Python | $python_status | ${{ steps.times.outputs.python_time }} |" >> $GITHUB_STEP_SUMMARY + echo "| 🐘 PHP | $php_status | ${{ steps.times.outputs.php_time }} |" >> $GITHUB_STEP_SUMMARY echo "| 🟢 Node | $node_status | ${{ steps.times.outputs.node_time }} |" >> $GITHUB_STEP_SUMMARY echo "| 🐹 Go | $go_status | ${{ steps.times.outputs.go_time }} |" >> $GITHUB_STEP_SUMMARY echo "| ☕ Java | $java_status | ${{ steps.times.outputs.java_time }} |" >> $GITHUB_STEP_SUMMARY @@ -409,6 +426,7 @@ jobs: [[ "${{ needs.detect.result }}" == "failure" ]] || \ [[ "${{ needs.test-rust.result }}" == "failure" ]] || \ [[ "${{ needs.test-python.result }}" == "failure" ]] || \ + [[ "${{ needs.test-php.result }}" == "failure" ]] || \ [[ "${{ needs.test-node.result }}" == "failure" ]] || \ [[ "${{ needs.test-go.result }}" == "failure" ]] || \ [[ "${{ needs.test-java.result }}" == "failure" ]] || \ @@ -424,6 +442,7 @@ jobs: [[ "${{ needs.detect.result }}" == "cancelled" ]] || \ [[ "${{ needs.test-rust.result }}" == "cancelled" ]] || \ [[ "${{ needs.test-python.result }}" == "cancelled" ]] || \ + [[ "${{ needs.test-php.result }}" == "cancelled" ]] || \ [[ "${{ needs.test-node.result }}" == "cancelled" ]] || \ [[ "${{ needs.test-go.result }}" == "cancelled" ]] || \ [[ "${{ needs.test-java.result }}" == "cancelled" ]] || \ diff --git a/Cargo.toml b/Cargo.toml index fcaae3c4de..e0e40a94b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ members = [ "core/tools", "examples/rust", ] -exclude = ["foreign/cpp", "foreign/python"] +exclude = ["foreign/cpp", "foreign/php", "foreign/python"] resolver = "2" [workspace.dependencies] diff --git a/foreign/php/.cargo/config.toml b/foreign/php/.cargo/config.toml new file mode 100644 index 0000000000..652a1ff9c0 --- /dev/null +++ b/foreign/php/.cargo/config.toml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[target.aarch64-apple-darwin] +rustflags = ["-C", "link-arg=-Wl,-undefined,dynamic_lookup"] + +[target.x86_64-apple-darwin] +rustflags = ["-C", "link-arg=-Wl,-undefined,dynamic_lookup"] diff --git a/foreign/php/.gitignore b/foreign/php/.gitignore new file mode 100644 index 0000000000..c9f5961ccf --- /dev/null +++ b/foreign/php/.gitignore @@ -0,0 +1,4 @@ +/target +/vendor +Cargo.lock +composer.lock diff --git a/foreign/php/Cargo.toml b/foreign/php/Cargo.toml new file mode 100644 index 0000000000..e10be21771 --- /dev/null +++ b/foreign/php/Cargo.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy-php" +version = "0.1.0" +edition = "2024" +authors = ["Iggy Committers "] +license = "Apache-2.0" +description = "PHP extension bindings for Apache Iggy." +documentation = "https://iggy.apache.org/docs/" +repository = "https://github.com/apache/iggy" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +bytes = "1.11.1" +futures = "0.3.32" +ext-php-rs = "0.15.13" +iggy = { path = "../../core/sdk", version = "0.10.0" } +tokio = "1.50.0" + +[profile.release] +strip = "debuginfo" diff --git a/foreign/php/Dockerfile.test b/foreign/php/Dockerfile.test new file mode 100644 index 0000000000..5fb81e91df --- /dev/null +++ b/foreign/php/Dockerfile.test @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM rust:1.95-slim-bookworm + +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + git \ + clang \ + libclang-dev \ + libssl-dev \ + composer \ + php-cli \ + php-dev \ + php-mbstring \ + php-xml \ + pkg-config \ + unzip \ + && rm -rf /var/lib/apt/lists/* + +ENV PHP=/usr/bin/php +ENV PHP_CONFIG=/usr/bin/php-config +ENV IGGY_HOST=iggy-server +ENV IGGY_PORT=8090 + +WORKDIR /workspace + +COPY Cargo.toml Cargo.lock ./ +COPY core/ ./core/ + +COPY foreign/php/Cargo.toml ./foreign/php/ +COPY foreign/php/composer.json ./foreign/php/ +COPY foreign/php/phpunit.xml.dist ./foreign/php/ +COPY foreign/php/README.md foreign/php/LICENSE foreign/php/NOTICE ./foreign/php/ +COPY foreign/php/.cargo/ ./foreign/php/.cargo/ +COPY foreign/php/src/ ./foreign/php/src/ +COPY foreign/php/tests/ ./foreign/php/tests/ +COPY foreign/php/scripts/ ./foreign/php/scripts/ + +WORKDIR /workspace/foreign/php + +RUN cargo install cargo-php --locked +RUN composer install --no-interaction --prefer-dist +RUN cargo php install --yes +RUN chmod +x ./scripts/test.sh + +CMD ["./scripts/test.sh"] diff --git a/foreign/php/LICENSE b/foreign/php/LICENSE new file mode 100644 index 0000000000..8dada3edaf --- /dev/null +++ b/foreign/php/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/foreign/php/NOTICE b/foreign/php/NOTICE new file mode 100644 index 0000000000..cc8d194800 --- /dev/null +++ b/foreign/php/NOTICE @@ -0,0 +1,12 @@ +Apache Iggy (Incubating) +Copyright 2026 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +================================================================ + +The Iggy project code was originally created, designed, developed by Piotr Gankiewicz in April 2023. +It was released as an open-source project under MIT License, later converted to Apache 2.0 License, +and donated by LaserData, Inc. to the Apache Software Foundation (ASF) in February 2025. +Copyright April 2023 - February 2025 Piotr Gankiewicz, LaserData, Inc. diff --git a/foreign/php/README.md b/foreign/php/README.md new file mode 100644 index 0000000000..dd834ad328 --- /dev/null +++ b/foreign/php/README.md @@ -0,0 +1,134 @@ +# iggy-php + +PHP extension bindings for [Apache Iggy](https://iggy.apache.org/), built in Rust with +[`ext-php-rs`](https://github.com/davidcole1340/ext-php-rs). + +This repository is experimental. The extension exposes `IggyClient`, a blocking +synchronous PHP API over the Rust Iggy client. + +## Requirements + +- Rust and Cargo +- PHP with `php-config` +- `cargo-php` +- Composer, for installing PHPUnit +- Docker, for running the integration test server + +On macOS with Homebrew PHP: + +```sh +export PATH="/opt/homebrew/opt/php/bin:$PATH" +export PHP=/opt/homebrew/opt/php/bin/php +export PHP_CONFIG=/opt/homebrew/opt/php/bin/php-config +``` + +## Build + +```sh +cargo build --release +``` + +## Install + +```sh +cargo php install --release --yes +``` + +If the extension is already enabled, reinstall it with: + +```sh +cargo php remove --yes +cargo php install --release --yes +``` + +Verify PHP can load it: + +```sh +php -r 'var_dump(extension_loaded("iggy-php"));' +``` + +## Run Iggy + +```sh +docker run --rm --name iggy-php-test \ + -p 8090:8090 \ + -p 3000:3000 \ + apache/iggy:latest +``` + +The tests assume: + +- host: `127.0.0.1` +- port: `8090` +- username: `iggy` +- password: `iggy` + +Override them with `IGGY_HOST`, `IGGY_PORT`, `IGGY_USERNAME`, and `IGGY_PASSWORD`. + +## Usage + +```php +connect(); +$client->loginUser('iggy', 'iggy'); + +$stream = 'php-stream'; +$topic = 'php-topic'; +$partitionId = 0; + +$client->createStream($stream); +$client->createTopic($stream, $topic, 1, null, null, null, null); + +$client->sendMessages($stream, $topic, $partitionId, [ + new SendMessage('hello from PHP'), +]); + +$messages = $client->pollMessages( + $stream, + $topic, + $partitionId, + PollingStrategy::first(), + 10, + true, +); + +foreach ($messages as $message) { + echo $message->payload(), PHP_EOL; +} +``` + +## Tests + +Run the Dockerized integration suite: + +```sh +docker compose -f docker-compose.test.yml up --build --abort-on-container-exit --exit-code-from php-tests +``` + +Run the PHP test suite: + +```sh +composer install +composer test +``` + +Run Rust verification: + +```sh +cargo test +``` + +TLS tests are opt-in because they require a TLS-enabled Iggy server and certificate +setup. Set `IGGY_TLS_CONNECTION_STRING` to enable TLS connection tests. Set +`IGGY_TLS_PLAINTEXT_ADDRESS` to run the negative plaintext-to-TLS test. + +## API Notes + +- Methods are exposed to PHP as camelCase, for example `createStream()` and + `pollMessages()`. +- Partition IDs use the Iggy partition index. For a topic with one partition, use `0`. +- Large unsigned values that can overflow PHP integers, such as message checksums, + are returned as decimal strings. +- `IggyClient` is synchronous and blocks the current PHP thread. diff --git a/foreign/php/composer.json b/foreign/php/composer.json new file mode 100644 index 0000000000..4b5c64debd --- /dev/null +++ b/foreign/php/composer.json @@ -0,0 +1,12 @@ +{ + "name": "apache/iggy-php", + "description": "PHP extension bindings for Apache Iggy.", + "license": "Apache-2.0", + "type": "library", + "require-dev": { + "phpunit/phpunit": "^10.5" + }, + "scripts": { + "test": "phpunit" + } +} diff --git a/foreign/php/docker-compose.test.yml b/foreign/php/docker-compose.test.yml new file mode 100644 index 0000000000..492779feac --- /dev/null +++ b/foreign/php/docker-compose.test.yml @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + iggy-server: + build: + context: ../.. + dockerfile: core/server/Dockerfile + args: + PROFILE: debug + command: ["--fresh", "--with-default-root-credentials"] + container_name: iggy-server-php-test + security_opt: + - seccomp:unconfined + environment: + - IGGY_HTTP_ADDRESS=0.0.0.0:3000 + - IGGY_TCP_ADDRESS=0.0.0.0:8090 + - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8092 + networks: + - php-test-network + ports: + - "3000:3000" + - "8080:8080" + - "8090:8090" + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8090", "ping"] + interval: 5s + timeout: 5s + retries: 12 + start_period: 10s + volumes: + - iggy-data:/local_data + + php-tests: + build: + context: ../.. + dockerfile: foreign/php/Dockerfile.test + container_name: php-sdk-tests + depends_on: + iggy-server: + condition: service_healthy + networks: + - php-test-network + environment: + - IGGY_HOST=iggy-server + - IGGY_PORT=8090 + - IGGY_USERNAME=iggy + - IGGY_PASSWORD=iggy + volumes: + - ./test-results:/workspace/foreign/php/test-results + +networks: + php-test-network: + name: php-test-network + +volumes: + iggy-data: diff --git a/foreign/php/phpunit.xml.dist b/foreign/php/phpunit.xml.dist new file mode 100644 index 0000000000..7403e076cd --- /dev/null +++ b/foreign/php/phpunit.xml.dist @@ -0,0 +1,11 @@ + + + + + tests + + + diff --git a/foreign/php/scripts/test.sh b/foreign/php/scripts/test.sh new file mode 100755 index 0000000000..6c964ace24 --- /dev/null +++ b/foreign/php/scripts/test.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -euo pipefail + +echo "PHP SDK Test Runner" +echo "===================" + +IGGY_HOST="${IGGY_HOST:-127.0.0.1}" +IGGY_PORT="${IGGY_PORT:-8090}" + +echo "Waiting for Iggy server at ${IGGY_HOST}:${IGGY_PORT}..." +timeout 60 bash -c " + until timeout 5 bash -c ', +} + +#[php_impl] +impl IggyClient { + /// Constructs a new IggyClient from a TCP server address. + #[php(constructor)] + pub fn __construct(conn: Option) -> PhpResult { + let client = IggyClientBuilder::new() + .with_tcp() + .with_server_address(conn.unwrap_or_else(|| "127.0.0.1:8090".to_string())) + .build() + .map_err(to_php_exception)?; + + Ok(Self { + inner: Arc::new(client), + }) + } + + /// Constructs a new IggyClient from a connection string. + pub fn from_connection_string(connection_string: String) -> PhpResult { + let client = + RustIggyClient::from_connection_string(&connection_string).map_err(to_php_exception)?; + + Ok(Self { + inner: Arc::new(client), + }) + } + + /// Sends a ping request to the server. + pub fn ping(&self) -> PhpResult { + let inner = self.inner.clone(); + runtime().block_on(async move { inner.ping().await.map_err(to_php_exception) }) + } + + /// Logs in the user with the given credentials. + pub fn login_user(&self, username: String, password: String) -> PhpResult { + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .login_user(&username, &password) + .await + .map(|_| ()) + .map_err(to_php_exception) + }) + } + + /// Connects the IggyClient to its service. + pub fn connect(&self) -> PhpResult { + let inner = self.inner.clone(); + runtime().block_on(async move { inner.connect().await.map_err(to_php_exception) }) + } + + /// Creates a new stream. + pub fn create_stream(&self, name: String) -> PhpResult { + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .create_stream(&name) + .await + .map(|_| ()) + .map_err(to_php_exception) + }) + } + + /// Gets a stream by id or name. + pub fn get_stream(&self, stream_id: PhpIdentifier) -> PhpResult> { + let stream_id = stream_id.into_identifier()?; + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .get_stream(&stream_id) + .await + .map(|stream| stream.map(StreamDetails::from)) + .map_err(to_php_exception) + }) + } + + /// Creates a topic. + /// + /// message_expiry_micros is null for server default. + #[allow(clippy::too_many_arguments)] + pub fn create_topic( + &self, + stream: PhpIdentifier, + name: String, + partitions_count: u32, + compression_algorithm: Option, + replication_factor: Option, + message_expiry_micros: Option, + max_topic_size: Option, + ) -> PhpResult { + let compression_algorithm = match compression_algorithm { + Some(value) => CompressionAlgorithm::from_str(&value).map_err(to_php_exception)?, + None => CompressionAlgorithm::default(), + }; + let expiry = message_expiry_micros.map_or(IggyExpiry::ServerDefault, |micros| { + IggyExpiry::ExpireDuration(iggy_duration_from_micros(micros)) + }); + let max_size = max_topic_size.map_or(MaxTopicSize::ServerDefault, MaxTopicSize::from); + let stream = stream.into_identifier()?; + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .create_topic( + &stream, + &name, + partitions_count, + compression_algorithm, + replication_factor, + expiry, + max_size, + ) + .await + .map(|_| ()) + .map_err(to_php_exception) + }) + } + + /// Gets a topic by stream and topic id/name. + pub fn get_topic( + &self, + stream_id: PhpIdentifier, + topic_id: PhpIdentifier, + ) -> PhpResult> { + let stream_id = stream_id.into_identifier()?; + let topic_id = topic_id.into_identifier()?; + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .get_topic(&stream_id, &topic_id) + .await + .map(|topic| topic.map(TopicDetails::from)) + .map_err(to_php_exception) + }) + } + + /// Sends messages to a topic. + pub fn send_messages( + &self, + stream: PhpIdentifier, + topic: PhpIdentifier, + partition_id: u32, + messages: Vec<&SendMessage>, + ) -> PhpResult { + let stream = stream.into_identifier()?; + let topic = topic.into_identifier()?; + let partitioning = Partitioning::partition_id(partition_id); + let mut messages: Vec = messages + .into_iter() + .map(|message| (*message).clone().inner) + .collect(); + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .send_messages(&stream, &topic, &partitioning, messages.as_mut()) + .await + .map_err(to_php_exception) + }) + } + + /// Polls messages from the specified topic and partition. + pub fn poll_messages( + &self, + stream: PhpIdentifier, + topic: PhpIdentifier, + partition_id: u32, + polling_strategy: &PollingStrategy, + count: u32, + auto_commit: bool, + ) -> PhpResult> { + let consumer = RustConsumer::default(); + let stream = stream.into_identifier()?; + let topic = topic.into_identifier()?; + let strategy: RustPollingStrategy = polling_strategy.into(); + let inner = self.inner.clone(); + + runtime().block_on(async move { + let polled_messages = inner + .poll_messages( + &stream, + &topic, + Some(partition_id), + &consumer, + &strategy, + count, + auto_commit, + ) + .await + .map_err(to_php_exception)?; + + Ok(polled_messages + .messages + .into_iter() + .map(|message| ReceiveMessage { + inner: message, + partition_id, + }) + .collect()) + }) + } + + /// Creates and initializes a consumer group consumer. + #[allow(clippy::too_many_arguments)] + pub fn consumer_group( + &self, + name: String, + stream: String, + topic: String, + partition_id: Option, + polling_strategy: Option<&PollingStrategy>, + batch_length: Option, + auto_commit: Option<&AutoCommit>, + create_consumer_group_if_not_exists: bool, + auto_join_consumer_group: bool, + poll_interval_micros: Option, + polling_retry_interval_micros: Option, + init_retries: Option, + init_retry_interval_micros: Option, + allow_replay: bool, + ) -> PhpResult { + let mut builder = self + .inner + .consumer_group(&name, &stream, &topic) + .map_err(to_php_exception)? + .without_encryptor() + .partition(partition_id); + + builder = if create_consumer_group_if_not_exists { + builder.create_consumer_group_if_not_exists() + } else { + builder.do_not_create_consumer_group_if_not_exists() + }; + builder = if auto_join_consumer_group { + builder.auto_join_consumer_group() + } else { + builder.do_not_auto_join_consumer_group() + }; + if let Some(polling_strategy) = polling_strategy { + builder = builder.polling_strategy(polling_strategy.into()); + } + if let Some(batch_length) = batch_length { + builder = builder.batch_length(batch_length); + } + if let Some(auto_commit) = auto_commit { + builder = builder.auto_commit(auto_commit.into()); + } + builder = match poll_interval_micros { + Some(micros) => builder.poll_interval(iggy_duration_from_micros(micros)), + None => builder.without_poll_interval(), + }; + if let Some(micros) = polling_retry_interval_micros { + builder = builder.polling_retry_interval(iggy_duration_from_micros(micros)); + } + + match (init_retries, init_retry_interval_micros) { + (Some(retries), Some(micros)) => { + builder = builder.init_retries(retries, iggy_duration_from_micros(micros)); + } + (Some(_), None) => { + return Err( + "'init_retry_interval_micros' is required if 'init_retries' is set".into(), + ); + } + (None, Some(_)) => { + return Err( + "'init_retries' is required if 'init_retry_interval_micros' is set".into(), + ); + } + (None, None) => {} + } + if allow_replay { + builder = builder.allow_replay(); + } + + let mut consumer = builder.build(); + runtime().block_on(async move { + consumer.init().await.map_err(to_php_exception)?; + Ok(IggyConsumer { + inner: Arc::new(Mutex::new(consumer)), + }) + }) + } +} + +fn to_php_exception(error: impl std::fmt::Display) -> PhpException { + PhpException::default(error.to_string()) +} + +fn iggy_duration_from_micros(micros: u64) -> IggyDuration { + IggyDuration::new(Duration::from_micros(micros)) +} diff --git a/foreign/php/src/consumer.rs b/foreign/php/src/consumer.rs new file mode 100644 index 0000000000..65fa83215f --- /dev/null +++ b/foreign/php/src/consumer.rs @@ -0,0 +1,288 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::{sync::Arc, time::Duration}; + +use ext_php_rs::{ + exception::{PhpException, PhpResult}, + php_class, php_impl, + types::ZendCallable, +}; +use futures::StreamExt; +use iggy::prelude::{ + AutoCommit as RustAutoCommit, AutoCommitAfter as RustAutoCommitAfter, + AutoCommitWhen as RustAutoCommitWhen, IggyConsumer as RustIggyConsumer, IggyDuration, +}; +use tokio::sync::Mutex; + +use crate::receive_message::ReceiveMessage; +use crate::runtime::runtime; + +/// A PHP class representing the Iggy consumer. +#[php_class] +pub struct IggyConsumer { + pub(crate) inner: Arc>, +} + +#[php_impl] +impl IggyConsumer { + /// Get the last consumed offset or null if no offset has been consumed yet. + pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option { + self.with_consumer(|inner| inner.get_last_consumed_offset(partition_id)) + } + + /// Get the last stored offset or null if no offset has been stored yet. + pub fn get_last_stored_offset(&self, partition_id: u32) -> Option { + self.with_consumer(|inner| inner.get_last_stored_offset(partition_id)) + } + + /// Gets the name of the consumer group. + pub fn name(&self) -> String { + self.with_consumer(|inner| inner.name().to_string()) + } + + /// Gets the current partition id or 0 if no messages have been polled yet. + pub fn partition_id(&self) -> u32 { + self.with_consumer(RustIggyConsumer::partition_id) + } + + /// Gets the stream identifier this consumer is configured for. + pub fn stream(&self) -> String { + self.with_consumer(|inner| inner.stream().to_string()) + } + + /// Gets the topic identifier this consumer is configured for. + pub fn topic(&self) -> String { + self.with_consumer(|inner| inner.topic().to_string()) + } + + /// Stores the provided offset for the provided partition id. + /// + /// If partition_id is null, the current partition id is used. + pub fn store_offset(&self, offset: u64, partition_id: Option) -> PhpResult { + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .lock() + .await + .store_offset(offset, partition_id) + .await + .map_err(|err| PhpException::default(err.to_string())) + }) + } + + /// Deletes the stored offset for the provided partition id. + /// + /// If partition_id is null, the current partition id is used. + pub fn delete_offset(&self, partition_id: Option) -> PhpResult { + let inner = self.inner.clone(); + + runtime().block_on(async move { + inner + .lock() + .await + .delete_offset(partition_id) + .await + .map_err(|err| PhpException::default(err.to_string())) + }) + } + + /// Consumes messages with a PHP callback. + /// + /// The callback is called as callback(ReceiveMessage $message). If limit is null, + /// this method runs until the consumer stream ends or an error occurs. + pub fn consume_messages(&self, callback: ZendCallable, limit: Option) -> PhpResult { + let mut consumed = 0; + let max_messages = limit.unwrap_or(u32::MAX); + + while consumed < max_messages { + let Some(message) = self.next_message()? else { + break; + }; + + callback + .try_call(vec![&message]) + .map_err(|err| PhpException::default(err.to_string()))?; + consumed += 1; + } + + Ok(consumed) + } +} + +impl IggyConsumer { + fn with_consumer(&self, f: impl FnOnce(&RustIggyConsumer) -> T) -> T { + let inner = self.inner.clone(); + + runtime().block_on(async move { + let inner = inner.lock().await; + f(&inner) + }) + } + + fn next_message(&self) -> PhpResult> { + let inner = self.inner.clone(); + + runtime().block_on(async move { + let mut inner = inner.lock().await; + + match inner.next().await { + Some(Ok(message)) => Ok(Some(ReceiveMessage { + inner: message.message, + partition_id: message.partition_id, + })), + Some(Err(err)) => Err(PhpException::default(err.to_string())), + None => Ok(None), + } + }) + } +} + +#[php_class] +#[derive(Clone, Copy)] +pub struct AutoCommit { + pub(crate) inner: RustAutoCommit, +} + +#[php_impl] +impl AutoCommit { + pub fn disabled() -> Self { + Self { + inner: RustAutoCommit::Disabled, + } + } + + pub fn interval(interval_micros: u64) -> Self { + Self { + inner: RustAutoCommit::Interval(iggy_duration_from_micros(interval_micros)), + } + } + + pub fn interval_or_when(interval_micros: u64, when: &AutoCommitWhen) -> Self { + Self { + inner: RustAutoCommit::IntervalOrWhen( + iggy_duration_from_micros(interval_micros), + when.inner, + ), + } + } + + pub fn interval_or_after(interval_micros: u64, after: &AutoCommitAfter) -> Self { + Self { + inner: RustAutoCommit::IntervalOrAfter( + iggy_duration_from_micros(interval_micros), + after.inner, + ), + } + } + + pub fn when(when: &AutoCommitWhen) -> Self { + Self { + inner: RustAutoCommit::When(when.inner), + } + } + + pub fn after(after: &AutoCommitAfter) -> Self { + Self { + inner: RustAutoCommit::After(after.inner), + } + } +} + +impl From<&AutoCommit> for RustAutoCommit { + fn from(value: &AutoCommit) -> Self { + value.inner + } +} + +#[php_class] +#[derive(Clone, Copy)] +pub struct AutoCommitWhen { + pub(crate) inner: RustAutoCommitWhen, +} + +#[php_impl] +impl AutoCommitWhen { + pub fn polling_messages() -> Self { + Self { + inner: RustAutoCommitWhen::PollingMessages, + } + } + + pub fn consuming_all_messages() -> Self { + Self { + inner: RustAutoCommitWhen::ConsumingAllMessages, + } + } + + pub fn consuming_each_message() -> Self { + Self { + inner: RustAutoCommitWhen::ConsumingEachMessage, + } + } + + pub fn consuming_every_nth_message(n: u32) -> Self { + Self { + inner: RustAutoCommitWhen::ConsumingEveryNthMessage(n), + } + } +} + +impl From<&AutoCommitWhen> for RustAutoCommitWhen { + fn from(value: &AutoCommitWhen) -> Self { + value.inner + } +} + +#[php_class] +#[derive(Clone, Copy)] +pub struct AutoCommitAfter { + pub(crate) inner: RustAutoCommitAfter, +} + +#[php_impl] +impl AutoCommitAfter { + pub fn consuming_all_messages() -> Self { + Self { + inner: RustAutoCommitAfter::ConsumingAllMessages, + } + } + + pub fn consuming_each_message() -> Self { + Self { + inner: RustAutoCommitAfter::ConsumingEachMessage, + } + } + + pub fn consuming_every_nth_message(n: u32) -> Self { + Self { + inner: RustAutoCommitAfter::ConsumingEveryNthMessage(n), + } + } +} + +impl From<&AutoCommitAfter> for RustAutoCommitAfter { + fn from(value: &AutoCommitAfter) -> Self { + value.inner + } +} + +fn iggy_duration_from_micros(micros: u64) -> IggyDuration { + IggyDuration::new(Duration::from_micros(micros)) +} diff --git a/foreign/php/src/identifier.rs b/foreign/php/src/identifier.rs new file mode 100644 index 0000000000..ae751516e9 --- /dev/null +++ b/foreign/php/src/identifier.rs @@ -0,0 +1,73 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::str::FromStr; + +use ext_php_rs::{ + convert::FromZval, + exception::{PhpException, PhpResult}, + flags::DataType, + types::Zval, +}; +use iggy::prelude::{IdKind, Identifier}; + +pub enum PhpIdentifier { + String(String), + Int(u32), +} + +impl FromZval<'_> for PhpIdentifier { + const TYPE: DataType = DataType::Mixed; + + fn from_zval(zval: &Zval) -> Option { + if let Some(value) = zval.string() { + return Some(Self::String(value)); + } + + zval.long() + .and_then(|value| u32::try_from(value).ok()) + .map(Self::Int) + } +} + +impl PhpIdentifier { + pub(crate) fn into_identifier(self) -> PhpResult { + match self { + PhpIdentifier::String(value) => Identifier::from_str(&value), + PhpIdentifier::Int(value) => Identifier::numeric(value), + } + .map_err(|err| PhpException::default(err.to_string())) + } +} + +impl TryFrom<&Identifier> for PhpIdentifier { + type Error = PhpException; + + fn try_from(value: &Identifier) -> Result { + match value.kind { + IdKind::String => value + .get_string_value() + .map(PhpIdentifier::String) + .map_err(|err| PhpException::default(err.to_string())), + IdKind::Numeric => value + .get_u32_value() + .map(PhpIdentifier::Int) + .map_err(|err| PhpException::default(err.to_string())), + } + } +} diff --git a/foreign/php/src/lib.rs b/foreign/php/src/lib.rs new file mode 100644 index 0000000000..40975e57c0 --- /dev/null +++ b/foreign/php/src/lib.rs @@ -0,0 +1,50 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub mod client; +pub mod consumer; +pub mod identifier; +pub mod receive_message; +pub mod runtime; +pub mod send_message; +pub mod stream; +pub mod topic; + +use ext_php_rs::prelude::*; + +use crate::client::IggyClient; +use crate::consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer}; +use crate::receive_message::{PollingStrategy, ReceiveMessage}; +use crate::send_message::SendMessage; +use crate::stream::StreamDetails; +use crate::topic::TopicDetails; + +#[php_module] +pub fn get_module(module: ModuleBuilder) -> ModuleBuilder { + module + .class::() + .class::() + .class::() + .class::() + .class::() + .class::() + .class::() + .class::() + .class::() + .class::() +} diff --git a/foreign/php/src/receive_message.rs b/foreign/php/src/receive_message.rs new file mode 100644 index 0000000000..e34501a84e --- /dev/null +++ b/foreign/php/src/receive_message.rs @@ -0,0 +1,148 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use ext_php_rs::{binary::Binary, php_class, php_impl}; +use iggy::prelude::{ + IggyMessage as RustReceiveMessage, IggyMessageHeader, PollingStrategy as RustPollingStrategy, +}; + +/// A PHP class representing a received message. +/// +/// This class wraps a Rust message, allowing PHP code to access its payload and metadata. +#[php_class] +pub struct ReceiveMessage { + pub(crate) inner: RustReceiveMessage, + pub(crate) partition_id: u32, +} + +impl Clone for ReceiveMessage { + fn clone(&self) -> Self { + Self { + inner: RustReceiveMessage { + header: IggyMessageHeader { + checksum: self.inner.header.checksum, + id: self.inner.header.id, + offset: self.inner.header.offset, + timestamp: self.inner.header.timestamp, + origin_timestamp: self.inner.header.origin_timestamp, + user_headers_length: self.inner.header.user_headers_length, + payload_length: self.inner.header.payload_length, + reserved: self.inner.header.reserved, + }, + payload: self.inner.payload.clone(), + user_headers: self.inner.user_headers.clone(), + }, + partition_id: self.partition_id, + } + } +} + +#[php_impl] +impl ReceiveMessage { + /// Retrieves the payload of the received message. + /// + /// The payload is returned as a PHP string, which can represent both text and binary data. + pub fn payload(&self) -> Binary { + Binary::new(self.inner.payload.to_vec()) + } + + /// Retrieves the offset of the received message. + /// + /// The offset represents the position of the message within its topic. + pub fn offset(&self) -> u64 { + self.inner.header.offset + } + + /// Retrieves the timestamp of the received message. + /// + /// The timestamp represents the time of the message within its topic. + pub fn timestamp(&self) -> u64 { + self.inner.header.timestamp + } + + /// Retrieves the id of the received message. + /// + /// The id represents unique identifier of the message within its topic. + pub fn id(&self) -> String { + self.inner.header.id.to_string() + } + + /// Retrieves the checksum of the received message. + /// + /// The checksum represents the integrity of the message within its topic. + pub fn checksum(&self) -> String { + self.inner.header.checksum.to_string() + } + + /// Retrieves the length of the received message. + /// + /// The length represents the length of the payload. + pub fn length(&self) -> u32 { + self.inner.header.payload_length + } + + /// Retrieves the partition this message belongs to. + pub fn partition_id(&self) -> u32 { + self.partition_id + } +} + +#[php_class] +#[derive(Clone)] +pub struct PollingStrategy { + pub(crate) inner: RustPollingStrategy, +} + +impl From<&PollingStrategy> for RustPollingStrategy { + fn from(value: &PollingStrategy) -> Self { + value.inner + } +} + +#[php_impl] +impl PollingStrategy { + pub fn offset(value: u64) -> Self { + Self { + inner: RustPollingStrategy::offset(value), + } + } + + pub fn timestamp(value: u64) -> Self { + Self { + inner: RustPollingStrategy::timestamp(value.into()), + } + } + + pub fn first() -> Self { + Self { + inner: RustPollingStrategy::first(), + } + } + + pub fn last() -> Self { + Self { + inner: RustPollingStrategy::last(), + } + } + + pub fn next() -> Self { + Self { + inner: RustPollingStrategy::next(), + } + } +} diff --git a/foreign/php/src/runtime.rs b/foreign/php/src/runtime.rs new file mode 100644 index 0000000000..c223db0ba2 --- /dev/null +++ b/foreign/php/src/runtime.rs @@ -0,0 +1,30 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::OnceLock; + +pub fn runtime() -> &'static tokio::runtime::Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to initialize Tokio runtime") + }) +} diff --git a/foreign/php/src/send_message.rs b/foreign/php/src/send_message.rs new file mode 100644 index 0000000000..1d95bc6a56 --- /dev/null +++ b/foreign/php/src/send_message.rs @@ -0,0 +1,79 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bytes::Bytes; +use ext_php_rs::{ + binary::Binary, + exception::{PhpException, PhpResult}, + php_class, php_impl, +}; +use iggy::prelude::{IggyMessage as RustIggyMessage, IggyMessageHeader}; + +/// A PHP class representing a message to be sent. +#[php_class] +pub struct SendMessage { + pub(crate) inner: RustIggyMessage, +} + +impl Clone for SendMessage { + fn clone(&self) -> Self { + Self { + inner: RustIggyMessage { + header: IggyMessageHeader { + checksum: self.inner.header.checksum, + id: self.inner.header.id, + offset: self.inner.header.offset, + timestamp: self.inner.header.timestamp, + origin_timestamp: self.inner.header.origin_timestamp, + user_headers_length: self.inner.header.user_headers_length, + payload_length: self.inner.header.payload_length, + reserved: self.inner.header.reserved, + }, + payload: self.inner.payload.clone(), + user_headers: self.inner.user_headers.clone(), + }, + } + } +} + +#[php_impl] +impl SendMessage { + /// Constructs a new `SendMessage` instance from a PHP string. + /// + /// PHP strings are byte strings, so this accepts both text and binary payloads. + #[php(constructor)] + pub fn __construct(data: Binary) -> PhpResult { + // `Binary` already owns the PHP string bytes; `Bytes::from(Vec<_>)` reuses that buffer. + let inner = RustIggyMessage::builder() + .payload(Bytes::from(Vec::::from(data))) + .build() + .map_err(|err| PhpException::default(err.to_string()))?; + + Ok(Self { inner }) + } + + #[php(getter)] + pub fn id(&self) -> String { + self.inner.header.id.to_string() + } + + #[php(getter)] + pub fn payload(&self) -> Binary { + Binary::new(self.inner.payload.to_vec()) + } +} diff --git a/foreign/php/src/stream.rs b/foreign/php/src/stream.rs new file mode 100644 index 0000000000..5da29fb27a --- /dev/null +++ b/foreign/php/src/stream.rs @@ -0,0 +1,56 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use ext_php_rs::{php_class, php_impl}; +use iggy::prelude::StreamDetails as RustStreamDetails; + +#[php_class] +pub struct StreamDetails { + pub(crate) inner: RustStreamDetails, +} + +impl From for StreamDetails { + fn from(stream_details: RustStreamDetails) -> Self { + Self { + inner: stream_details, + } + } +} + +#[php_impl] +impl StreamDetails { + #[php(getter)] + pub fn id(&self) -> u32 { + self.inner.id + } + + #[php(getter)] + pub fn name(&self) -> String { + self.inner.name.to_string() + } + + #[php(getter)] + pub fn messages_count(&self) -> u64 { + self.inner.messages_count + } + + #[php(getter)] + pub fn topics_count(&self) -> u32 { + self.inner.topics_count + } +} diff --git a/foreign/php/src/topic.rs b/foreign/php/src/topic.rs new file mode 100644 index 0000000000..89c20b988a --- /dev/null +++ b/foreign/php/src/topic.rs @@ -0,0 +1,54 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use ext_php_rs::{php_class, php_impl}; +use iggy::prelude::TopicDetails as RustTopicDetails; + +#[php_class] +pub struct TopicDetails { + pub(crate) inner: RustTopicDetails, +} + +impl From for TopicDetails { + fn from(inner: RustTopicDetails) -> Self { + Self { inner } + } +} + +#[php_impl] +impl TopicDetails { + #[php(getter)] + pub fn id(&self) -> u32 { + self.inner.id + } + + #[php(getter)] + pub fn name(&self) -> String { + self.inner.name.to_string() + } + + #[php(getter)] + pub fn messages_count(&self) -> u64 { + self.inner.messages_count + } + + #[php(getter)] + pub fn partitions_count(&self) -> u32 { + self.inner.partitions_count + } +} diff --git a/foreign/php/tests/IggySdkTest.php b/foreign/php/tests/IggySdkTest.php new file mode 100644 index 0000000000..bf39d35b69 --- /dev/null +++ b/foreign/php/tests/IggySdkTest.php @@ -0,0 +1,295 @@ +ping(); + } + + public function testClientNotNull(): void + { + assert_true(new_client() instanceof IggyClient); + } + + public function testClientFromConnectionString(): void + { + $client = new_connection_string_client(); + $client->ping(); + } + + public function testCreateAndGetStream(): void + { + $client = new_client(); + $streamName = unique_name('test-stream'); + + $client->createStream($streamName); + $stream = $client->getStream($streamName); + + assert_not_null($stream); + assert_same($streamName, $stream->name); + assert_true($stream->id >= 0, 'expected non-negative stream id'); + } + + public function testNewStreamHasNoTopics(): void + { + $client = new_client(); + $streamName = unique_name('test-stream'); + + $client->createStream($streamName); + $stream = $client->getStream($streamName); + + assert_not_null($stream); + assert_same($streamName, $stream->name); + assert_true($stream->id > 0, 'expected positive stream id'); + assert_same(0, $stream->topics_count); + } + + public function testCreateAndGetTopic(): void + { + $client = new_client(); + $streamName = unique_name('test-stream'); + $topicName = unique_name('test-topic'); + + $client->createStream($streamName); + $client->createTopic($streamName, $topicName, 2, null, null, null, null); + $topic = $client->getTopic($streamName, $topicName); + + assert_not_null($topic); + assert_same($topicName, $topic->name); + assert_true($topic->id >= 0, 'expected non-negative topic id'); + assert_same(2, $topic->partitions_count); + } + + public function testListTopicsViaGetTopic(): void + { + $client = new_client(); + $streamName = unique_name('test-stream'); + $topicName = unique_name('test-topic'); + + create_stream_and_topic($client, $streamName, $topicName); + $topic = $client->getTopic($streamName, $topicName); + + assert_not_null($topic); + assert_same($topicName, $topic->name); + assert_true($topic->id >= 0, 'expected non-negative topic id'); + assert_same(1, $topic->partitions_count); + } + + public function testSendAndPollBinaryMessages(): void + { + $client = new_client(); + $streamName = unique_name('msg-stream'); + $topicName = unique_name('msg-topic'); + $partitionId = 0; + $messages = array_map( + static fn (int $i): string => random_bytes(16) . pack('C*', 0, $i, 255), + range(1, 3), + ); + + create_stream_and_topic($client, $streamName, $topicName); + $client->sendMessages( + $streamName, + $topicName, + $partitionId, + array_map(static fn (string $payload): SendMessage => new SendMessage($payload), $messages), + ); + + $polled = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::first(), 10, true); + assert_true(count($polled) >= count($messages), 'expected at least the sent messages'); + assert_same($messages, array_slice(collect_payloads($polled), 0, count($messages))); + } + + public function testMessageProperties(): void + { + $client = new_client(); + $streamName = unique_name('msg-stream'); + $topicName = unique_name('msg-topic'); + $partitionId = 0; + $payload = unique_name('Property test'); + + create_stream_and_topic($client, $streamName, $topicName); + $client->sendMessages($streamName, $topicName, $partitionId, [new SendMessage($payload)]); + + $polled = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::last(), 1, true); + assert_true(count($polled) >= 1, 'expected one message'); + + $message = $polled[0]; + assert_same($payload, $message->payload()); + assert_true($message->offset() >= 0, 'expected non-negative offset'); + assert_true($message->id() !== '', 'expected message id'); + assert_true($message->timestamp() > 0, 'expected positive timestamp'); + assert_true(ctype_digit($message->checksum()), 'expected numeric checksum'); + assert_true($message->length() > 0, 'expected positive length'); + assert_same($partitionId, $message->partitionId()); + } + + public function testPollingStrategies(): void + { + $client = new_client(); + $streamName = unique_name('poll-stream'); + $topicName = unique_name('poll-topic'); + $partitionId = 0; + $messages = array_map( + static fn (int $i): string => "Polling test {$i} - {$streamName}", + range(0, 4), + ); + + create_stream_and_topic($client, $streamName, $topicName); + $client->sendMessages( + $streamName, + $topicName, + $partitionId, + array_map(static fn (string $payload): SendMessage => new SendMessage($payload), $messages), + ); + + $firstMessages = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::first(), 1, false); + assert_true(count($firstMessages) >= 1, 'first strategy returned no messages'); + + $lastMessages = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::last(), 1, false); + assert_true(count($lastMessages) >= 1, 'last strategy returned no messages'); + + $nextMessages = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::next(), 2, false); + assert_true(count($nextMessages) >= 1, 'next strategy returned no messages'); + + $offsetMessages = $client->pollMessages( + $streamName, + $topicName, + $partitionId, + PollingStrategy::offset($firstMessages[0]->offset()), + 1, + false, + ); + assert_true(count($offsetMessages) >= 1, 'offset strategy returned no messages'); + } + + public function testDuplicateStreamCreation(): void + { + $client = new_client(); + $streamName = unique_name('duplicate-test'); + + $client->createStream($streamName); + assert_throws(static fn () => $client->createStream($streamName), 'already exists'); + } + + public function testGetNonexistentStream(): void + { + $stream = new_client()->getStream(unique_name('nonexistent')); + + assert_null($stream); + } + + public function testCreateTopicInNonexistentStream(): void + { + $client = new_client(); + + assert_throws( + static fn () => $client->createTopic(unique_name('nonexistent'), 'test-topic', 1, null, null, null, null), + ); + } + + public function testConsumerGroupMeta(): void + { + $client = new_client(); + $consumerName = unique_name('consumer-group-consumer'); + $streamName = unique_name('consumer-group-stream'); + $topicName = unique_name('consumer-group-topic'); + $partitionId = 0; + + create_stream_and_topic($client, $streamName, $topicName); + $consumer = $client->consumerGroup( + $consumerName, + $streamName, + $topicName, + $partitionId, + PollingStrategy::next(), + 10, + AutoCommit::interval(micros(5)), + true, + true, + micros(1), + null, + null, + null, + false, + ); + + assert_same($streamName, $consumer->stream()); + assert_same($topicName, $consumer->topic()); + assert_same(0, $consumer->partitionId()); + assert_null($consumer->getLastConsumedOffset($partitionId)); + assert_null($consumer->getLastStoredOffset($partitionId)); + } + + public function testConsumeMessages(): void + { + $client = new_client(); + $consumerName = unique_name('consumer-group-consumer'); + $streamName = unique_name('consumer-group-stream'); + $topicName = unique_name('consumer-group-topic'); + $partitionId = 0; + $messages = array_map( + static fn (int $i): string => "Consumer group test {$i} - {$streamName}", + range(0, 4), + ); + + create_stream_and_topic($client, $streamName, $topicName); + $client->sendMessages( + $streamName, + $topicName, + $partitionId, + array_map(static fn (string $payload): SendMessage => new SendMessage($payload), $messages), + ); + + $consumer = $client->consumerGroup( + $consumerName, + $streamName, + $topicName, + $partitionId, + PollingStrategy::next(), + 10, + AutoCommit::interval(micros(5)), + true, + true, + micros(1), + null, + null, + null, + false, + ); + $received = []; + $count = $consumer->consumeMessages( + static function (ReceiveMessage $message) use (&$received): void { + $received[] = $message->payload(); + }, + count($messages), + ); + + assert_same(count($messages), $count); + assert_same($messages, $received); + } + +} diff --git a/foreign/php/tests/TlsTest.php b/foreign/php/tests/TlsTest.php new file mode 100644 index 0000000000..c3404d1c0f --- /dev/null +++ b/foreign/php/tests/TlsTest.php @@ -0,0 +1,79 @@ +markTestSkipped('set IGGY_TLS_CONNECTION_STRING to run TLS tests'); + } + + $client = IggyClient::fromConnectionString($connectionString); + $client->connect(); + $client->ping(); + } + + public function testProduceAndConsumeOverTls(): void + { + $connectionString = getenv('IGGY_TLS_CONNECTION_STRING'); + if ($connectionString === false || $connectionString === '') { + $this->markTestSkipped('set IGGY_TLS_CONNECTION_STRING to run TLS tests'); + } + + $client = IggyClient::fromConnectionString($connectionString); + $client->connect(); + $client->ping(); + + $streamName = unique_name('tls-msg-stream'); + $topicName = unique_name('tls-test-topic'); + $partitionId = 0; + $messages = array_map(static fn (int $i): string => "tls-message-{$i}", range(0, 2)); + + create_stream_and_topic($client, $streamName, $topicName); + $client->sendMessages( + $streamName, + $topicName, + $partitionId, + array_map(static fn (string $payload): SendMessage => new SendMessage($payload), $messages), + ); + + $polled = $client->pollMessages($streamName, $topicName, $partitionId, PollingStrategy::first(), 10, true); + assert_true(count($polled) >= count($messages), 'expected TLS messages'); + assert_same($messages, array_slice(collect_payloads($polled), 0, count($messages))); + } + + public function testConnectWithoutTlsShouldFail(): void + { + $address = getenv('IGGY_TLS_PLAINTEXT_ADDRESS'); + if ($address === false || $address === '') { + $this->markTestSkipped('set IGGY_TLS_PLAINTEXT_ADDRESS to run this TLS failure test'); + } + + $client = new IggyClient($address); + assert_throws(static fn () => $client->connect()); + } +} diff --git a/foreign/php/tests/bootstrap.php b/foreign/php/tests/bootstrap.php new file mode 100644 index 0000000000..e0f914a55e --- /dev/null +++ b/foreign/php/tests/bootstrap.php @@ -0,0 +1,148 @@ +getMessage(), $messageContains)) { + Assert::fail( + 'expected exception message to contain ' . var_export($messageContains, true) + . ', got ' . var_export($throwable->getMessage(), true) + ); + } + + return $throwable; + } + + Assert::fail('expected callable to throw'); +} + +function unique_name(string $prefix): string +{ + return $prefix . '-' . bin2hex(random_bytes(4)); +} + +function env_or_default(string $name, string $default): string +{ + $value = getenv($name); + + return $value === false || $value === '' ? $default : $value; +} + +function server_host(): string +{ + return env_or_default('IGGY_HOST', '127.0.0.1'); +} + +function server_port(): int +{ + return (int) env_or_default('IGGY_PORT', '8090'); +} + +function wait_for_server(string $host, int $port, int $timeoutSeconds = 30): void +{ + $deadline = microtime(true) + $timeoutSeconds; + $lastError = null; + + while (microtime(true) < $deadline) { + $socket = @fsockopen($host, $port, $errno, $errstr, 1.0); + if (is_resource($socket)) { + fclose($socket); + + return; + } + + $lastError = trim($errstr !== '' ? $errstr : (string) $errno); + usleep(250_000); + } + + Assert::fail("Iggy server was not reachable at {$host}:{$port}" . ($lastError !== null ? " ({$lastError})" : '')); +} + +function new_client(): IggyClient +{ + $client = new IggyClient(server_host() . ':' . server_port()); + $client->connect(); + $client->loginUser(env_or_default('IGGY_USERNAME', 'iggy'), env_or_default('IGGY_PASSWORD', 'iggy')); + + return $client; +} + +function new_connection_string_client(): IggyClient +{ + $host = server_host(); + $port = server_port(); + $username = rawurlencode(env_or_default('IGGY_USERNAME', 'iggy')); + $password = rawurlencode(env_or_default('IGGY_PASSWORD', 'iggy')); + + $client = IggyClient::fromConnectionString("iggy+tcp://{$username}:{$password}@{$host}:{$port}"); + $client->connect(); + + return $client; +} + +function create_stream_and_topic(object $client, string $stream, string $topic, int $partitions = 1): void +{ + $client->createStream($stream); + $client->createTopic($stream, $topic, $partitions, null, null, null, null); +} + +function collect_payloads(array $messages): array +{ + return array_map(static fn (ReceiveMessage $message): string => $message->payload(), $messages); +} + +function micros(int $seconds): int +{ + return $seconds * 1_000_000; +} + +if (!extension_loaded('iggy-php')) { + Assert::fail('The iggy-php extension is not loaded.'); +} + +wait_for_server(server_host(), server_port());