diff --git a/.github/workflows/publish_lambda.yaml b/.github/workflows/publish_lambda.yaml
new file mode 100644
index 00000000000..2275e9241fb
--- /dev/null
+++ b/.github/workflows/publish_lambda.yaml
@@ -0,0 +1,81 @@
+# This workflow creates a new release for a quickwit search aws lambda.
+# The artifact is a zip file containing a binary for ARM 64,
+# ready to be deployed by the deployer.
+#
+# See quickwit-lambda/README.md
+name: Release Lambda binary
+
+on:
+  push:
+    branches:
+      - lambda # This is temporary
+  workflow_dispatch:
+    inputs:
+      version:
+        description: 'Version tag (e.g., v0.8.0)'
+        required: false
+        default: 'dev'
+
+permissions:
+  contents: read
+
+jobs:
+  build-lambda:
+    name: Build Lambda ARM64
+    runs-on: ubuntu-latest
+    permissions:
+      contents: write
+      actions: write
+    steps:
+      - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+
+      - name: Set version
+        # `inputs.version` only exists for `workflow_dispatch`: it is empty on
+        # `push`, so an empty value is treated like `dev`. The input goes through
+        # the environment instead of being interpolated into the script, so that
+        # a crafted value cannot inject shell commands.
+        env:
+          INPUT_VERSION: ${{ github.event.inputs.version }}
+        run: |
+          if [ -z "$INPUT_VERSION" ] || [ "$INPUT_VERSION" = "dev" ]; then
+            echo "ASSET_VERSION=dev-$(git rev-parse --short HEAD)" >> "$GITHUB_ENV"
+          else
+            echo "ASSET_VERSION=$INPUT_VERSION" >> "$GITHUB_ENV"
+          fi
+
+      - name: Install rustup
+        run: curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain none -y
+
+      - name: Install cross
+        run: cargo install cross
+
+      - name: Retrieve and export commit date, hash, and tags
+        run: |
+          echo "QW_COMMIT_DATE=$(TZ=UTC0 git log -1 --format=%cd --date=format-local:%Y-%m-%dT%H:%M:%SZ)" >> $GITHUB_ENV
+          echo "QW_COMMIT_HASH=$(git rev-parse HEAD)" >> $GITHUB_ENV
+          echo "QW_COMMIT_TAGS=$(git tag --points-at HEAD | tr '\n' ',')" >> $GITHUB_ENV
+
+      - name: Build Lambda binary
+        run: cross build --release --features lambda-release --target aarch64-unknown-linux-gnu -p quickwit-lambda-server --bin quickwit-aws-lambda-leaf-search
+        env:
+          QW_COMMIT_DATE: ${{ env.QW_COMMIT_DATE }}
+          QW_COMMIT_HASH: ${{ env.QW_COMMIT_HASH }}
+          QW_COMMIT_TAGS: ${{ env.QW_COMMIT_TAGS }}
+        working-directory: ./quickwit
+
+      - name: Create Lambda zip
+        run: |
+          cd quickwit/target/aarch64-unknown-linux-gnu/release
+          cp quickwit-aws-lambda-leaf-search bootstrap
+          zip "quickwit-aws-lambda-${ASSET_VERSION}-aarch64.zip" bootstrap
+          mv "quickwit-aws-lambda-${ASSET_VERSION}-aarch64.zip" ../../../../
+
+      - name: Upload to GitHub release
+        uses: quickwit-inc/upload-to-github-release@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          file: quickwit-aws-lambda-${{ env.ASSET_VERSION }}-aarch64.zip
+          overwrite: true
+          draft: true
+          tag_name: ${{ env.ASSET_VERSION }}
diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv
index 8f199a01910..6ace87f3394 100644
--- a/LICENSE-3rdparty.csv
+++ b/LICENSE-3rdparty.csv
@@ -21,6 +21,8 @@ arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss
 assert-json-diff,https://github.com/davidpdrsn/assert-json-diff,MIT,David Pedersen
 async-compression,https://github.com/Nullus157/async-compression,MIT OR Apache-2.0,"Wim Looman , Allen Bui "
 async-speed-limit,https://github.com/tikv/async-speed-limit,MIT OR Apache-2.0,The TiKV Project Developers
+async-stream,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche
+async-stream-impl,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche
 async-trait,https://github.com/dtolnay/async-trait,MIT OR Apache-2.0,David Tolnay
 atomic-waker,https://github.com/smol-rs/atomic-waker,Apache-2.0 OR MIT,"Stjepan Glavina , Contributors to futures-rs"
 aws-config,https://github.com/smithy-lang/smithy-rs,Apache-2.0,"AWS Rust SDK Team , Russell Cohen "
@@ -28,6 +30,7 @@ aws-credential-types,https://github.com/smithy-lang/smithy-rs,Apache-2.0,AWS Rus
 aws-lc-rs,https://github.com/aws/aws-lc-rs,ISC AND (Apache-2.0 OR ISC),AWS-LibCrypto
 aws-lc-sys,https://github.com/aws/aws-lc-rs,ISC AND (Apache-2.0 OR ISC) AND OpenSSL,AWS-LC
 aws-runtime,https://github.com/smithy-lang/smithy-rs,Apache-2.0,AWS Rust SDK Team
+aws-sdk-lambda,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen "
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-sso,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-ssooidc,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " @@ -235,6 +238,8 @@ jiff-static,https://github.com/BurntSushi/jiff,Unlicense OR MIT,Andrew Gallant < jobserver,https://github.com/rust-lang/jobserver-rs,MIT OR Apache-2.0,Alex Crichton js-sys,https://github.com/wasm-bindgen/wasm-bindgen/tree/master/crates/js-sys,MIT OR Apache-2.0,The wasm-bindgen Developers json_comments,https://github.com/tmccombs/json-comments-rs,Apache-2.0,Thayne McCombs +lambda_runtime,https://github.com/awslabs/aws-lambda-rust-runtime,Apache-2.0,"David Calavera , Harold Sun " +lambda_runtime_api_client,https://github.com/awslabs/aws-lambda-rust-runtime,Apache-2.0,"David Calavera , Harold Sun " lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel levenshtein_automata,https://github.com/tantivy-search/levenshtein-automata,MIT,Paul Masurel libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers @@ -424,6 +429,7 @@ serde_core,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar serde_derive,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde_json_borrow,https://github.com/PSeitz/serde_json_borrow,MIT,Pascal Seitz +serde_path_to_error,https://github.com/dtolnay/path-to-error,MIT OR Apache-2.0,David Tolnay serde_qs,https://github.com/samscott89/serde_qs,MIT OR Apache-2.0,Sam Scott serde_spanned,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The serde_spanned Authors serde_urlencoded,https://github.com/nox/serde_urlencoded,MIT OR Apache-2.0,Anthony Ramine @@ -523,9 +529,11 @@ 
unicode-width,https://github.com/unicode-rs/unicode-width,MIT OR Apache-2.0,"kwa unit-prefix,https://codeberg.org/commons-rs/unit-prefix,MIT,"Fabio Valentini , Benjamin Sago " unsafe-libyaml,https://github.com/dtolnay/unsafe-libyaml,MIT,David Tolnay untrusted,https://github.com/briansmith/untrusted,ISC,Brian Smith +ureq-proto,https://github.com/algesten/ureq-proto,MIT OR Apache-2.0,Martin Algesten url,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers urlencoding,https://github.com/kornelski/rust_urlencoding,MIT,"Kornel , Bertram Truong " username,https://pijul.org/darcs/user,MIT OR Apache-2.0,Pierre-Étienne Meunier +utf-8,https://github.com/SimonSapin/rust-utf8,MIT OR Apache-2.0,Simon Sapin utf8-ranges,https://github.com/BurntSushi/utf8-ranges,Unlicense OR MIT,Andrew Gallant utf8_iter,https://github.com/hsivonen/utf8_iter,Apache-2.0 OR MIT,Henri Sivonen utf8parse,https://github.com/alacritty/vte,Apache-2.0 OR MIT,"Joe Wilm , Christian Duerr " diff --git a/quickwit/CLAUDE.md b/quickwit/CLAUDE.md new file mode 100644 index 00000000000..69770162a01 --- /dev/null +++ b/quickwit/CLAUDE.md @@ -0,0 +1,16 @@ +# Quickwit Claude Guidelines + +## Code Formatting + +Run `make fmt` to check and fix code formatting. This command performs three checks: + +1. **Rust formatting**: Ensures Rust code is properly formatted (via `cargo fmt`) +2. **License headers**: Checks that files are prepended with the correct LICENSE header +3. **Log format policy**: Checks that log statements follow our format rules: + - No trailing punctuation in log messages + - No uppercase for the first character of log messages + - See `scripts/check_log_format.sh` for details + +### Quick Fix + +Use `/fmt` to automatically run format checks and see issues. 
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 1e1c0482df0..b0218648453 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -532,6 +532,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-lambda" +version = "1.112.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cad866b2e51c3af758e5c8bb941a8c904262663fde53019aa1c5093172c54d3" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http 0.62.6", + "aws-smithy-json 0.61.9", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.62.0" @@ -685,9 +708,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.7" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ee19095c7c4dda59f1697d028ce704c24b2d33c6718790c7f1d5a3015b4107c" +checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" dependencies = [ "futures-util", "pin-project-lite", @@ -771,9 +794,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.5" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e62db736db19c488966c8d787f52e6270be565727236fd5579eaa301e7bc4a" +checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" dependencies = [ "aws-smithy-async", "aws-smithy-protocol-test", @@ -823,6 +846,18 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-mocks" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "662b7376269470b43978208ff2f2a233512219eb4f60ea08555649bca1bd0855" +dependencies = [ + "aws-smithy-http-client", + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 1.4.0", +] + [[package]] name = 
"aws-smithy-observability" version = "0.1.5" @@ -834,9 +869,9 @@ dependencies = [ [[package]] name = "aws-smithy-protocol-test" -version = "0.63.7" +version = "0.63.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01317a9e3c5c06f1af35001ef0c873c1e34e458c20b2ee1eee0fb431e6dbb010" +checksum = "7090f3a3657e7c2c0331604d62baa20a9a9f765c3f4bf63ccf48ccba6b8b7240" dependencies = [ "assert-json-diff", "aws-smithy-runtime-api", @@ -888,9 +923,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.9.3" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0d43d899f9e508300e587bf582ba54c27a452dd0a9ea294690669138ae14a2" +checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -905,9 +940,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.5" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "905cb13a9895626d49cf2ced759b062d913834c7482c38e49557eac4e6193f01" +checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" dependencies = [ "base64-simd", "bytes", @@ -4387,6 +4422,55 @@ dependencies = [ "rustversion", ] +[[package]] +name = "lambda_runtime" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed49669d6430292aead991e19bf13153135a884f916e68f32997c951af637ebe" +dependencies = [ + "async-stream", + "base64 0.22.1", + "bytes", + "futures", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "http-serde", + "hyper 1.8.1", + "hyper-util", + "lambda_runtime_api_client", + "pin-project", + "serde", + "serde_json", + "serde_path_to_error", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tracing", +] + +[[package]] +name = "lambda_runtime_api_client" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "c90a10f094475a34a04da2be11686c4dcfe214d93413162db9ffdff3d3af293a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "tokio", + "tower 0.4.13", + "tower-service", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4480,7 +4564,7 @@ dependencies = [ "lindera-core", "once_cell", "tar", - "ureq", + "ureq 2.12.1", ] [[package]] @@ -4565,7 +4649,7 @@ dependencies = [ "lindera-ipadic-builder", "once_cell", "tar", - "ureq", + "ureq 2.12.1", ] [[package]] @@ -4624,7 +4708,7 @@ dependencies = [ "lindera-ko-dic-builder", "once_cell", "tar", - "ureq", + "ureq 2.12.1", ] [[package]] @@ -7275,6 +7359,56 @@ dependencies = [ "utoipa", ] +[[package]] +name = "quickwit-lambda-client" +version = "0.8.0" +dependencies = [ + "anyhow", + "async-trait", + "aws-config", + "aws-sdk-lambda", + "aws-smithy-mocks", + "base64 0.22.1", + "bytesize", + "hex", + "once_cell", + "prost 0.14.1", + "quickwit-common", + "quickwit-config", + "quickwit-lambda-server", + "quickwit-proto", + "quickwit-search", + "quickwit-storage", + "serde_json", + "sha1", + "thiserror 2.0.17", + "tokio", + "tracing", + "ureq 3.1.4", +] + +[[package]] +name = "quickwit-lambda-server" +version = "0.8.0" +dependencies = [ + "anyhow", + "base64 0.22.1", + "bytesize", + "lambda_runtime", + "openssl", + "prost 0.14.1", + "quickwit-common", + "quickwit-config", + "quickwit-proto", + "quickwit-search", + "quickwit-storage", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "quickwit-macros" version = "0.8.0" @@ -7530,6 +7664,7 @@ dependencies = [ "quickwit-ingest", "quickwit-jaeger", "quickwit-janitor", + "quickwit-lambda-client", "quickwit-metastore", "quickwit-opentelemetry", "quickwit-proto", @@ -10586,6 +10721,35 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "ureq" +version = "3.1.4" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d39cb1dbab692d82a977c0392ffac19e188bd9186a9f32806f0aaa859d75585a" +dependencies = [ + "base64 0.22.1", + "flate2", + "log", + "percent-encoding", + "rustls 0.23.36", + "rustls-pki-types", + "ureq-proto", + "utf-8", + "webpki-roots 1.0.5", +] + +[[package]] +name = "ureq-proto" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d81f9efa9df032be5934a46a068815a10a042b494b6a58cb0a1a97bb5467ed6f" +dependencies = [ + "base64 0.22.1", + "http 1.4.0", + "httparse", + "log", +] + [[package]] name = "url" version = "2.5.8" @@ -10615,6 +10779,12 @@ dependencies = [ "winapi 0.2.8", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8-ranges" version = "1.0.5" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 2eaaef4c023..a46d718ec4b 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -19,6 +19,8 @@ members = [ "quickwit-integration-tests", "quickwit-jaeger", "quickwit-janitor", + "quickwit-lambda-client", + "quickwit-lambda-server", "quickwit-macros", "quickwit-metastore", @@ -56,6 +58,8 @@ default-members = [ "quickwit-integration-tests", "quickwit-jaeger", "quickwit-janitor", + "quickwit-lambda-client", + "quickwit-lambda-server", "quickwit-macros", "quickwit-metastore", "quickwit-opentelemetry", @@ -141,6 +145,7 @@ hyper-util = { version = "0.1", default-features = false, features = [ indexmap = { version = "2.12", features = ["serde"] } indicatif = "0.18" itertools = "0.14" +lambda_runtime = "0.13" json_comments = "0.2" libz-sys = "1.1" # Lindera tokenizer 0.30+ versions (tested up to 0.32.3) are currently broken due to upstream build failures. 
@@ -293,6 +298,7 @@ tracing-subscriber = { version = "0.3", features = [ ttl_cache = "0.5" typetag = "0.2" ulid = "1.2" +ureq = "3" username = "0.2" # We cannot upgrade to utoipa 5.0+ due to significant breaking changes: # 1. The `OpenApi` struct structure changed (fields are private), breaking our manual merging logic in openapi.rs @@ -308,6 +314,7 @@ vrl = { version = "0.29", default-features = false, features = [ warp = { version = "0.4", features = ["server", "test"] } whichlang = "0.1" wiremock = "0.6" +zip = { version = "0.6", default-features = false, features = ["deflate"] } zstd = { version = "0.13", default-features = false } aws-config = "1.8" @@ -315,8 +322,10 @@ aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] } aws-runtime = "1.5" aws-sdk-kinesis = "1.97" aws-sdk-s3 = "=1.62" +aws-sdk-lambda = "1" aws-sdk-sqs = "1.91" aws-smithy-async = "1.2" +aws-smithy-mocks = "0.2" aws-smithy-http-client = { version = "1.1", features = ["default-client"] } aws-smithy-runtime = "1.9" aws-smithy-types = { version = "1.3", features = [ @@ -355,6 +364,8 @@ quickwit-ingest = { path = "quickwit-ingest" } quickwit-integration-tests = { path = "quickwit-integration-tests" } quickwit-jaeger = { path = "quickwit-jaeger" } quickwit-janitor = { path = "quickwit-janitor" } +quickwit-lambda-client = { path = "quickwit-lambda-client" } +quickwit-lambda-server = { path = "quickwit-lambda-server" } quickwit-macros = { path = "quickwit-macros" } quickwit-metastore = { path = "quickwit-metastore" } quickwit-opentelemetry = { path = "quickwit-opentelemetry" } diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index 7cf75818444..a1877661490 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -43,6 +43,7 @@ quickwit-proto = { workspace = true } tokio = { workspace = true } quickwit-proto = { workspace = true, features = ["testsuite"] } +quickwit-common = { workspace = true, 
features = ["testsuite"] } [features] testsuite = [] diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 01def63b10e..b0d04650d32 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -70,6 +70,15 @@ "min_throughtput_bytes_per_secs": 100000, "timeout_millis": 2000, "max_num_retries": 2 + }, + "lambda": { + "function_name": "quickwit-lambda-leaf-search", + "max_splits_per_invocation": 10, + "auto_deploy": { + "execution_role_arn": "arn:aws:iam::123456789012:role/quickwit-lambda-role", + "memory_size": "5 GiB", + "invocation_timeout_secs": 15 + } } }, "jaeger": { diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index 0b4e0c30229..16c4b056991 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -62,6 +62,15 @@ min_throughtput_bytes_per_secs = 100000 timeout_millis = 2000 max_num_retries = 2 +[searcher.lambda] +function_name = "quickwit-lambda-leaf-search" +max_splits_per_invocation = 10 + +[searcher.lambda.auto_deploy] +execution_role_arn = "arn:aws:iam::123456789012:role/quickwit-lambda-role" +memory_size = "5 GiB" +invocation_timeout_secs = 15 + [jaeger] enable_endpoint = true lookback_period_hours = 24 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index cb16052fbd1..f000bd76c72 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -64,6 +64,13 @@ searcher: min_throughtput_bytes_per_secs: 100000 timeout_millis: 2000 max_num_retries: 2 + lambda: + function_name: 
quickwit-lambda-leaf-search + max_splits_per_invocation: 10 + auto_deploy: + execution_role_arn: arn:aws:iam::123456789012:role/quickwit-lambda-role + memory_size: 5 GiB + invocation_timeout_secs: 15 jaeger: enable_endpoint: true diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index df108b844c0..22cdb2538b4 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -74,8 +74,8 @@ pub use crate::metastore_config::{ }; pub use crate::node_config::{ CacheConfig, CachePolicy, DEFAULT_QW_CONFIG_PATH, GrpcConfig, IndexerConfig, IngestApiConfig, - JaegerConfig, KeepAliveConfig, NodeConfig, RestConfig, SearcherConfig, SplitCacheLimits, - StorageTimeoutPolicy, TlsConfig, + JaegerConfig, KeepAliveConfig, LambdaConfig, LambdaDeployConfig, NodeConfig, RestConfig, + SearcherConfig, SplitCacheLimits, StorageTimeoutPolicy, TlsConfig, }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index e8c347eb4a5..ea0689299ad 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -300,6 +300,83 @@ pub struct SearcherConfig { pub storage_timeout_policy: Option, pub warmup_memory_budget: ByteSize, pub warmup_single_split_initial_allocation: ByteSize, + /// Lambda configuration for serverless leaf search execution. + /// If set, enables Lambda execution for leaf search. + /// + /// If set, and Quickwit cannot access the Lambda (after a deploy attempt if + /// auto deploy is set up), Quickwit will log an error and + /// fail on startup. + #[serde(default)] + pub lambda: Option, +} + +/// Configuration for AWS Lambda leaf search execution. 
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct LambdaConfig {
+    /// AWS Lambda function name or ARN.
+    #[serde(default = "LambdaConfig::default_function_name")]
+    pub function_name: String,
+    /// Maximum number of splits per Lambda invocation.
+    #[serde(default = "LambdaConfig::default_max_splits_per_invocation")]
+    pub max_splits_per_invocation: usize,
+    /// When the number of pending split searches exceeds this threshold,
+    /// new splits are offloaded to Lambda instead of being queued locally.
+    /// A value of 0 disables offloading (all splits are processed locally).
+    #[serde(default = "LambdaConfig::default_offload_threshold")]
+    pub offload_threshold: usize,
+    /// Auto-deploy configuration. If set, Quickwit will automatically deploy
+    /// the Lambda function at startup.
+    /// If deploying a lambda fails, Quickwit will log an error and fail.
+    #[serde(default)]
+    pub auto_deploy: Option<LambdaDeployConfig>,
+}
+
+/// Configuration for automatic Lambda function deployment.
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct LambdaDeployConfig {
+    /// IAM execution role ARN for the Lambda function.
+    pub execution_role_arn: String,
+    /// Memory size for the Lambda function. It will be rounded up to the nearest multiple of 1MiB.
+    #[serde(default = "LambdaDeployConfig::default_memory_size")]
+    pub memory_size: ByteSize,
+    /// Timeout for Lambda invocations in seconds.
+    #[serde(default = "LambdaDeployConfig::default_invocation_timeout_secs")]
+    pub invocation_timeout_secs: u64,
+}
+
+impl LambdaDeployConfig {
+    fn default_memory_size() -> ByteSize {
+        // Empirically this implies between 4 and 6 vCPUs.
+        ByteSize::gib(5)
+    }
+    fn default_invocation_timeout_secs() -> u64 {
+        15
+    }
+}
+
+impl Default for LambdaConfig {
+    fn default() -> Self {
+        Self {
+            function_name: Self::default_function_name(),
+            max_splits_per_invocation: Self::default_max_splits_per_invocation(),
+            offload_threshold: Self::default_offload_threshold(),
+            auto_deploy: None,
+        }
+    }
+}
+
+impl LambdaConfig {
+    fn default_function_name() -> String {
+        "quickwit-lambda-search".to_string()
+    }
+    fn default_max_splits_per_invocation() -> usize {
+        10
+    }
+    fn default_offload_threshold() -> usize {
+        20
+    }
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -435,6 +512,7 @@ impl Default for SearcherConfig {
             storage_timeout_policy: None,
             warmup_memory_budget: ByteSize::gb(100),
             warmup_single_split_initial_allocation: ByteSize::gb(1),
+            lambda: None,
         }
     }
 }
diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs
index 7d3b5f2c64b..f072e7a9760 100644
--- a/quickwit/quickwit-config/src/node_config/serialize.rs
+++ b/quickwit/quickwit-config/src/node_config/serialize.rs
@@ -542,8 +542,8 @@ mod tests {
     use itertools::Itertools;
 
     use super::*;
-    use crate::CacheConfig;
     use crate::storage_config::StorageBackendFlavor;
+    use crate::{CacheConfig, LambdaConfig, LambdaDeployConfig};
 
     fn get_config_filepath(config_filename: &str) -> String {
         format!(
@@ -687,6 +687,17 @@ mod tests {
                 }),
                 warmup_memory_budget: ByteSize::gb(100),
                 warmup_single_split_initial_allocation: ByteSize::gb(1),
+                lambda: Some(LambdaConfig {
+                    function_name: "quickwit-lambda-leaf-search".to_string(),
+                    max_splits_per_invocation: 10,
+                    // Not set in the test config files: the serde default (20) applies.
+                    offload_threshold: 20,
+                    auto_deploy: Some(LambdaDeployConfig {
+                        execution_role_arn: "arn:aws:iam::123456789012:role/quickwit-lambda-role"
+                            .to_string(),
+                        memory_size: ByteSize::gib(5),
+                        invocation_timeout_secs: 15,
+                    }),
+                }),
             }
         );
         assert_eq!(
diff --git a/quickwit/quickwit-lambda-client/Cargo.toml b/quickwit/quickwit-lambda-client/Cargo.toml
new file mode 100644
index 00000000000..375bff2bd3a
--- /dev/null
+++ b/quickwit/quickwit-lambda-client/Cargo.toml
@@ -0,0 +1,46 @@
+[package]
+name = "quickwit-lambda-client"
+description = "AWS Lambda client for Quickwit leaf search invocation and deployment"
+
+version.workspace = true
+edition.workspace = true
+homepage.workspace = true
+documentation.workspace = true
+repository.workspace = true
+authors.workspace = true
+license.workspace = true
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+thiserror = { workspace = true }
+aws-config = { workspace = true }
+aws-sdk-lambda = { workspace = true }
+base64 = { workspace = true }
+prost = { workspace = true }
+serde_json = { workspace = true }
+once_cell = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+quickwit-common = { workspace = true }
+quickwit-config = { workspace = true }
+quickwit-lambda-server = { workspace = true }
+quickwit-proto = { workspace = true }
+quickwit-search = { workspace = true }
+
+[dev-dependencies]
+aws-smithy-mocks = { workspace = true }
+aws-sdk-lambda = { workspace = true, features = ["test-util"] }
+bytesize = { workspace = true }
+tokio = { workspace = true, features = ["test-util", "macros"] }
+
+# Required for complicated reasons. quickwit-storage checks that we
+# do use preserve order with serde. aws forces that feature. We disable
+# the check by switching on its testsuite feature.
+quickwit-storage = { workspace = true, features = ["testsuite"] }
+
+[build-dependencies]
+hex = { workspace = true }
+sha1 = "0.10"
+ureq = { workspace = true }
diff --git a/quickwit/quickwit-lambda-client/build.rs b/quickwit/quickwit-lambda-client/build.rs
new file mode 100644
index 00000000000..29f6d2407c3
--- /dev/null
+++ b/quickwit/quickwit-lambda-client/build.rs
@@ -0,0 +1,113 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Build script for quickwit-lambda-client.
+//!
+//! This script downloads the pre-built Lambda zip from a GitHub release
+//! and places it in OUT_DIR for embedding via include_bytes!
+//!
+//! The Lambda binary is built separately in CI and published as a GitHub release.
+
+use std::env;
+use std::fs::File;
+use std::io::Write;
+use std::path::PathBuf;
+
+use sha1::{Digest, Sha1};
+
+/// URL to download the pre-built Lambda zip from GitHub releases.
+/// This should be updated when a new Lambda binary is released.
+const LAMBDA_ZIP_URL: &str =
+    "https://github.com/quickwit-oss/quickwit/releases/download/lambda-506751fb/quickwit-aws-lambda--aarch64.zip";
+
+/// AWS Lambda direct upload limit is 50MB.
+/// Larger artifacts must be uploaded via S3.
+const MAX_LAMBDA_ZIP_SIZE: usize = 50 * 1024 * 1024;
+
+fn main() {
+    println!("cargo:rerun-if-changed=build.rs");
+    println!("cargo:rerun-if-env-changed=QUICKWIT_LAMBDA_ZIP_URL");
+
+    let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR not set"));
+    let zip_path = out_dir.join("lambda_bootstrap.zip");
+
+    // Allow overriding the URL via environment variable
+    let url = env::var("QUICKWIT_LAMBDA_ZIP_URL").unwrap_or_else(|_| LAMBDA_ZIP_URL.to_string());
+
+    println!("cargo:warning=Downloading Lambda zip from: {}", url);
+
+    match download_lambda_zip(&url) {
+        Ok(data) => {
+            let mut file = File::create(&zip_path).expect("Failed to create zip file");
+            file.write_all(&data).expect("Failed to write zip file");
+            println!(
+                "cargo:warning=Downloaded Lambda zip to {:?} ({} bytes)",
+                zip_path,
+                data.len()
+            );
+
+            // Compute SHA1 hash of the zip and export as environment variable.
+            // This is used to create a unique qualifier for Lambda versioning.
+            let mut hasher = Sha1::new();
+            hasher.update(&data);
+            let sha1_hash = hasher.finalize();
+            let sha1_short = hex::encode(&sha1_hash[..4]); // First 8 hex chars
+            println!("cargo:rustc-env=LAMBDA_BINARY_SHA1={}", sha1_short);
+            println!("cargo:warning=Lambda binary SHA1 (short): {}", sha1_short);
+        }
+        Err(e) => {
+            panic!("Failed to download Lambda zip: {}", e);
+        }
+    }
+}
+
+/// Downloads the Lambda zip at `url` and returns its content.
+///
+/// Fails if the request fails or if the zip is larger than `MAX_LAMBDA_ZIP_SIZE`.
+fn download_lambda_zip(url: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+    let too_large_error = |actual_size: String| -> Box<dyn std::error::Error> {
+        format!(
+            "Lambda zip is too large ({}, max {} bytes).\nAWS Lambda does not support direct \
+             upload of binaries larger than 50MB.\nWorkaround: upload the Lambda zip to S3 and \
+             deploy from there instead.",
+            actual_size, MAX_LAMBDA_ZIP_SIZE
+        )
+        .into()
+    };
+    let response = ureq::get(url).call()?;
+    // The limit is one byte above MAX_LAMBDA_ZIP_SIZE so that a zip of exactly the maximum
+    // size is still accepted.
+    let read_result = response
+        .into_body()
+        .with_config()
+        .limit(MAX_LAMBDA_ZIP_SIZE as u64 + 1)
+        .read_to_vec();
+    let data = match read_result {
+        Ok(data) => data,
+        // ureq reports a body going over the configured limit as an error rather than
+        // truncating it, so an oversized zip is expected to end up here.
+        Err(ureq::Error::BodyExceedsLimit(_)) => {
+            return Err(too_large_error(format!(
+                "more than {} bytes",
+                MAX_LAMBDA_ZIP_SIZE
+            )));
+        }
+        Err(error) => return Err(error.into()),
+    };
+    // Defensive check, in case the limit above lets `MAX_LAMBDA_ZIP_SIZE + 1` bytes through.
+    if data.len() > MAX_LAMBDA_ZIP_SIZE {
+        return Err(too_large_error(format!("{} bytes", data.len())));
+    }
+    Ok(data)
+}
diff --git a/quickwit/quickwit-lambda-client/src/deploy.rs b/quickwit/quickwit-lambda-client/src/deploy.rs
new file mode 100644
index 00000000000..24319eb9ab7
--- /dev/null
+++ b/quickwit/quickwit-lambda-client/src/deploy.rs
@@ -0,0 +1,974 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Lambda function deployment for auto-deploy feature.
+//!
+//! This module provides functionality to automatically deploy or update
+//! the Lambda function used for leaf search operations.
+//!
+//! # Versioning Strategy
+//!
+//! We use AWS Lambda published versions with description-based identification:
+//! - Each published version has a description like `quickwit:0_8_0-fa752891`
+//! - We list versions to find one matching our qualifier
+//! - We invoke the specific version number (not $LATEST)
+//! 
- Old versions are garbage collected (keep current + top 5 most recent) + +use std::collections::HashMap; +use std::sync::{Arc, OnceLock}; + +use anyhow::{Context, anyhow}; +use aws_sdk_lambda::Client as LambdaClient; +use aws_sdk_lambda::error::SdkError; +use aws_sdk_lambda::primitives::Blob; +use aws_sdk_lambda::types::{ + Architecture, Environment, FunctionCode, LastUpdateStatus, Runtime, State, +}; +use quickwit_config::{LambdaConfig, LambdaDeployConfig}; +use quickwit_search::LambdaLeafSearchInvoker; +use tracing::{debug, info, warn}; + +use crate::invoker::create_lambda_invoker_for_version; +use crate::metrics::LAMBDA_METRICS; + +/// Embedded Lambda binary (arm64, compressed). +/// This is included at compile time. +const LAMBDA_BINARY: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/lambda_bootstrap.zip")); + +/// Prefix for version descriptions to identify Quickwit-managed versions. +const VERSION_DESCRIPTION_PREFIX: &str = "quickwit:"; + +/// Number of recent versions to keep during garbage collection (in addition to current). +const GC_KEEP_RECENT_VERSIONS: usize = 5; + +/// Returns the Lambda qualifier combining version and binary hash. +/// Format: "{quickwit_version}-{sha1_short}" with dots replaced by underscores. +/// Example: "0_8_0-fa752891" +fn lambda_qualifier() -> &'static str { + static LAMBDA_QUALIFIER: OnceLock = OnceLock::new(); + LAMBDA_QUALIFIER + .get_or_init(|| { + format!( + "{}-{}", + env!("CARGO_PKG_VERSION").replace('.', "_"), + env!("LAMBDA_BINARY_SHA1") + ) + }) + .as_str() +} + +/// Returns the version description for our qualifier. +fn version_description() -> String { + format!("{}{}", VERSION_DESCRIPTION_PREFIX, lambda_qualifier()) +} + +/// Get or deploy the Lambda function and return an invoker. +/// +/// This function: +/// 1. Lists existing Lambda versions to find one matching our description +/// 2. If not found, (and if a deploy config is provided) attempt to deploy the embedded Lambda +/// binary +/// 3. 
Garbage collects old versions (keeps current + 5 most recent) +/// 4. Returns an invoker configured to call the specific version +/// +/// The qualifier is computed from the Quickwit version and Lambda binary SHA1, +/// ensuring the deployed Lambda matches the embedded binary. +pub async fn try_get_or_deploy_invoker( + lambda_config: &LambdaConfig, +) -> anyhow::Result> { + let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let client = LambdaClient::new(&aws_config); + let function_name = &lambda_config.function_name; + let target_description = version_description(); + + info!( + function_name = %function_name, + qualifier = %lambda_qualifier(), + "Looking for Lambda function version" + ); + + let version = find_or_deploy_version( + &client, + function_name, + &target_description, + lambda_config.auto_deploy.as_ref(), + ) + .await?; + + // Step 3: Spawn background garbage collection (best effort, non-blocking) + let gc_client = client.clone(); + let gc_function_name = function_name.clone(); + let gc_version = version.clone(); + tokio::spawn(async move { + if let Err(e) = + garbage_collect_old_versions(&gc_client, &gc_function_name, &gc_version).await + { + warn!(error = %e, "Failed to garbage collect old Lambda versions"); + } + }); + + // Step 4: Create and return the invoker + let invoker = create_lambda_invoker_for_version(function_name, &version) + .await + .context("Failed to create Lambda invoker")?; + + info!("created the lambda invoker"); + + Ok(invoker) +} + +/// Find a Lambda version with a description matching our qualifier. +/// +/// If none is found and a deploy config is provided, attempt to deploy a new version. +/// +/// Returns the version number as a string (because it is a string on AWS side, e.g.: "7") if found. 
+async fn find_or_deploy_version( + client: &LambdaClient, + function_name: &str, + target_description: &str, + deploy_config: Option<&LambdaDeployConfig>, +) -> anyhow::Result { + if let Some(version) = find_matching_version(client, function_name, target_description).await? { + info!( + function_name = %function_name, + version = %version, + "Found existing Lambda version" + ); + return Ok(version); + } + + let deploy_config = deploy_config.with_context(|| { + format!( + "No Lambda version found with description '{}' and auto_deploy is not configured. \ + Either deploy the Lambda function manually or enable auto_deploy.", + target_description + ) + })?; + + info!( + function_name = %function_name, + "No matching version found, deploying Lambda function" + ); + + deploy_lambda_function(client, function_name, deploy_config).await +} + +async fn find_matching_version( + client: &LambdaClient, + function_name: &str, + target_description: &str, +) -> anyhow::Result> { + let mut marker: Option = None; + + loop { + let mut request = client + .list_versions_by_function() + .function_name(function_name); + + if let Some(m) = marker { + request = request.marker(m); + } + + let response = match request.send().await { + Ok(resp) => resp, + Err(SdkError::ServiceError(err)) if err.err().is_resource_not_found_exception() => { + info!( + function_name = %function_name, + "Lambda function does not exist yet" + ); + return Ok(None); + } + Err(e) => { + return Err(anyhow!( + "Failed to list Lambda versions for '{}': {}", + function_name, + e + )); + } + }; + + for version in response.versions() { + if let Some(description) = version.description() { + if description == target_description { + if let Some(ver) = version.version() { + if ver != "$LATEST" { + return Ok(Some(ver.to_string())); + } + } + } + } + } + + marker = response.next_marker().map(|s| s.to_string()); + if marker.is_none() { + break; + } + } + + Ok(None) +} + +/// Deploy the Lambda function and publish a new version. 
+/// AWS's API is pretty terrible. +/// +/// Lambda's version are integer generated by AWS (we don't have control over them). +/// To publish a new version, we need to implement two paths: +/// - If the function doesn't exist yet, `create_function(publish=true)` atomically creates it and +/// publishes a version in one call. +/// - If the function already exists, we first update the code. We do not +/// publish because strangely the API call does not make it possible to change the +/// description. Updating the code has the effect of create a version $LATEST. +/// - We publish the version $LATEST. That's the moment AWS attributes a version number. +/// That call allows us to change the description. We pass the sha256 hash of the code +/// to ensure that $LATEST has not been overwritten by another concurrent update. +async fn deploy_lambda_function( + client: &LambdaClient, + function_name: &str, + deploy_config: &LambdaDeployConfig, +) -> anyhow::Result { + let result = deploy_lambda_function_inner(client, function_name, deploy_config).await; + + let status = if result.is_ok() { "success" } else { "error" }; + LAMBDA_METRICS + .deploy_total + .with_label_values([status]) + .inc(); + + result +} + +async fn deploy_lambda_function_inner( + client: &LambdaClient, + function_name: &str, + deploy_config: &LambdaDeployConfig, +) -> anyhow::Result { + // Fast path: create + publish atomically if the function doesn't exist yet. + if let Some(version) = try_create_function(client, function_name, deploy_config).await? { + return Ok(version); + } + + // Function already exists — update $LATEST, then publish with code_sha256 guard. + let code_sha256 = update_function_code(client, function_name).await?; + publish_version(client, function_name, &code_sha256).await +} + +/// Try to create the Lambda function with `publish=true`. +/// +/// Returns `Some(version)` if the function was created and published. 
+/// Returns `None` if the function already exists (`ResourceConflictException`). +async fn try_create_function( + client: &LambdaClient, + function_name: &str, + deploy_config: &LambdaDeployConfig, +) -> anyhow::Result> { + let memory_size_mb = (deploy_config.memory_size.as_u64() / (1024 * 1024)) as i32; + let timeout_secs = deploy_config.invocation_timeout_secs as i32; + let description = version_description(); + + info!( + function_name = %function_name, + memory_size_mb = memory_size_mb, + timeout_secs = timeout_secs, + "Attempting to create Lambda function" + ); + + let function_code = FunctionCode::builder() + .zip_file(Blob::new(LAMBDA_BINARY)) + .build(); + + let create_result = client + .create_function() + .function_name(function_name) + .runtime(Runtime::Providedal2023) + .role(&deploy_config.execution_role_arn) + .handler("bootstrap") + .description(&description) + .code(function_code) + .architectures(Architecture::Arm64) + .memory_size(memory_size_mb) + .timeout(timeout_secs) + .environment(build_environment()) + .set_tags(Some(build_tags())) + .publish(true) + .send() + .await; + + match create_result { + Ok(output) => { + let version = output + .version() + .ok_or_else(|| anyhow!("Created function has no version number"))? + .to_string(); + info!( + function_name = %function_name, + version = %version, + "Lambda function created and published" + ); + Ok(Some(version)) + } + Err(SdkError::ServiceError(err)) if err.err().is_resource_conflict_exception() => { + debug!( + function_name = %function_name, + "Lambda function already exists" + ); + Ok(None) + } + Err(e) => Err(anyhow!( + "Failed to create Lambda function '{}': {}", + function_name, + e + )), + } +} + +/// Update `$LATEST` to our embedded binary. +/// +/// Returns the `code_sha256` of the uploaded code, to be used as a guard +/// when publishing the version (detects if another process overwrote `$LATEST` +/// between our update and publish). 
+async fn update_function_code( + client: &LambdaClient, + function_name: &str, +) -> anyhow::Result { + info!( + function_name = %function_name, + "Updating Lambda function code to current binary" + ); + + let response = client + .update_function_code() + .function_name(function_name) + .zip_file(Blob::new(LAMBDA_BINARY)) + .architectures(Architecture::Arm64) + .send() + .await + .context("Failed to update Lambda function code")?; + + let code_sha256 = response + .code_sha256() + .ok_or_else(|| anyhow!("update_function_code response missing code_sha256"))? + .to_string(); + + wait_for_function_ready(client, function_name).await?; + + Ok(code_sha256) +} + +/// Publish a new immutable version from `$LATEST` with our description. +/// +/// The `code_sha256` parameter guards against races: if another process +/// overwrote `$LATEST` since our `update_function_code` call, AWS will +/// reject the publish. +/// +/// Returns the version number (e.g., "8"). +async fn publish_version( + client: &LambdaClient, + function_name: &str, + code_sha256: &str, +) -> anyhow::Result { + let description = version_description(); + + info!( + function_name = %function_name, + description = %description, + "Publishing new Lambda version" + ); + + let publish_response = client + .publish_version() + .function_name(function_name) + .description(&description) + .code_sha256(code_sha256) + .send() + .await + .context( + "Failed to publish Lambda version (code_sha256 mismatch means a concurrent deploy \ + race)", + )?; + + let version = publish_response + .version() + .context("Published version has no version number")? + .to_string(); + + info!( + function_name = %function_name, + version = %version, + "Lambda version published successfully" + ); + + Ok(version) +} + +/// Wait for the Lambda function to be ready. +/// +/// "Ready" means `State == Active` and no update is in progress +/// (`LastUpdateStatus` is absent or `Successful`). 
+/// +/// This matters because: +/// - After `create_function`: `State` transitions `Pending → Active` +/// - After `update_function_code`: `State` stays `Active` but `LastUpdateStatus` transitions +/// `InProgress → Successful` +async fn wait_for_function_ready(client: &LambdaClient, function_name: &str) -> anyhow::Result<()> { + const MAX_WAIT_ATTEMPTS: u32 = 30; + const WAIT_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(1); + + let mut interval = tokio::time::interval(WAIT_INTERVAL); + + for attempt in 0..MAX_WAIT_ATTEMPTS { + interval.tick().await; + + let response = client + .get_function() + .function_name(function_name) + .send() + .await + .context("Failed to get function status")?; + + let Some(config) = response.configuration() else { + continue; + }; + + // Check for terminal failure states. + if config.state() == Some(&State::Failed) { + let reason = config.state_reason().unwrap_or("Unknown reason"); + anyhow::bail!( + "Lambda function '{}' is in Failed state: {}", + function_name, + reason + ); + } + + let last_update_status: &LastUpdateStatus = config + .last_update_status() + .unwrap_or(&LastUpdateStatus::Successful); + + if last_update_status == &LastUpdateStatus::Failed { + let reason = config + .last_update_status_reason() + .unwrap_or("Unknown reason"); + anyhow::bail!( + "Lambda function '{}' last update failed: {}", + function_name, + reason + ); + } + + // Ready = Active state with no update in progress. 
+ let is_active = config.state() == Some(&State::Active); + if is_active && last_update_status == &LastUpdateStatus::Successful { + info!( + function_name = %function_name, + attempts = attempt + 1, + "Lambda function is ready" + ); + return Ok(()); + } + + info!( + function_name = %function_name, + state = ?config.state(), + last_update_status = ?config.last_update_status(), + attempt = attempt + 1, + "Waiting for Lambda function to be ready" + ); + } + + anyhow::bail!( + "Lambda function '{}' did not become ready within {} seconds", + function_name, + MAX_WAIT_ATTEMPTS as u64 * WAIT_INTERVAL.as_secs() + ) +} + +/// Garbage collect old Lambda versions, keeping the current + 5 most recent. +async fn garbage_collect_old_versions( + client: &LambdaClient, + function_name: &str, + current_version: &str, +) -> anyhow::Result<()> { + let mut quickwit_lambda_versions: Vec<(u64, String)> = Vec::new(); + let mut marker: Option = None; + + // Collect all Quickwit-managed versions + loop { + let mut request = client + .list_versions_by_function() + .function_name(function_name); + + if let Some(m) = marker { + request = request.marker(m); + } + + let response = request + .send() + .await + .context("Failed to list Lambda versions for garbage collection")?; + + for version in response.versions() { + let Some(version_str) = version.version() else { + continue; + }; + if version_str == "$LATEST" { + continue; + } + // Only consider Quickwit-managed versions + let Some(description) = version.description() else { + continue; + }; + if description.starts_with(VERSION_DESCRIPTION_PREFIX) { + if let Ok(version_num) = version_str.parse::() { + quickwit_lambda_versions.push((version_num, version_str.to_string())); + } + } + } + + marker = response.next_marker().map(|s| s.to_string()); + if marker.is_none() { + break; + } + } + + // Sort by version number descending (most recent first) + quickwit_lambda_versions.sort(); + + // We keep the last 5 versions. 
+ quickwit_lambda_versions.truncate( + quickwit_lambda_versions + .len() + .saturating_sub(GC_KEEP_RECENT_VERSIONS), + ); + + if let Some(pos) = quickwit_lambda_versions + .iter() + .position(|(_version, version_str)| version_str == current_version) + { + quickwit_lambda_versions.swap_remove(pos); + } + + // Delete old versions + for (version, version_str) in quickwit_lambda_versions { + info!( + function_name = %function_name, + version = %version_str, + "Deleting old Lambda version" + ); + + if let Err(e) = client + .delete_function() + .function_name(function_name) + .qualifier(&version_str) + .send() + .await + { + warn!( + function_name = %function_name, + version = %version, + error = %e, + "Failed to delete old Lambda version" + ); + } + } + + Ok(()) +} + +/// Build environment variables for the Lambda function. +fn build_environment() -> Environment { + let mut env_vars = HashMap::new(); + env_vars.insert("RUST_LOG".to_string(), "info".to_string()); + env_vars.insert("RUST_BACKTRACE".to_string(), "1".to_string()); + Environment::builder().set_variables(Some(env_vars)).build() +} + +/// Build tags for the Lambda function. 
+fn build_tags() -> HashMap { + let mut tags = HashMap::new(); + tags.insert("managed_by".to_string(), "quickwit".to_string()); + tags +} + +#[cfg(test)] +mod tests { + use aws_sdk_lambda::operation::create_function::{CreateFunctionError, CreateFunctionOutput}; + use aws_sdk_lambda::operation::delete_function::DeleteFunctionOutput; + use aws_sdk_lambda::operation::get_function::GetFunctionOutput; + use aws_sdk_lambda::operation::list_versions_by_function::{ + ListVersionsByFunctionError, ListVersionsByFunctionOutput, + }; + use aws_sdk_lambda::operation::publish_version::PublishVersionOutput; + use aws_sdk_lambda::operation::update_function_code::UpdateFunctionCodeOutput; + use aws_sdk_lambda::types::FunctionConfiguration; + use aws_sdk_lambda::types::error::{ResourceConflictException, ResourceNotFoundException}; + use aws_smithy_mocks::{RuleMode, mock, mock_client}; + use bytesize::ByteSize; + + use super::*; + + fn make_version(version: &str, description: &str) -> FunctionConfiguration { + FunctionConfiguration::builder() + .version(version) + .description(description) + .build() + } + + fn test_deploy_config() -> LambdaDeployConfig { + LambdaDeployConfig { + execution_role_arn: "arn:aws:iam::123456789:role/test-role".to_string(), + memory_size: ByteSize::gib(5), + invocation_timeout_secs: 60, + } + } + + // --- find_matching_version tests --- + + #[tokio::test] + async fn test_find_matching_version_found() { + let target = "quickwit:test_version"; + let rule = mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + ListVersionsByFunctionOutput::builder() + .versions(make_version("$LATEST", "")) + .versions(make_version("1", "quickwit:old_version")) + .versions(make_version("7", "quickwit:test_version")) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + let matching_version_opt = find_matching_version(&client, "my-fn", target) + .await + .unwrap(); + assert_eq!(matching_version_opt, Some("7".to_string())); + } + + 
#[tokio::test] + async fn test_find_matching_version_not_found() { + let rule = mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + ListVersionsByFunctionOutput::builder() + .versions(make_version("$LATEST", "")) + .versions(make_version("1", "quickwit:other")) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + let result = find_matching_version(&client, "my-fn", "quickwit:no_match") + .await + .unwrap(); + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_find_matching_version_function_does_not_exist() { + let rule = mock!(aws_sdk_lambda::Client::list_versions_by_function).then_error(|| { + ListVersionsByFunctionError::ResourceNotFoundException( + ResourceNotFoundException::builder().build(), + ) + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + let result = find_matching_version(&client, "no-such-fn", "quickwit:x") + .await + .unwrap(); + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_find_matching_version_skips_latest_even_if_description_matches() { + let rule = mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + ListVersionsByFunctionOutput::builder() + .versions(make_version("$LATEST", "quickwit:match")) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + let result = find_matching_version(&client, "my-fn", "quickwit:match") + .await + .unwrap(); + assert_eq!(result, None); + } + + // --- try_create_function tests --- + + #[tokio::test] + async fn test_try_create_function_success() { + let rule = mock!(aws_sdk_lambda::Client::create_function).then_output(|| { + CreateFunctionOutput::builder() + .version("1") + .function_name("my-fn") + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + let config = test_deploy_config(); + + let result = try_create_function(&client, "my-fn", &config) + .await + .unwrap(); + assert_eq!(result, Some("1".to_string())); + } + + #[tokio::test] + async fn 
test_try_create_function_already_exists() { + let rule = mock!(aws_sdk_lambda::Client::create_function).then_error(|| { + CreateFunctionError::ResourceConflictException( + ResourceConflictException::builder().build(), + ) + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + let config = test_deploy_config(); + + let result = try_create_function(&client, "my-fn", &config) + .await + .unwrap(); + assert_eq!(result, None); + } + + // --- deploy (update path) tests --- + + #[tokio::test] + async fn test_deploy_update_path() { + // create_function → conflict (function exists) + let create_rule = mock!(aws_sdk_lambda::Client::create_function).then_error(|| { + CreateFunctionError::ResourceConflictException( + ResourceConflictException::builder().build(), + ) + }); + // update_function_code → success with code_sha256 + let update_rule = mock!(aws_sdk_lambda::Client::update_function_code).then_output(|| { + UpdateFunctionCodeOutput::builder() + .code_sha256("abc123hash") + .build() + }); + // get_function → active and ready (for wait_for_function_ready) + let get_rule = mock!(aws_sdk_lambda::Client::get_function).then_output(|| { + GetFunctionOutput::builder() + .configuration( + FunctionConfiguration::builder() + .state(State::Active) + .last_update_status(LastUpdateStatus::Successful) + .build(), + ) + .build() + }); + // publish_version → success + let publish_rule = mock!(aws_sdk_lambda::Client::publish_version) + .then_output(|| PublishVersionOutput::builder().version("8").build()); + + let client = mock_client!( + aws_sdk_lambda, + RuleMode::MatchAny, + [&create_rule, &update_rule, &get_rule, &publish_rule] + ); + let config = test_deploy_config(); + + tokio::time::pause(); + let version = deploy_lambda_function_inner(&client, "my-fn", &config) + .await + .unwrap(); + assert_eq!(version, "8"); + } + + // --- wait_for_function_ready tests --- + + #[tokio::test] + async fn test_wait_for_function_ready_immediate() { + let rule = 
mock!(aws_sdk_lambda::Client::get_function).then_output(|| { + GetFunctionOutput::builder() + .configuration( + FunctionConfiguration::builder() + .state(State::Active) + .last_update_status(LastUpdateStatus::Successful) + .build(), + ) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + tokio::time::pause(); + wait_for_function_ready(&client, "my-fn").await.unwrap(); + } + + #[tokio::test] + async fn test_wait_for_function_ready_after_update_in_progress() { + let rule = mock!(aws_sdk_lambda::Client::get_function) + .sequence() + .output(|| { + GetFunctionOutput::builder() + .configuration( + FunctionConfiguration::builder() + .state(State::Active) + .last_update_status(LastUpdateStatus::InProgress) + .build(), + ) + .build() + }) + .output(|| { + GetFunctionOutput::builder() + .configuration( + FunctionConfiguration::builder() + .state(State::Active) + .last_update_status(LastUpdateStatus::Successful) + .build(), + ) + .build() + }) + .build(); + let client = mock_client!(aws_sdk_lambda, RuleMode::Sequential, [&rule]); + + tokio::time::pause(); + wait_for_function_ready(&client, "my-fn").await.unwrap(); + assert_eq!(rule.num_calls(), 2); + } + + #[tokio::test] + async fn test_wait_for_function_ready_fails_on_failed_state() { + let rule = mock!(aws_sdk_lambda::Client::get_function).then_output(|| { + GetFunctionOutput::builder() + .configuration( + FunctionConfiguration::builder() + .state(State::Failed) + .state_reason("Something broke") + .build(), + ) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + tokio::time::pause(); + let err = wait_for_function_ready(&client, "my-fn").await.unwrap_err(); + assert!( + err.to_string().contains("Failed state"), + "unexpected error: {}", + err + ); + } + + #[tokio::test] + async fn test_wait_for_function_ready_fails_on_last_update_failed() { + let rule = mock!(aws_sdk_lambda::Client::get_function).then_output(|| { + GetFunctionOutput::builder() + .configuration( + 
FunctionConfiguration::builder() + .state(State::Active) + .last_update_status(LastUpdateStatus::Failed) + .last_update_status_reason("Update broke") + .build(), + ) + .build() + }); + let client = mock_client!(aws_sdk_lambda, [&rule]); + + tokio::time::pause(); + let err = wait_for_function_ready(&client, "my-fn").await.unwrap_err(); + assert!( + err.to_string().contains("last update failed"), + "unexpected error: {}", + err + ); + } + + // --- garbage_collect_old_versions tests --- + + #[tokio::test] + async fn test_gc_deletes_old_versions_keeps_recent() { + // 8 quickwit versions (1..=8) + $LATEST + one non-quickwit version + let list_rule = + mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + let mut builder = ListVersionsByFunctionOutput::builder() + .versions(make_version("$LATEST", "")) + .versions(make_version("99", "not-quickwit")); + for i in 1..=8 { + builder = builder + .versions(make_version(&i.to_string(), &format!("quickwit:ver_{}", i))); + } + builder.build() + }); + + let delete_rule = mock!(aws_sdk_lambda::Client::delete_function) + .then_output(|| DeleteFunctionOutput::builder().build()); + + let client = mock_client!( + aws_sdk_lambda, + RuleMode::MatchAny, + [&list_rule, &delete_rule] + ); + + // Current version is "7", so keep 7 + the 5 most recent (4,5,6,7,8). + // Should delete versions 1, 2, 3. + garbage_collect_old_versions(&client, "my-fn", "7") + .await + .unwrap(); + + assert_eq!(delete_rule.num_calls(), 3); + } + + #[tokio::test] + async fn test_gc_nothing_to_delete() { + // Only 3 quickwit versions — below the GC_KEEP_RECENT_VERSIONS threshold. 
+ let list_rule = + mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + ListVersionsByFunctionOutput::builder() + .versions(make_version("$LATEST", "")) + .versions(make_version("1", "quickwit:v1")) + .versions(make_version("2", "quickwit:v2")) + .versions(make_version("3", "quickwit:v3")) + .build() + }); + + let delete_rule = mock!(aws_sdk_lambda::Client::delete_function) + .then_output(|| DeleteFunctionOutput::builder().build()); + + let client = mock_client!( + aws_sdk_lambda, + RuleMode::MatchAny, + [&list_rule, &delete_rule] + ); + + garbage_collect_old_versions(&client, "my-fn", "3") + .await + .unwrap(); + + assert_eq!(delete_rule.num_calls(), 0); + } + + #[tokio::test] + async fn test_gc_does_not_delete_current_version() { + // 7 quickwit versions, current is "1" (the oldest). + // Without the current-version guard, version 1 would be deleted. + let list_rule = + mock!(aws_sdk_lambda::Client::list_versions_by_function).then_output(|| { + let mut builder = + ListVersionsByFunctionOutput::builder().versions(make_version("$LATEST", "")); + for i in 1..=7 { + builder = builder + .versions(make_version(&i.to_string(), &format!("quickwit:ver_{}", i))); + } + builder.build() + }); + + let delete_rule = mock!(aws_sdk_lambda::Client::delete_function) + .then_output(|| DeleteFunctionOutput::builder().build()); + + let client = mock_client!( + aws_sdk_lambda, + RuleMode::MatchAny, + [&list_rule, &delete_rule] + ); + + // Current version is "1". Without guard: would delete 1,2. With guard: only deletes 2. + garbage_collect_old_versions(&client, "my-fn", "1") + .await + .unwrap(); + + assert_eq!(delete_rule.num_calls(), 1); + } +} diff --git a/quickwit/quickwit-lambda-client/src/error.rs b/quickwit/quickwit-lambda-client/src/error.rs new file mode 100644 index 00000000000..6b45ce969fe --- /dev/null +++ b/quickwit/quickwit-lambda-client/src/error.rs @@ -0,0 +1,72 @@ +// Copyright 2021-Present Datadog, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_search::SearchError; +use thiserror::Error; + +/// Result type for Lambda deployment operations. +pub type LambdaDeployResult = Result; + +/// Errors that can occur during Lambda function deployment. +#[derive(Debug, Error)] +pub enum LambdaDeployError { + /// Resource conflict (e.g., function already exists during concurrent create). + #[error("resource conflict: Lambda function already exists")] + ResourceConflict, + + /// General deployment error. + #[error("failed to deploy Lambda function: {0}")] + Other(String), +} + +/// Result type for Lambda invoker operations. +pub type InvokerResult = Result; + +/// Errors that can occur during Lambda invoker setup or invocation. +#[derive(Debug, Error)] +pub enum InvokerError { + /// Configuration or validation error. + #[error("Lambda configuration error: {0}")] + Configuration(String), + + /// Error during Lambda invocation. + #[error("Lambda invocation failed: {0}")] + Invocation(String), + + /// Error serializing/deserializing data. + #[error("serialization error: {0}")] + Serialization(#[from] SerializationError), +} + +/// Errors that can occur during serialization/deserialization. 
+#[derive(Debug, Error)] +pub enum SerializationError { + #[error("protobuf decode error: {0}")] + ProtobufDecode(#[from] prost::DecodeError), + + #[error("protobuf encode error: {0}")] + ProtobufEncode(#[from] prost::EncodeError), + + #[error("base64 decode error: {0}")] + Base64Decode(#[from] base64::DecodeError), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), +} + +impl From for SearchError { + fn from(err: InvokerError) -> Self { + SearchError::Internal(err.to_string()) + } +} diff --git a/quickwit/quickwit-lambda-client/src/invoker.rs b/quickwit/quickwit-lambda-client/src/invoker.rs new file mode 100644 index 00000000000..fd3c35b8fe0 --- /dev/null +++ b/quickwit/quickwit-lambda-client/src/invoker.rs @@ -0,0 +1,191 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use aws_sdk_lambda::Client as LambdaClient; +use aws_sdk_lambda::primitives::Blob; +use aws_sdk_lambda::types::InvocationType; +use base64::prelude::*; +use prost::Message; +use quickwit_lambda_server::{LeafSearchPayload, LeafSearchResponsePayload}; +use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse, LeafSearchResponses}; +use quickwit_search::{LambdaLeafSearchInvoker, SearchError}; +use tracing::{debug, info, instrument}; + +use crate::error::{InvokerError, InvokerResult}; +use crate::metrics::LAMBDA_METRICS; + +/// Create a Lambda invoker for a specific version. +/// +/// The version number is used as the qualifier when invoking, ensuring we call +/// the exact published version (not $LATEST). +pub(crate) async fn create_lambda_invoker_for_version( + function_name: &str, + version: &str, +) -> InvokerResult> { + let invoker = AwsLambdaInvoker::new(function_name, version).await?; + invoker.validate().await?; + Ok(Arc::new(invoker)) +} + +/// AWS Lambda implementation of RemoteFunctionInvoker. +struct AwsLambdaInvoker { + client: LambdaClient, + function_name: String, + /// The version number to invoke (e.g., "7", "12"). + version: String, +} + +impl AwsLambdaInvoker { + /// Create a new AWS Lambda invoker for a specific version. + async fn new(function_name: &str, version: &str) -> InvokerResult { + let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let client = LambdaClient::new(&aws_config); + + Ok(Self { + client, + function_name: function_name.to_string(), + version: version.to_string(), + }) + } + + /// Validate that the Lambda function version exists and is invocable. + /// Uses DryRun invocation type - validates without executing. 
+ async fn validate(&self) -> InvokerResult<()> { + info!("lambda invoker dry run"); + let request = self + .client + .invoke() + .function_name(&self.function_name) + .qualifier(&self.version) + .invocation_type(InvocationType::DryRun); + + request.send().await.map_err(|e| { + InvokerError::Configuration(format!( + "Failed to validate Lambda function '{}:{}': {}", + self.function_name, self.version, e + )) + })?; + + info!("the lambda invoker dry run was successful"); + Ok(()) + } +} + +#[async_trait] +impl LambdaLeafSearchInvoker for AwsLambdaInvoker { + #[instrument(skip(self, request), fields(function_name = %self.function_name, version = %self.version))] + async fn invoke_leaf_search( + &self, + request: LeafSearchRequest, + ) -> Result, SearchError> { + let start = std::time::Instant::now(); + + let result = self.invoke_leaf_search_inner(request).await; + + let elapsed = start.elapsed().as_secs_f64(); + let status = if result.is_ok() { "success" } else { "error" }; + LAMBDA_METRICS + .leaf_search_requests_total + .with_label_values([status]) + .inc(); + LAMBDA_METRICS + .leaf_search_duration_seconds + .with_label_values([status]) + .observe(elapsed); + + result + } +} + +impl AwsLambdaInvoker { + async fn invoke_leaf_search_inner( + &self, + request: LeafSearchRequest, + ) -> Result, SearchError> { + // Serialize request to protobuf bytes, then base64 encode + let request_bytes = request.encode_to_vec(); + let payload = LeafSearchPayload { + payload: BASE64_STANDARD.encode(&request_bytes), + }; + + let payload_json = serde_json::to_vec(&payload) + .map_err(|e| SearchError::Internal(format!("JSON serialization error: {}", e)))?; + + LAMBDA_METRICS + .leaf_search_request_payload_size_bytes + .observe(payload_json.len() as f64); + + debug!( + payload_size = payload_json.len(), + version = %self.version, + "Invoking Lambda function" + ); + + // Invoke the specific version + let invoke_builder = self + .client + .invoke() + .function_name(&self.function_name) + 
.qualifier(&self.version) + .invocation_type(InvocationType::RequestResponse) + .payload(Blob::new(payload_json)); + + let response = invoke_builder + .send() + .await + .map_err(|e| SearchError::Internal(format!("Lambda invocation error: {}", e)))?; + + // Check for function error + if let Some(error) = response.function_error() { + let error_payload = response + .payload() + .map(|b| String::from_utf8_lossy(b.as_ref()).to_string()) + .unwrap_or_default(); + return Err(SearchError::Internal(format!( + "Lambda function error: {}: {}", + error, error_payload + ))); + } + + // Deserialize response + let response_payload = response + .payload() + .ok_or_else(|| SearchError::Internal("No response payload from Lambda".into()))?; + + LAMBDA_METRICS + .leaf_search_response_payload_size_bytes + .observe(response_payload.as_ref().len() as f64); + + let lambda_response: LeafSearchResponsePayload = + serde_json::from_slice(response_payload.as_ref()) + .map_err(|e| SearchError::Internal(format!("JSON deserialization error: {}", e)))?; + + let response_bytes = BASE64_STANDARD + .decode(&lambda_response.payload) + .map_err(|e| SearchError::Internal(format!("Base64 decode error: {}", e)))?; + + let leaf_responses = LeafSearchResponses::decode(&response_bytes[..]) + .map_err(|e| SearchError::Internal(format!("Protobuf decode error: {}", e)))?; + + debug!( + num_responses = leaf_responses.responses.len(), + "Lambda invocation completed" + ); + + Ok(leaf_responses.responses) + } +} diff --git a/quickwit/quickwit-lambda-client/src/lib.rs b/quickwit/quickwit-lambda-client/src/lib.rs new file mode 100644 index 00000000000..ead3aef464b --- /dev/null +++ b/quickwit/quickwit-lambda-client/src/lib.rs @@ -0,0 +1,41 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! AWS Lambda client for Quickwit leaf search operations. +//! +//! This crate provides: +//! - An AWS Lambda implementation of the `LambdaLeafSearchInvoker` trait +//! - Auto-deployment functionality for Lambda functions +//! +//! # Usage +//! +//! Use `try_get_or_deploy_invoker` to get an invoker that will automatically deploy +//! the Lambda function if needed: +//! +//! ```ignore +//! let invoker = try_get_or_deploy_invoker(&function_name, &deploy_config).await?; +//! ``` + +// mod deploy; +mod error; +mod invoker; +mod metrics; + +pub use error::{InvokerError, InvokerResult, LambdaDeployError, LambdaDeployResult}; +pub use metrics::LAMBDA_METRICS; +// Re-export payload types from server crate for convenience +pub use quickwit_lambda_server::{LeafSearchPayload, LeafSearchResponsePayload}; +mod deploy; + +pub use deploy::try_get_or_deploy_invoker; diff --git a/quickwit/quickwit-lambda-client/src/metrics.rs b/quickwit/quickwit-lambda-client/src/metrics.rs new file mode 100644 index 00000000000..89eb92f5a07 --- /dev/null +++ b/quickwit/quickwit-lambda-client/src/metrics.rs @@ -0,0 +1,82 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// See https://prometheus.io/docs/practices/naming/ + +use once_cell::sync::Lazy; +use quickwit_common::metrics::{ + Histogram, HistogramVec, IntCounterVec, exponential_buckets, new_counter_vec, new_histogram, + new_histogram_vec, +}; + +/// From 0.008s to 131s +fn duration_buckets() -> Vec { + exponential_buckets(0.008, 2.0, 15).unwrap() +} + +/// From 1KiB to 16MiB +fn payload_size_buckets() -> Vec { + exponential_buckets(1024.0, 4.0, 8).unwrap() +} + +pub struct LambdaMetrics { + pub leaf_search_requests_total: IntCounterVec<1>, + pub leaf_search_duration_seconds: HistogramVec<1>, + pub leaf_search_request_payload_size_bytes: Histogram, + pub leaf_search_response_payload_size_bytes: Histogram, + pub deploy_total: IntCounterVec<1>, +} + +impl Default for LambdaMetrics { + fn default() -> Self { + LambdaMetrics { + leaf_search_requests_total: new_counter_vec( + "leaf_search_requests_total", + "Total number of Lambda leaf search invocations.", + "lambda", + &[], + ["status"], + ), + leaf_search_duration_seconds: new_histogram_vec( + "leaf_search_duration_seconds", + "Duration of Lambda leaf search invocations in seconds.", + "lambda", + &[], + ["status"], + duration_buckets(), + ), + leaf_search_request_payload_size_bytes: new_histogram( + "leaf_search_request_payload_size_bytes", + "Size of the request payload sent to Lambda in bytes.", + "lambda", + payload_size_buckets(), + ), + leaf_search_response_payload_size_bytes: new_histogram( + "leaf_search_response_payload_size_bytes", + "Size of the response payload received from Lambda in bytes.", +
"lambda", + payload_size_buckets(), + ), + deploy_total: new_counter_vec( + "deploy_total", + "Total number of Lambda deployment attempts.", + "lambda", + &[], + ["status"], + ), + } + } +} + +pub static LAMBDA_METRICS: Lazy = Lazy::new(LambdaMetrics::default); diff --git a/quickwit/quickwit-lambda-server/Cargo.toml b/quickwit/quickwit-lambda-server/Cargo.toml new file mode 100644 index 00000000000..53e3725131e --- /dev/null +++ b/quickwit/quickwit-lambda-server/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "quickwit-lambda-server" +description = "AWS Lambda handler for Quickwit leaf search" + +version.workspace = true +edition.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true +authors.workspace = true +license.workspace = true + +[package.metadata.cargo-machete] +# Its here even though it is not useful, in order to enable its "vendor" feature, +# allowing the cross-build. +ignored = ["openssl"] + +[dependencies] +anyhow = { workspace = true } +base64 = { workspace = true } +bytesize = { workspace = true } +lambda_runtime = { workspace = true } +prost = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } + +openssl = { workspace = true, optional = true } + +quickwit-common = { workspace = true } +quickwit-config = { workspace = true } +quickwit-proto = { workspace = true } +quickwit-search = { workspace = true } +quickwit-storage = { workspace = true } + +[[bin]] +name = "quickwit-aws-lambda-leaf-search" +path = "src/bin/leaf_search.rs" + +[features] +default = [] +testsuite = [] + +# Keep this in sync with quickwit-cli! +lambda-release = [ + # The vendored OpenSSL will be compiled from source during the + # build, avoiding the pkg-config dependency issue during + # cross-compilation. 
+ "openssl/vendored", + # "quickwit-doc-mapper/multilang", +] diff --git a/quickwit/quickwit-lambda-server/README.md b/quickwit/quickwit-lambda-server/README.md new file mode 100644 index 00000000000..07c8190be41 --- /dev/null +++ b/quickwit/quickwit-lambda-server/README.md @@ -0,0 +1,13 @@ +# Lambda search + +Quickwit makes it possible to run leaf search on lambdas. + +In order to make it possible to update Quickwit AND the function, you +can have Quickwit itself deploy/create the function. + +For that purpose, the Quickwit client's deployer embeds the lambda function zip file +within Quickwit's binary. + +That binary needs to be built and released by manually triggering the `publish_lambda.yaml` GitHub action. +By default this action only creates a draft release. You will need to manually publish the release for the URL to be publicly downloadable. +The URL in quickwit-lambda-client/build.rs then needs to be manually updated. diff --git a/quickwit/quickwit-lambda-server/src/bin/leaf_search.rs b/quickwit/quickwit-lambda-server/src/bin/leaf_search.rs new file mode 100644 index 00000000000..2cf4419259e --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/bin/leaf_search.rs @@ -0,0 +1,48 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! AWS Lambda binary entry point for Quickwit leaf search.
+ +use std::sync::Arc; + +use lambda_runtime::{Error, LambdaEvent, service_fn}; +use quickwit_lambda_server::{LambdaSearcherContext, LeafSearchPayload, handle_leaf_search}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<(), Error> { + // Initialize tracing with JSON output for CloudWatch + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .json() + .init(); + + // Initialize context on cold start (wrapped in Arc for sharing across invocations) + let context = Arc::new(LambdaSearcherContext::try_from_env()?); + + info!("lambda context initialized, starting handler loop"); + + // Run the Lambda handler + lambda_runtime::run(service_fn(|event: LambdaEvent| { + let ctx = Arc::clone(&context); + async move { + let (payload, _event_ctx) = event.into_parts(); + handle_leaf_search(payload, &ctx) + .await + .map_err(|e| lambda_runtime::Error::from(e.to_string())) + } + })) + .await +} diff --git a/quickwit/quickwit-lambda-server/src/config.rs b/quickwit/quickwit-lambda-server/src/config.rs new file mode 100644 index 00000000000..72a0f26c493 --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/config.rs @@ -0,0 +1,50 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context as _; +use bytesize::ByteSize; + +/// Configuration for the Lambda handler's SearcherContext. 
+/// These settings are optimized for Lambda's memory constraints. +#[derive(Clone, Debug)] +pub struct LambdaSearcherConfig { + /// Maximum concurrent split searches within a single Lambda invocation. + pub max_concurrent_split_searches: usize, + /// Warmup memory budget: the allocated Lambda memory minus a fixed 500 MiB reserve. + pub warmup_memory_budget: ByteSize, +} + +impl LambdaSearcherConfig { + pub fn try_from_env() -> anyhow::Result<Self> { + let lambda_memory_mib: u64 = + quickwit_common::get_from_env_opt("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", false) + .context("could not get aws lambda function memory size from ENV")?; + let lambda_memory = ByteSize::mib(lambda_memory_mib); + Self::for_memory(lambda_memory) + } + /// Create a Lambda-optimized searcher config based on the allocated memory. + pub fn for_memory(lambda_memory: ByteSize) -> anyhow::Result<Self> { + // At least 1 GiB is required; the warmup budget is memory minus 500 MiB. + anyhow::ensure!( + lambda_memory >= ByteSize::gib(1u64), + "lambda memory must be at least 1GiB" + ); + let warmup_memory_budget = + ByteSize::b(lambda_memory.as_u64() - ByteSize::mib(500).as_u64()); + Ok(Self { + max_concurrent_split_searches: 20, + warmup_memory_budget, + }) + } +} diff --git a/quickwit/quickwit-lambda-server/src/context.rs b/quickwit/quickwit-lambda-server/src/context.rs new file mode 100644 index 00000000000..eed01175a9f --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/context.rs @@ -0,0 +1,51 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::sync::Arc; + +use quickwit_config::SearcherConfig; +use quickwit_search::SearcherContext; +use quickwit_storage::StorageResolver; +use tracing::info; + +use crate::config::LambdaSearcherConfig; + +/// Lambda-specific searcher context that holds resources for search execution. +pub struct LambdaSearcherContext { + pub searcher_context: Arc, + pub storage_resolver: StorageResolver, +} + +impl LambdaSearcherContext { + /// Create a new Lambda searcher context from environment variables. + pub fn try_from_env() -> anyhow::Result { + info!("initializing Lambda searcher context"); + + let config = LambdaSearcherConfig::try_from_env()?; + let searcher_config = create_searcher_config(&config); + let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None)); + let storage_resolver = StorageResolver::configured(&Default::default()); + + Ok(Self { + searcher_context, + storage_resolver, + }) + } +} + +fn create_searcher_config(config: &LambdaSearcherConfig) -> SearcherConfig { + let mut searcher_config = SearcherConfig::default(); + searcher_config.max_num_concurrent_split_searches = config.max_concurrent_split_searches; + searcher_config +} diff --git a/quickwit/quickwit-lambda-server/src/error.rs b/quickwit/quickwit-lambda-server/src/error.rs new file mode 100644 index 00000000000..bcc5d78a1de --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/error.rs @@ -0,0 +1,89 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +use quickwit_search::SearchError; + +/// Result type for Lambda operations. +pub type LambdaResult = Result; + +/// Errors that can occur during Lambda handler operations. +#[derive(Debug)] +pub enum LambdaError { + /// Error serializing/deserializing protobuf. + Serialization(String), + /// Error from the search operation. + Search(SearchError), + /// Internal error. + Internal(String), +} + +impl fmt::Display for LambdaError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LambdaError::Serialization(msg) => write!(f, "Serialization error: {}", msg), + LambdaError::Search(err) => write!(f, "Search error: {}", err), + LambdaError::Internal(msg) => write!(f, "Internal error: {}", msg), + } + } +} + +impl std::error::Error for LambdaError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + LambdaError::Search(err) => Some(err), + _ => None, + } + } +} + +impl From for LambdaError { + fn from(err: SearchError) -> Self { + LambdaError::Search(err) + } +} + +impl From for LambdaError { + fn from(err: prost::DecodeError) -> Self { + LambdaError::Serialization(format!("Protobuf decode error: {}", err)) + } +} + +impl From for LambdaError { + fn from(err: prost::EncodeError) -> Self { + LambdaError::Serialization(format!("Protobuf encode error: {}", err)) + } +} + +impl From for LambdaError { + fn from(err: base64::DecodeError) -> Self { + LambdaError::Serialization(format!("Base64 decode error: {}", err)) + } +} + +impl From for LambdaError { + fn from(err: serde_json::Error) -> Self { + LambdaError::Serialization(format!("JSON error: {}", err)) + } +} + +impl From for SearchError { + fn from(err: LambdaError) -> Self { + match err { + LambdaError::Search(search_err) => search_err, + other => SearchError::Internal(other.to_string()), + } + } +} diff --git 
a/quickwit/quickwit-lambda-server/src/handler.rs b/quickwit/quickwit-lambda-server/src/handler.rs new file mode 100644 index 00000000000..bc346f89cce --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/handler.rs @@ -0,0 +1,144 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use base64::prelude::*; +use prost::Message; +use quickwit_proto::search::{ + LeafSearchRequest, LeafSearchResponse, LeafSearchResponses, SplitIdAndFooterOffsets, +}; +use quickwit_search::leaf::multi_index_leaf_search; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinSet; +use tracing::{info, instrument}; + +use crate::context::LambdaSearcherContext; +use crate::error::{LambdaError, LambdaResult}; + +/// Payload for leaf search Lambda invocation. +#[derive(Debug, Serialize, Deserialize)] +pub struct LeafSearchPayload { + /// Base64-encoded serialized LeafSearchRequest protobuf. + pub payload: String, +} + +/// Response from leaf search Lambda invocation. +#[derive(Debug, Serialize, Deserialize)] +pub struct LeafSearchResponsePayload { + /// Base64-encoded serialized `LeafSearchResponses` protobuf (one per split). + pub payload: String, +} + +/// Handle a leaf search request in Lambda. +/// +/// Returns one `LeafSearchResponse` per split. Each split is processed +/// independently so that the caller can cache and merge results individually. 
+#[instrument(skip(ctx), fields(request_id))] +pub async fn handle_leaf_search( + event: LeafSearchPayload, + ctx: &LambdaSearcherContext, +) -> LambdaResult { + // Decode base64 payload + let request_bytes = BASE64_STANDARD + .decode(&event.payload) + .map_err(|e| LambdaError::Serialization(format!("Base64 decode error: {}", e)))?; + + // Deserialize LeafSearchRequest + let leaf_search_request = LeafSearchRequest::decode(&request_bytes[..])?; + + let all_splits: Vec<(usize, SplitIdAndFooterOffsets)> = leaf_search_request + .leaf_requests + .iter() + .enumerate() + .flat_map(|(leaf_req_idx, leaf_request_ref)| { + leaf_request_ref + .split_offsets + .iter() + .cloned() + .map(move |split_id_and_footer_offsets| (leaf_req_idx, split_id_and_footer_offsets)) + }) + .collect(); + + let num_splits = all_splits.len(); + info!(num_splits, "processing leaf search request (per-split)"); + + // Process each split in parallel. The SearchPermitProvider inside + // SearcherContext gates concurrency based on memory budget. + let mut join_set = JoinSet::new(); + for (split_idx, (leaf_req_idx, split)) in all_splits.into_iter().enumerate() { + let leaf_request_ref = &leaf_search_request.leaf_requests[leaf_req_idx]; + let single_split_request = LeafSearchRequest { + search_request: leaf_search_request.search_request.clone(), + doc_mappers: leaf_search_request.doc_mappers.clone(), + index_uris: leaf_search_request.index_uris.clone(), + leaf_requests: vec![quickwit_proto::search::LeafRequestRef { + index_uri_ord: leaf_request_ref.index_uri_ord, + doc_mapper_ord: leaf_request_ref.doc_mapper_ord, + split_offsets: vec![split], + }], + }; + + let searcher_context = ctx.searcher_context.clone(); + let storage_resolver = ctx.storage_resolver.clone(); + join_set.spawn(async move { + let response = multi_index_leaf_search( + searcher_context, + single_split_request, + &storage_resolver, + ) + .await; + (split_idx, response) + }); + } + + // Collect results, preserving split order. 
+ let mut indexed_responses: Vec<(usize, LeafSearchResponse)> = + Vec::with_capacity(num_splits); + while let Some(join_result) = join_set.join_next().await { + let (split_idx, search_result) = join_result + .map_err(|e| LambdaError::Internal(format!("split search task failed: {e}")))?; + let response = search_result + .map_err(|e| LambdaError::Internal(format!("leaf search failed: {e}")))?; + indexed_responses.push((split_idx, response)); + } + indexed_responses.sort_by_key(|(idx, _)| *idx); + let responses: Vec = + indexed_responses.into_iter().map(|(_, r)| r).collect(); + + info!( + num_responses = responses.len(), + "leaf search completed (per-split)" + ); + + // Serialize as LeafSearchResponses wrapper + let wrapper = LeafSearchResponses { responses }; + let response_bytes = wrapper.encode_to_vec(); + let payload = BASE64_STANDARD.encode(&response_bytes); + + Ok(LeafSearchResponsePayload { payload }) +} + +/// Decode a LeafSearchRequest from base64-encoded protobuf bytes. +#[allow(dead_code)] +pub fn decode_leaf_search_request(payload: &str) -> LambdaResult { + let bytes = BASE64_STANDARD + .decode(payload) + .map_err(|e| LambdaError::Serialization(format!("Base64 decode error: {}", e)))?; + LeafSearchRequest::decode(&bytes[..]).map_err(LambdaError::from) +} + +/// Encode a LeafSearchResponse to base64-encoded protobuf bytes. +#[allow(dead_code)] +pub fn encode_leaf_search_response(response: &LeafSearchResponse) -> String { + BASE64_STANDARD.encode(response.encode_to_vec()) +} diff --git a/quickwit/quickwit-lambda-server/src/lib.rs b/quickwit/quickwit-lambda-server/src/lib.rs new file mode 100644 index 00000000000..d67bc6085d4 --- /dev/null +++ b/quickwit/quickwit-lambda-server/src/lib.rs @@ -0,0 +1,28 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! AWS Lambda handler for Quickwit leaf search operations. +//! +//! This crate provides the Lambda handler that executes leaf search requests. +//! It is designed to be deployed as an AWS Lambda function. + +mod config; +mod context; +mod error; +mod handler; + +pub use config::LambdaSearcherConfig; +pub use context::LambdaSearcherContext; +pub use error::{LambdaError, LambdaResult}; +pub use handler::{LeafSearchPayload, LeafSearchResponsePayload, handle_leaf_search}; diff --git a/quickwit/quickwit-lambda/README.md b/quickwit/quickwit-lambda/README.md deleted file mode 100644 index 88fa9c8748a..00000000000 --- a/quickwit/quickwit-lambda/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Deprecation - -This package was removed in Q3 2025. The maintenance burden was high and the -feature was unused. \ No newline at end of file diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 04fa0cedf2e..33eb2fdd01c 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -493,6 +493,12 @@ message LeafSearchResponse { ResourceStats resource_stats = 8; } +// Wrapper for multiple LeafSearchResponse messages, used by Lambda to return +// per-split results. 
+message LeafSearchResponses { + repeated LeafSearchResponse responses = 1; +} + message SnippetRequest { repeated string snippet_fields = 1; string query_ast_resolved = 2; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index e1201ce7a0e..f0d9ad3d434 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -456,6 +456,14 @@ pub struct LeafSearchResponse { #[prost(message, optional, tag = "8")] pub resource_stats: ::core::option::Option, } +/// Wrapper for multiple LeafSearchResponse messages, used by Lambda to return +/// per-split results. +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LeafSearchResponses { + #[prost(message, repeated, tag = "1")] + pub responses: ::prost::alloc::vec::Vec, +} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct SnippetRequest { diff --git a/quickwit/quickwit-proto/src/error.rs b/quickwit/quickwit-proto/src/error.rs index b03536d0283..aa3905135a1 100644 --- a/quickwit/quickwit-proto/src/error.rs +++ b/quickwit/quickwit-proto/src/error.rs @@ -97,7 +97,7 @@ where E: ServiceError } /// A trait for encoding/decoding service errors to/from gRPC statuses. Errors are stored in JSON -/// in the gRPC header [`QW_ERROR_HEADER_NAME`]. This allows for propagating them transparently +/// in the gRPC header `qw-error-bin`. This allows for propagating them transparently /// between clients and servers over the network without being semantically limited to a status code /// and a message. However, it also means that modifying the serialization format of existing errors /// or introducing new ones is not backward compatible. 
diff --git a/quickwit/quickwit-search/src/invoker.rs b/quickwit/quickwit-search/src/invoker.rs new file mode 100644 index 00000000000..f160d88019f --- /dev/null +++ b/quickwit/quickwit-search/src/invoker.rs @@ -0,0 +1,35 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Trait for invoking remote serverless functions for leaf search. + +use async_trait::async_trait; +use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; + +use crate::SearchError; + +/// Trait for invoking remote serverless functions (e.g., AWS Lambda) for leaf search. +/// +/// This abstraction allows different cloud providers to be supported. +/// Implementations are provided by the `quickwit-lambda-client` crate. +#[async_trait] +pub trait LambdaLeafSearchInvoker: Send + Sync + 'static { + /// Invoke the remote function with a LeafSearchRequest. + /// + /// Returns one `LeafSearchResponse` per split in the request. + async fn invoke_leaf_search( + &self, + request: LeafSearchRequest, + ) -> Result<Vec<LeafSearchResponse>, SearchError>; +} diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 4caf2587909..f6b93a82d8c 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -685,7 +685,8 @@ fn visit_aggregation_mut( modified_something } -// equivalent to Bound::map, which is unstable +/// Maps a `Bound<T>` to a `Bound<U>` by applying a function to the contained value.
+/// Equivalent to `Bound::map`, which is currently unstable. pub fn map_bound(bound: Bound, f: impl FnOnce(T) -> U) -> Bound { use Bound::*; match bound { @@ -1354,7 +1355,7 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) { /// Searches multiple splits for a specific index and a single doc mapping /// /// The leaf search collects all kind of information, and returns a set of -/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in +/// [PartialHit] candidates. The root will be in /// charge to consolidate, identify the actual final top hits to display, and /// fetch the actual documents to convert the partial hits into actual Hits. pub async fn single_doc_mapping_leaf_search( @@ -1378,20 +1379,93 @@ pub async fn single_doc_mapping_leaf_search( let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); - // We acquire all of the leaf search permits to make sure our single split search tasks - // do no interleave with other leaf search requests. - let permit_sizes = split_with_req.iter().map(|(split, _)| { - compute_initial_memory_allocation( - split, - searcher_context - .searcher_config - .warmup_single_split_initial_allocation, - ) - }); - let permit_futures = searcher_context - .search_permit_provider - .get_permits(permit_sizes) - .await; + // Step 1: Check cache for each split before acquiring permits. 
+ let mut uncached_splits: Vec<(SplitIdAndFooterOffsets, SearchRequest)> = + Vec::with_capacity(split_with_req.len()); + for (split, search_request) in split_with_req { + let mut rewritten_request = search_request.clone(); + rewrite_request( + &mut rewritten_request, + &split, + doc_mapper.timestamp_field_name(), + ); + if let Some(cached_response) = searcher_context + .leaf_search_cache + .get(split.clone(), rewritten_request) + { + incremental_merge_collector + .lock() + .unwrap() + .add_result(cached_response) + .ok(); + } else { + uncached_splits.push((split, search_request)); + } + } + + if uncached_splits.is_empty() { + let incremental_merge_collector = match Arc::try_unwrap(incremental_merge_collector) { + Ok(filter_merger) => filter_merger.into_inner().unwrap(), + Err(filter_merger) => filter_merger.lock().unwrap().clone(), + }; + return crate::search_thread_pool() + .run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) + .instrument(info_span!("incremental_merge_intermediate")) + .await + .context("failed to merge split search responses")?; + } + + // Step 2: Determine which uncached splits to process locally vs offload. 
+ let lambda_available = searcher_context.lambda_invoker.is_some() + && searcher_context.searcher_config.lambda.is_some(); + + let offload_threshold = if lambda_available { + searcher_context + .searcher_config + .lambda + .as_ref() + .unwrap() + .offload_threshold + } else { + 0 + }; + + let permit_sizes: Vec = uncached_splits + .iter() + .map(|(split, _)| { + compute_initial_memory_allocation( + split, + searcher_context + .searcher_config + .warmup_single_split_initial_allocation, + ) + }) + .collect(); + + let (local_splits_with_permits, offloaded_splits) = if lambda_available + && offload_threshold > 0 + { + let partition = searcher_context + .search_permit_provider + .get_permits_with_offload(permit_sizes, offload_threshold) + .await; + (partition.local, partition.offloaded) + } else { + let permit_futures = searcher_context + .search_permit_provider + .get_permits(permit_sizes) + .await; + let local: Vec<(usize, _)> = permit_futures.into_iter().enumerate().collect(); + (local, Vec::new()) + }; + + if !offloaded_splits.is_empty() { + info!( + num_local = local_splits_with_permits.len(), + num_offloaded = offloaded_splits.len(), + "partitioned splits between local and Lambda" + ); + } let leaf_search_context = Arc::new(LeafSearchContext { searcher_context: searcher_context.clone(), @@ -1401,17 +1475,17 @@ pub async fn single_doc_mapping_leaf_search( split_filter: split_filter.clone(), }); + // Step 3: Spawn local split search tasks. 
let mut split_search_futures = JoinSet::new(); - let mut task_id_to_split_id_map = HashMap::with_capacity(split_with_req.len()); - for ((split, search_request), permit_fut) in - split_with_req.into_iter().zip(permit_futures.into_iter()) - { + let mut task_id_to_split_id_map = HashMap::with_capacity(local_splits_with_permits.len()); + for (idx, permit_fut) in local_splits_with_permits { + let (split, search_request) = &uncached_splits[idx]; let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) .await; let Some(simplified_search_request) = - simplify_search_request(search_request, &split, &split_filter) + simplify_search_request(search_request.clone(), split, &split_filter) else { let mut leaf_search_state_guard = SplitSearchStateGuard::new(leaf_search_context.split_outcome_counters.clone()); @@ -1424,7 +1498,7 @@ pub async fn single_doc_mapping_leaf_search( simplified_search_request, leaf_search_context.clone(), index_storage.clone(), - split, + split.clone(), leaf_split_search_permit, aggregations_limits.clone(), ) @@ -1433,15 +1507,75 @@ pub async fn single_doc_mapping_leaf_search( task_id_to_split_id_map.insert(handle.id(), split_id); } - // TODO we could cancel running splits when !run_all_splits and the running split can no - // longer give better results after some other split answered. + // Step 4: Offload splits to Lambda. + if !offloaded_splits.is_empty() { + let lambda_invoker = searcher_context.lambda_invoker.as_ref().unwrap(); + let lambda_config = searcher_context.searcher_config.lambda.as_ref().unwrap(); + let batch_size = lambda_config.max_splits_per_invocation; + + let doc_mapper_str = serde_json::to_string(doc_mapper.as_ref()) + .map_err(|e| SearchError::Internal(format!("failed to serialize doc mapper: {e}")))?; + + // Build LeafSearchRequest with offloaded splits in batches. 
+ let offloaded_split_offsets: Vec = offloaded_splits + .iter() + .map(|&idx| uncached_splits[idx].0.clone()) + .collect(); + + let index_uri = index_storage.uri().to_string(); + + let mut search_request_for_leaf = (*request).clone(); + search_request_for_leaf.start_offset = 0; + search_request_for_leaf.max_hits += request.start_offset; + + let mut lambda_tasks = Vec::new(); + for chunk in offloaded_split_offsets.chunks(batch_size) { + let leaf_request = LeafSearchRequest { + search_request: Some(search_request_for_leaf.clone()), + doc_mappers: vec![doc_mapper_str.clone()], + index_uris: vec![index_uri.clone()], + leaf_requests: vec![quickwit_proto::search::LeafRequestRef { + index_uri_ord: 0, + doc_mapper_ord: 0, + split_offsets: chunk.to_vec(), + }], + }; + let invoker = lambda_invoker.clone(); + lambda_tasks.push(async move { invoker.invoke_leaf_search(leaf_request).await }); + } + + let lambda_results = try_join_all(lambda_tasks).await; + match lambda_results { + Ok(batch_results) => { + let mut locked = incremental_merge_collector.lock().unwrap(); + for per_split_results in batch_results { + for response in per_split_results { + if let Err(err) = locked.add_result(response) { + error!(error = %err, "failed to add Lambda result to collector"); + } + } + } + } + Err(err) => { + error!(error = %err, "Lambda invocation failed for offloaded splits"); + let mut locked = incremental_merge_collector.lock().unwrap(); + for &idx in &offloaded_splits { + locked.add_failed_split(SplitSearchError { + split_id: uncached_splits[idx].0.split_id.clone(), + error: format!("Lambda invocation error: {err}"), + retryable_error: true, + }); + } + } + } + } + + // Step 5: Await all local tasks. 
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new(); while let Some(leaf_search_join_result) = split_search_futures.join_next().await { - // splits that did not panic were already added to the collector if let Err(join_error) = leaf_search_join_result { if join_error.is_cancelled() { - // An explicit task cancellation is not an error. continue; } let split_id = task_id_to_split_id_map.get(&join_error.id()).unwrap(); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 008556d595f..89b45a69014 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -23,7 +23,10 @@ mod collector; mod error; mod fetch_docs; mod find_trace_ids_collector; -mod leaf; + +mod invoker; +/// Leaf search operations. +pub mod leaf; mod leaf_cache; mod list_fields; mod list_fields_cache; @@ -80,6 +83,7 @@ pub use crate::client::{ pub use crate::cluster_client::ClusterClient; pub use crate::error::{SearchError, parse_grpc_error}; use crate::fetch_docs::fetch_docs; +pub use crate::invoker::LambdaLeafSearchInvoker; pub use crate::root::{ IndexMetasForLeafSearch, SearchJob, check_all_index_metadata_found, jobs_to_leaf_request, root_search, search_plan, @@ -283,7 +287,7 @@ pub async fn single_node_search( let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); let cluster_client = ClusterClient::new(search_job_placer); let searcher_config = SearcherConfig::default(); - let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); + let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None)); let search_service = Arc::new(SearchServiceImpl::new( metastore.clone(), storage_resolver, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 581665fa14f..ed292523ea0 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -13,8 +13,8 @@ // limitations under the License. 
use std::collections::{HashMap, HashSet}; -use std::sync::OnceLock; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::OnceLock; use std::time::{Duration, Instant}; use anyhow::Context; @@ -45,7 +45,7 @@ use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; -use tracing::{debug, info, info_span, instrument}; +use tracing::{debug, error, info, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{QuickwitAggregations, make_merge_collector}; @@ -1233,7 +1233,7 @@ pub async fn root_search( if let Some(max_total_split_searches) = searcher_context.searcher_config.max_splits_per_search && max_total_split_searches < num_splits { - tracing::error!( + error!( num_splits, max_total_split_searches, index=?search_request.index_id_patterns, diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 2ceb8ec5d59..13b18fa0e63 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -43,6 +43,12 @@ pub enum SearchPermitMessage { permit_sender: oneshot::Sender>, permit_sizes: Vec, }, + RequestWithOffload { + permit_sender: oneshot::Sender, + permit_sizes: Vec, + /// Maximum number of additional pending requests before offloading. + offload_threshold: usize, + }, UpdateMemory { memory_delta: i64, }, @@ -53,6 +59,16 @@ pub enum SearchPermitMessage { }, } +/// Result of a permit request that supports offloading some splits to Lambda. +pub struct PermitPartition { + /// Permits for splits to process locally. + /// Each entry is (split_index, permit_future) where split_index + /// refers to the position in the original request. 
+ pub local: Vec<(usize, SearchPermitFuture)>, + /// Indices of splits that should be offloaded to Lambda. + pub offloaded: Vec, +} + /// Makes very pessimistic estimate of the memory allocation required for a split search /// /// This is refined later on when more data is available about the split. @@ -123,6 +139,32 @@ impl SearchPermitProvider { .await .expect("Receiver lives longer than sender") } + + /// Returns permits for local splits and a list of split indices to offload. + /// + /// The actor checks the current pending queue depth. If adding all splits + /// would exceed `offload_threshold` pending requests, only enough splits + /// to fill up to the threshold are processed locally; the rest are offloaded. + /// + /// With `offload_threshold == 0` every split is offloaded; use `get_permits` for local-only. + pub async fn get_permits_with_offload( + &self, + splits: impl IntoIterator, + offload_threshold: usize, + ) -> PermitPartition { + let (permit_sender, permit_receiver) = oneshot::channel(); + let permit_sizes = splits.into_iter().map(|size| size.as_u64()).collect(); + self.message_sender + .send(SearchPermitMessage::RequestWithOffload { + permit_sender, + permit_sizes, + offload_threshold, + }) + .expect("Receiver lives longer than sender"); + permit_receiver + .await + .expect("Receiver lives longer than sender") + } } struct SearchPermitActor { @@ -162,11 +204,33 @@ impl SearchPermitActor { permits.push(SearchPermitFuture(rx)); } self.assign_available_permits(); - // The receiver could be dropped in the (unlikely) situation - // where the future requesting these permits is cancelled before - // this message is processed. let _ = permit_sender.send(permits); } + SearchPermitMessage::RequestWithOffload { + permit_sizes, + permit_sender, + offload_threshold, + } => { + let current_pending = self.permits_requests.len(); + // How many new splits can we accept locally before hitting the threshold. 
+ let local_capacity = offload_threshold.saturating_sub(current_pending); + let num_local = permit_sizes.len().min(local_capacity); + + let mut local = Vec::with_capacity(num_local); + let mut offloaded = Vec::with_capacity(permit_sizes.len() - num_local); + + for (idx, permit_size) in permit_sizes.into_iter().enumerate() { + if idx < num_local { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back((tx, permit_size)); + local.push((idx, SearchPermitFuture(rx))); + } else { + offloaded.push(idx); + } + } + self.assign_available_permits(); + let _ = permit_sender.send(PermitPartition { local, offloaded }); + } SearchPermitMessage::UpdateMemory { memory_delta } => { if self.total_memory_allocated as i64 + memory_delta < 0 { panic!("More memory released than allocated, should never happen.") diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index cf2c0699b81..ec75a212f14 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -33,6 +33,7 @@ use quickwit_storage::{ }; use tantivy::aggregation::AggregationLimitsGuard; +use crate::invoker::LambdaLeafSearchInvoker; use crate::leaf::multi_index_leaf_search; use crate::leaf_cache::{LeafSearchCache, PredicateCacheImpl}; use crate::list_fields::{leaf_list_fields, root_list_fields}; @@ -416,6 +417,8 @@ pub struct SearcherContext { pub list_fields_cache: ListFieldsCache, /// The aggregation limits are passed to limit the memory usage. pub aggregation_limit: AggregationLimitsGuard, + /// Optional Lambda invoker for offloading leaf search to serverless functions. 
+ pub lambda_invoker: Option>, } impl std::fmt::Debug for SearcherContext { @@ -431,11 +434,15 @@ impl SearcherContext { #[cfg(test)] pub fn for_test() -> SearcherContext { let searcher_config = SearcherConfig::default(); - SearcherContext::new(searcher_config, None) + SearcherContext::new(searcher_config, None, None) } /// Creates a new searcher context, given a searcher config, and an optional `SplitCache`. - pub fn new(searcher_config: SearcherConfig, split_cache_opt: Option>) -> Self { + pub fn new( + searcher_config: SearcherConfig, + split_cache_opt: Option>, + lambda_invoker: Option>, + ) -> Self { let global_split_footer_cache = MemorySizedCache::from_config( &searcher_config.split_footer_cache, &quickwit_storage::STORAGE_METRICS.split_footer_cache, @@ -464,6 +471,7 @@ impl SearcherContext { list_fields_cache, split_cache_opt, aggregation_limit, + lambda_invoker, } } diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 37acf663a1e..e93ae0d398f 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1029,7 +1029,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { ..Default::default() }); let searcher_context: Arc = - Arc::new(SearcherContext::new(SearcherConfig::default(), None)); + Arc::new(SearcherContext::new(SearcherConfig::default(), None, None)); let agg_limits = searcher_context.get_aggregation_limits(); @@ -1669,7 +1669,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); + let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None, None)); { let request = ListTermsRequest { diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 363065a3403..16f8a36002d 100644 --- 
a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -72,6 +72,7 @@ quickwit-opentelemetry = { workspace = true } quickwit-proto = { workspace = true } quickwit-query = { workspace = true } quickwit-search = { workspace = true } +quickwit-lambda-client = { workspace = true } quickwit-storage = { workspace = true } quickwit-telemetry = { workspace = true } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index ca4520ff0ce..400e1e4261a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -630,9 +630,19 @@ pub async fn serve_quickwit( None }; + // Initialize Lambda invoker if enabled + let lambda_invoker_opt = if let Some(lambda_config) = &node_config.searcher_config.lambda { + info!("initializing AWS Lambda invoker for leaf search"); + let invoker = quickwit_lambda_client::try_get_or_deploy_invoker(lambda_config).await?; + Some(invoker) + } else { + None + }; + let searcher_context = Arc::new(SearcherContext::new( node_config.searcher_config.clone(), split_cache_opt, + lambda_invoker_opt, )); let (search_job_placer, search_service) = setup_searcher( @@ -1017,6 +1027,7 @@ async fn setup_searcher( ) -> anyhow::Result<(SearchJobPlacer, Arc)> { let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); + let search_service = start_searcher_service( metastore, storage_resolver, @@ -1553,7 +1564,8 @@ mod tests { #[tokio::test] async fn test_setup_searcher() { let node_config = NodeConfig::for_test(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); + let searcher_context = + Arc::new(SearcherContext::new(SearcherConfig::default(), None, None)); let metastore = metastore_for_test(); let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded(); let storage_resolver = StorageResolver::unconfigured(); diff --git a/quickwit/rest-api-tests/config/quickwit.yaml 
b/quickwit/rest-api-tests/config/quickwit.yaml new file mode 100644 index 00000000000..8845b4cbcf9 --- /dev/null +++ b/quickwit/rest-api-tests/config/quickwit.yaml @@ -0,0 +1,157 @@ +# ============================ Node Configuration ============================== +# +# Website: https://quickwit.io +# Docs: https://quickwit.io/docs/configuration/node-config +# +# Configure AWS credentials: https://quickwit.io/docs/guides/aws-setup#aws-credentials +# +# -------------------------------- General settings -------------------------------- +# +# Config file format version. +# +version: 0.8 +# +# Node ID. Must be unique within a cluster. If not set, a random node ID is generated on each startup. +# +# node_id: node-1 +# +# Quickwit opens three sockets. +# - for its HTTP server, hosting the UI and the REST API (TCP) +# - for its gRPC service (TCP) +# - for its Gossip cluster membership service (UDP) +# +# All three services are bound to the same host and a different port. The host can be an IP address or a hostname. +# +# Default HTTP server host is `127.0.0.1` and default HTTP port is 7280. +# The default host value was chosen to avoid exposing the node to the open-world without users' explicit consent. +# This allows for testing Quickwit in single-node mode or with multiple nodes running on the same host and listening +# on different ports. However, in cluster mode, using this value is never appropriate because it causes the node to +# ignore incoming traffic. +# There are two options to set up a node in cluster mode: +# 1. specify the node's hostname or IP +# 2. 
pass `0.0.0.0` and let Quickwit do its best to discover the node's IP (see `advertise_address`) +# +# listen_address: 127.0.0.1 +# + + #dd_application_key: some_app_key + + +# rest: +# listen_port: 7280 +# cors_allow_origins: +# - "http://localhost:3000" +# extra_headers: +# x-header-1: header-value-1 +# x-header-2: header-value-2 +# +# grpc: +# max_message_size: 10 MiB +# +# IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. +# The environment variable `QW_ADVERTISE_ADDRESS` can also be used to override this value. +# The default advertise address is `listen_address`. If `listen_address` is unspecified (`0.0.0.0`), +# Quickwit attempts to sniff the node's IP by scanning the available network interfaces. +# advertise_address: 192.168.0.42 +# +# In order to join a cluster, one needs to specify a list of +# seeds to connect to. If no port is specified, Quickwit will assume +# the seeds are using the same port as the current node gossip port. +# By default, the peer seed list is empty. +# +# peer_seeds: +# - quickwit-searcher-0.local +# - quickwit-searcher-1.local:10000 +# +# Path to directory where temporary data (caches, intermediate indexing data structures) +# is stored. Defaults to `./qwdata`. +# +# data_dir: /path/to/data/dir +# +# Metastore URI. Defaults to `data_dir/indexes#polling_interval=30s`, +# which is a file-backed metastore and mostly convenient for testing. A cluster would +# require a metastore backed by Amazon S3 or PostgreSQL. +# +# metastore_uri: s3://your-bucket/indexes +# metastore_uri: postgres://username:password@host:port/db +# +# When using a file-backed metastore, the state of the metastore will be cached forever. +# If you are indexing and searching from different processes, it is possible to periodically +# refresh the state of the metastore on the searcher using the `polling_interval` hashtag. 
+# +# metastore_uri: s3://your-bucket/indexes#polling_interval=30s +# +# Default index root URI, which defines where index data (splits) is stored, +# following the scheme `{default_index_root_uri}/{index-id}`. Defaults to `{data_dir}/indexes`. +# +# default_index_root_uri: s3://your-bucket/indexes +# +# -------------------------------- Storage settings -------------------------------- +# https://quickwit.io/docs/configuration/node-config#storage-configuration +# +# Hardcoding credentials into configuration files is not secure and strongly +# discouraged. Prefer the alternative authentication methods that your storage +# backend may provide. +# +# storage: +# azure: +# account: ${QW_AZURE_STORAGE_ACCOUNT} +# access_key: ${QW_AZURE_STORAGE_ACCESS_KEY} +# +# s3: +# access_key_id: ${AWS_ACCESS_KEY_ID} +# secret_access_key: ${AWS_SECRET_ACCESS_KEY} +# region: ${AWS_REGION} +# endpoint: ${QW_S3_ENDPOINT} +# force_path_style_access: ${QW_S3_FORCE_PATH_STYLE_ACCESS:-false} +# disable_multi_object_delete: false +# disable_multipart_upload: false +# +# -------------------------------- Metastore settings -------------------------------- +# https://quickwit.io/docs/configuration/node-config#metastore-configuration +# +# metastore: +# postgres: +# min_connections: 0 +# max_connections: 10 +# acquire_connection_timeout: 10s +# idle_connection_timeout: 10min +# max_connection_lifetime: 30min +# +# -------------------------------- Indexer settings -------------------------------- +# https://quickwit.io/docs/configuration/node-config#indexer-configuration + +indexer: + enable_otlp_endpoint: ${QW_ENABLE_OTLP_ENDPOINT:-true} +# split_store_max_num_bytes: 100G +# split_store_max_num_splits: 1000 +# max_concurrent_split_uploads: 12 +# +# +# -------------------------------- Ingest API settings ------------------------------ +# https://quickwit.io/docs/configuration/node-config#ingest-api-configuration +# +# ingest_api: +# max_queue_memory_usage: 2GiB +# max_queue_disk_usage: 4GiB +# 
content_length_limit: 10MiB +# +# -------------------------------- Searcher settings -------------------------------- +# https://quickwit.io/docs/configuration/node-config#searcher-configuration +# +searcher: +# fast_field_cache_capacity: 1G +# split_footer_cache_capacity: 500M + partial_request_cache_capacity: 0 +# max_num_concurrent_split_streams: 100 +# max_num_concurrent_split_searches: 100 +# aggregation_memory_limit: 500M +# aggregation_bucket_limit: 65000 +# split_cache: +# max_num_bytes: 1G +# max_num_splits: 10000 +# num_concurrent_downloads: 1 +# -------------------------------- Jaeger settings -------------------------------- + +jaeger: + enable_endpoint: ${QW_ENABLE_JAEGER_ENDPOINT:-true} diff --git a/quickwit/rest-api-tests/quickwit b/quickwit/rest-api-tests/quickwit new file mode 120000 index 00000000000..d48e118f0f9 --- /dev/null +++ b/quickwit/rest-api-tests/quickwit @@ -0,0 +1 @@ +/Users/paul.masurel/git/quickwit/quickwit/target/debug/quickwit \ No newline at end of file