diff --git a/.github/workflows/_build_rust_artifacts.yml b/.github/workflows/_build_rust_artifacts.yml index 5c1cd5c945..9d07b5902b 100644 --- a/.github/workflows/_build_rust_artifacts.yml +++ b/.github/workflows/_build_rust_artifacts.yml @@ -46,7 +46,7 @@ on: connector_plugins: type: string required: false - default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink" + default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_s3_sink,iggy_connector_stdout_sink" description: "Comma-separated list of connector plugin crates to build as shared libraries" outputs: artifact_name: diff --git a/.github/workflows/edge-release.yml b/.github/workflows/edge-release.yml index 0cbc6e1516..93bca1b0e9 100644 --- a/.github/workflows/edge-release.yml +++ b/.github/workflows/edge-release.yml @@ -102,6 +102,7 @@ jobs: - `iggy_connector_postgres_source` - `iggy_connector_quickwit_sink` - `iggy_connector_random_source` + - `iggy_connector_s3_sink` - `iggy_connector_stdout_sink` ## Downloads diff --git a/Cargo.lock b/Cargo.lock index c213ee6513..52ba04ae3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,7 +383,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -394,7 +394,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1160,6 +1160,22 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" +dependencies = [ + "base64 0.22.1", + "http 1.4.0", + "log", + "rustls", + "serde", + "serde_json", + "url", + "webpki-roots 1.0.7", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -1220,9 +1236,9 @@ dependencies = [ [[package]] name = "aws-config" -version = "1.8.15" +version = "1.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +checksum = "50f156acdd2cf55f5aa53ee416c4ac851cf1222694506c0b1f78c85695e9ca9d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1260,6 +1276,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "aws-creds" +version = "0.39.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3b85155d265df828f84e53886ed9e427aed979dd8a39f5b8b2162c77e142d7" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml 0.38.4", + "rust-ini", + "serde", + "thiserror 2.0.18", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.16.3" @@ -1282,6 +1315,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "838b36c8dc927b6db1b6c6b8f5d05865f2213550b9e83bf92fa99ed6525472c0" +dependencies = [ + "thiserror 2.0.18", +] + [[package]] name = "aws-runtime" version = "1.7.3" @@ -1333,9 +1375,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.96.0" +version = "1.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f64a6eded248c6b453966e915d32aeddb48ea63ad17932682774eb026fbef5b1" +checksum = "d69c77aafa20460c68b6b3213c84f6423b6e76dbf89accd3e1789a686ffd9489" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1357,9 +1399,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.98.0" +version = "1.100.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db96d720d3c622fcbe08bae1c4b04a72ce6257d8b0584cb5418da00ae20a344f" +checksum = "1c7e7b09346d5ca22a2a08267555843a6a0127fb20d8964cb6ecfb8fdb190225" dependencies = [ "aws-credential-types", "aws-runtime", @@ -2404,6 +2446,15 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.60" @@ -2630,7 +2681,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -2653,6 +2704,19 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "compio" version = "0.18.0" @@ -3644,9 +3708,9 @@ dependencies = [ [[package]] name = "deltalake" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fa1982d9e81550676c64169e8e9feaaf8aafb218c88418a1e50642eecd2508e" +checksum = "8268de472e1692b47d4a3016d3443fa7aa1f24d5a92337ee867204951959c293" dependencies = [ "buoyant_kernel", "ctor", @@ -3699,9 +3763,9 @@ dependencies = [ [[package]] name = "deltalake-core" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fe744c5c69db390db74a114f7d5e412f1b5d8cc3dcef50bf19599035ad8fff6" +checksum = "4b6f1d41164959efaaae6d77fc00f9609ec59159ad4d0278924e79a0738f61b0" dependencies = [ "arrow", "arrow-arith 58.1.0", @@ -4069,7 +4133,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4393,7 +4457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4984,7 +5048,7 @@ dependencies = [ "libc", "log", "rustversion", - "windows-link 0.1.3", + "windows-link 0.2.1", "windows-result 0.4.1", ] @@ -6791,6 +6855,27 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_s3_sink" +version = "0.4.0" +dependencies = [ + "async-trait", + "base64 0.22.1", + "byte-unit", + "chrono", + "dashmap", + "humantime", + "iggy_common", + "iggy_connector_sdk", + "once_cell", + "rust-s3", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", +] + [[package]] name = "iggy_connector_sdk" version = "0.3.0" @@ -7190,7 +7275,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7847,6 +7932,17 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -7867,6 +7963,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.8.0" @@ -7967,6 +8069,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -8274,7 +8385,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -8795,7 +8906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.45.0", + "windows-sys 0.61.2", ] [[package]] @@ -9638,9 +9749,9 @@ dependencies = [ [[package]] name = "psm" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3852766467df634d74f0b2d7819bf8dc483a0eb2e3b0f50f756f9cfe8b0d18d8" +checksum = "645dbe486e346d9b5de3ef16ede18c26e6c70ad97418f4874b8b1889d6e761ea" dependencies = [ "ar_archive_writer", "cc", @@ -9751,7 +9862,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -9791,7 +9902,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] @@ -10542,6 +10653,41 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-s3" +version = "0.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4af74047374528b627109d579ce86b23ccf6ffba7ff363c807126c1aff69e1bb" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.22.1", + "bytes", + "cfg-if", + "futures-util", + "hex", + "hmac 0.12.1", + "http 1.4.0", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "quick-xml 0.38.4", + "reqwest 0.12.28", + "serde", + "serde_derive", + "serde_json", + "sha2 0.10.9", + "sysinfo 0.37.2", + "thiserror 2.0.18", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.41.0" @@ -10616,7 +10762,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -10684,7 +10830,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -10729,6 +10875,25 @@ dependencies = [ "unicode-script", ] +[[package]] +name = "rxml" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee" +dependencies = [ + "bytes", + "rxml_validation", +] + +[[package]] +name = "rxml_validation" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4" +dependencies = [ + "compact_str", +] + [[package]] name = "ryu" version = "1.0.23" @@ -10889,7 +11054,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -11560,7 +11725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -11838,15 +12003,15 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "stacker" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d74a23609d509411d10e2176dc2a4346e3b4aea2e7b1869f19fdedbc71c013" +checksum = "640c8cdd92b6b12f5bcb1803ca3bbf5ab96e5e6b6b96b9ab77dabe9e880b3190" dependencies = [ "cc", "cfg-if", "libc", "psm", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -12181,7 +12346,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -12191,7 +12356,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -13726,7 +13891,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6589c4b2ea..ab0333d8c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ members = [ "core/connectors/sinks/mongodb_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", + "core/connectors/sinks/s3_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 7c6415ac55..615c514453 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -90,19 +90,22 @@ atoi: 2.0.0, "MIT", atomic: 0.6.1, "Apache-2.0 OR MIT", atomic-polyfill: 1.0.3, "Apache-2.0 OR MIT", atomic-waker: 1.1.2, "Apache-2.0 OR MIT", +attohttpc: 0.30.1, "MPL-2.0", autocfg: 1.5.0, "Apache-2.0 OR MIT", autotools: 0.2.7, "MIT", av-scenechange: 0.14.1, "MIT", av1-grain: 0.2.5, "BSD-2-Clause", avif-serialize: 0.8.8, "BSD-3-Clause", -aws-config: 1.8.15, "Apache-2.0", +aws-config: 1.8.16, "Apache-2.0", aws-credential-types: 1.2.14, "Apache-2.0", +aws-creds: 0.39.1, "MIT", aws-lc-rs: 1.16.3, "(Apache-2.0 OR ISC) AND ISC", aws-lc-sys: 0.40.0, "(Apache-2.0 OR ISC OR MIT) AND (Apache-2.0 OR ISC OR MIT-0) AND (Apache-2.0 OR ISC) AND Apache-2.0 AND BSD-3-Clause AND ISC AND MIT", +aws-region: 0.28.1, "MIT", aws-runtime: 1.7.3, "Apache-2.0", aws-sdk-dynamodb: 1.111.0, "Apache-2.0", -aws-sdk-sso: 1.96.0, "Apache-2.0", -aws-sdk-ssooidc: 1.98.0, "Apache-2.0", +aws-sdk-sso: 1.98.0, "Apache-2.0", +aws-sdk-ssooidc: 1.100.0, "Apache-2.0", aws-sdk-sts: 1.103.0, "Apache-2.0", aws-sigv4: 1.4.3, "Apache-2.0", aws-smithy-async: 1.2.14, "Apache-2.0", @@ -185,6 +188,7 @@ capacity_builder: 0.5.0, "MIT", capacity_builder_macros: 0.3.0, "MIT", cargo-platform: 0.3.2, "Apache-2.0 OR MIT", cargo_metadata: 0.23.1, "MIT", +castaway: 0.2.4, "MIT", cc: 1.2.60, "Apache-2.0 OR MIT", cesu8: 1.1.0, "Apache-2.0 OR MIT", cexpr: 0.6.0, "Apache-2.0 OR MIT", @@ -211,6 +215,7 @@ colorchoice: 1.0.5, "Apache-2.0 OR MIT", colored: 3.1.1, "MPL-2.0", combine: 4.6.7, "MIT", comfy-table: 7.2.2, "MIT", +compact_str: 0.7.1, "MIT", compio: 0.18.0, "MIT", compio-buf: 0.8.1, "MIT", compio-driver: 0.11.4, "MIT", @@ -297,10 +302,10 @@ dbus-secret-service: 4.1.0, "Apache-2.0 OR MIT", deadpool: 0.12.3, "Apache-2.0 OR MIT", deadpool-runtime: 0.1.4, "Apache-2.0 OR MIT", debugid: 0.8.0, "Apache-2.0", -deltalake: 0.32.0, "Apache-2.0", +deltalake: 0.32.1, "Apache-2.0", deltalake-aws: 0.15.0, "Apache-2.0", deltalake-azure: 0.15.0, "Apache-2.0", -deltalake-core: 0.32.0, "Apache-2.0", +deltalake-core: 0.32.1, "Apache-2.0", deltalake-derive: 1.0.0, "Apache-2.0", deltalake-gcp: 0.16.0, "Apache-2.0", deno_core: 0.351.0, "MIT", @@ -543,6 +548,7 @@ iggy_connector_postgres_sink: 0.4.0, "Apache-2.0", iggy_connector_postgres_source: 0.4.0, "Apache-2.0", iggy_connector_quickwit_sink: 0.4.0, "Apache-2.0", iggy_connector_random_source: 0.4.0, "Apache-2.0", +iggy_connector_s3_sink: 0.4.0, "Apache-2.0", iggy_connector_sdk: 0.3.0, "Apache-2.0", iggy_connector_stdout_sink: 0.4.0, "Apache-2.0", iggy_examples: 0.0.6, "Apache-2.0", @@ -643,8 +649,10 @@ macro_rules_attribute: 0.1.3, "MIT", macro_rules_attribute-proc_macro: 0.1.3, "MIT", matchers: 0.2.0, "MIT", matchit: 0.8.4, "BSD-3-Clause AND MIT", +maybe-async: 0.2.10, "MIT", maybe-rayon: 0.1.1, "MIT", md-5: 0.10.6, "Apache-2.0 OR MIT", +md5: 0.8.0, "Apache-2.0 OR MIT", memchr: 2.8.0, "MIT OR Unlicense", memmap2: 0.9.10, "Apache-2.0 OR MIT", message_bus: 0.1.0, "Apache-2.0", @@ -654,6 +662,7 @@ miette-derive: 7.6.0, "Apache-2.0", mimalloc: 0.1.48, "MIT", mime: 0.3.17, "Apache-2.0 OR MIT", mime_guess: 2.0.5, "MIT", +minidom: 0.16.0, "MPL-2.0", minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib", mio: 1.2.0, "MIT", @@ -804,7 +813,7 @@ prost-reflect: 0.16.3, "Apache-2.0 OR MIT", prost-types: 0.14.3, "Apache-2.0", protox: 0.9.1, "Apache-2.0 OR MIT", protox-parse: 0.9.0, "Apache-2.0 OR MIT", -psm: 0.1.30, "Apache-2.0 OR MIT", +psm: 0.1.31, "Apache-2.0 OR MIT", ptr_meta: 0.1.4, "MIT", ptr_meta_derive: 0.1.4, "MIT", pxfm: 0.1.28, "Apache-2.0 OR BSD-3-Clause", @@ -877,6 +886,7 @@ rust-embed: 8.11.0, "MIT", rust-embed-impl: 8.11.0, "MIT", rust-embed-utils: 8.11.0, "MIT", rust-ini: 0.21.3, "MIT", +rust-s3: 0.37.1, "MIT", rust_decimal: 1.41.0, "MIT", rustc-hash: 2.1.2, "Apache-2.0 OR MIT", rustc_version: 0.4.1, "Apache-2.0 OR MIT", @@ -893,6 +903,8 @@ rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT", rustls-webpki: 0.103.13, "ISC", rustversion: 1.0.22, "Apache-2.0 OR MIT", rustybuzz: 0.20.1, "MIT", +rxml: 0.11.1, "MIT", +rxml_validation: 0.11.0, "MIT", ryu: 1.0.23, "Apache-2.0 OR BSL-1.0", same-file: 1.0.6, "MIT OR Unlicense", scc: 2.4.0, "Apache-2.0", @@ -976,7 +988,7 @@ sqlx-postgres: 0.8.6, "Apache-2.0 OR MIT", sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT", sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT", -stacker: 0.1.23, "Apache-2.0 OR MIT", +stacker: 0.1.24, "Apache-2.0 OR MIT", static-toml: 1.3.0, "MIT", static_assertions: 1.1.0, "Apache-2.0 OR MIT", strict-num: 0.1.1, "MIT", diff --git a/core/connectors/README.md b/core/connectors/README.md index cdef67789a..f9c724349c 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -82,6 +82,7 @@ Each sink should have its own, custom configuration, which is passed along with - **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog - **PostgreSQL Sink** - stores messages in PostgreSQL database tables - **Quickwit Sink** - indexes messages in Quickwit search engine +- **S3 Sink** - writes messages to Amazon S3 and S3-compatible stores (MinIO, R2, B2, DO Spaces) - **Stdout Sink** - prints messages to standard output (useful for debugging/development) ## Source diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 55c639c336..571dcb2068 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -12,6 +12,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage | | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | | **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics | +| **s3_sink** | Writes messages to Amazon S3 and S3-compatible stores (MinIO, R2, B2, DO Spaces) | | **stdout_sink** | Prints messages to standard output (useful for debugging and development) | The sink is represented by the single `Sink` trait, which defines the basic interface for all sink connectors. It provides methods for initializing the sink, writing data to external destination, and closing the sink. diff --git a/core/connectors/sinks/s3_sink/Cargo.toml b/core/connectors/sinks/s3_sink/Cargo.toml new file mode 100644 index 0000000000..327bd5a549 --- /dev/null +++ b/core/connectors/sinks/s3_sink/Cargo.toml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_s3_sink" +version = "0.4.0" +description = "Iggy S3 sink connector for writing stream messages to Amazon S3 and S3-compatible stores" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "s3", "sink"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "simd-json"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +byte-unit = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +humantime = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rust-s3 = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/s3_sink/README.md b/core/connectors/sinks/s3_sink/README.md new file mode 100644 index 0000000000..f58d2f80ec --- /dev/null +++ b/core/connectors/sinks/s3_sink/README.md @@ -0,0 +1,147 @@ +# Apache Iggy S3 Sink Connector + +Writes messages from Iggy streams to Amazon S3 and S3-compatible object stores (MinIO, Cloudflare R2, DigitalOcean Spaces, Backblaze B2). + +## Features + +- Buffered uploads with configurable file rotation (by size or message count) +- Multiple output formats: JSON Lines, JSON Array, Raw +- Configurable path templates with variables for stream, topic, date, hour, partition +- Deterministic S3 keys based on offset ranges for idempotent crash recovery +- Optional metadata and header inclusion in output +- Support for custom endpoints (MinIO, R2) and path-style addressing +- Retry with exponential backoff on upload failures + +## Configuration + +### Connector Runtime Config + +```toml +type = "sink" +key = "s3" +enabled = true +version = 0 +name = "S3 sink" +path = "../../target/release/libiggy_connector_s3_sink" +verbose = false + +[[streams]] +stream = "application_logs" +topics = ["api_requests", "errors"] +schema = "json" +batch_length = 1000 +poll_interval = "100ms" +consumer_group = "s3_sink" +``` + +### Plugin Configuration + +```toml +[plugin_config] +bucket = "my-data-lake" +prefix = "iggy/raw" +region = "us-east-1" +# endpoint = "http://localhost:9000" # for MinIO / S3-compatible stores +# access_key_id = "AKIA..." # omit to use env vars / instance profile +# secret_access_key = "..." # omit to use env vars / instance profile +path_template = "{stream}/{topic}/{date}/{hour}" +file_rotation = "size" +max_file_size = "8MiB" +output_format = "json_lines" +include_metadata = true +include_headers = true +max_retries = 3 +retry_delay = "1s" +``` + +### Options Reference + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `bucket` | String | **required** | S3 bucket name | +| `region` | String | **required** | AWS region (e.g. `us-east-1`) | +| `prefix` | String | `None` | Key prefix prepended to all objects | +| `endpoint` | String | `None` | Custom S3 endpoint for MinIO, R2, etc. | +| `access_key_id` | String | `None` | AWS access key; omit for env/instance profile | +| `secret_access_key` | String | `None` | AWS secret key; omit for env/instance profile | +| `path_template` | String | `{stream}/{topic}/{date}/{hour}` | Template for S3 key directory structure | +| `file_rotation` | String | `size` | Rotation strategy: `size` or `messages` | +| `max_file_size` | String | `8MiB` | Max file size before rotation (when `file_rotation = "size"`) | +| `max_messages_per_file` | Integer | `None` | Max messages per file (when `file_rotation = "messages"`) | +| `output_format` | String | `json_lines` | Output format: `json_lines`, `json_array`, or `raw` | +| `include_metadata` | Boolean | `true` | Include stream/topic/partition/offset in output | +| `include_headers` | Boolean | `false` | Include message headers in output | +| `max_retries` | Integer | `3` | Max upload retry attempts | +| `retry_delay` | String | `1s` | Base delay between retries (humantime format) | +| `path_style` | Boolean | auto | Force path-style S3 addressing; auto-enabled when `endpoint` is set | + +### Path Template Variables + +| Variable | Description | Example | +| -------- | ----------- | ------- | +| `{stream}` | Iggy stream name | `application_logs` | +| `{topic}` | Iggy topic name | `api_requests` | +| `{partition}` | Partition ID | `1` | +| `{date}` | UTC date from first message | `2026-03-16` | +| `{hour}` | UTC hour from first message | `14` | +| `{timestamp}` | Current epoch milliseconds | `1710597600000` | + +### Credentials + +Credentials can be provided in three ways (in order of precedence): + +1. **Explicit config**: Set both `access_key_id` and `secret_access_key` +2. **Environment variables**: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN` +3. **Instance profile / IAM role**: Automatic when running on EC2/ECS/EKS + +Both `access_key_id` and `secret_access_key` must be provided together or both omitted. + +## Output Example + +With `output_format = "json_lines"` and `include_metadata = true`, writing `api_requests` messages produces: + +```text +s3://my-data-lake/iggy/raw/application_logs/api_requests/2026-03-16/14/000000-000999.jsonl +``` + +Each line: + +```json +{"offset":42,"timestamp":"2026-03-16T14:02:31Z","stream":"application_logs","topic":"api_requests","partition_id":1,"payload":{"method":"GET","path":"/api/users","status":200}} +``` + +## S3-Compatible Stores + +### MinIO + +```toml +[plugin_config] +bucket = "my-bucket" +region = "us-east-1" +endpoint = "http://localhost:9000" +access_key_id = "minioadmin" +secret_access_key = "minioadmin" +``` + +### Cloudflare R2 + +```toml +[plugin_config] +bucket = "my-bucket" +region = "auto" +endpoint = "https://.r2.cloudflarestorage.com" +access_key_id = "..." +secret_access_key = "..." +``` + +## Data Delivery Guarantees + +This connector provides **at-least-once** delivery under normal operation. However, **data loss can occur** if all upload retries are exhausted (controlled by `max_retries`). When an upload fails after all retry attempts, the affected messages are dropped and an error is logged. Monitor your connector logs for `failed to upload` errors in production. Increase `max_retries` and `retry_delay` if transient S3 failures are common in your environment. + +## Building + +```bash +cargo build --release -p iggy_connector_s3_sink +``` + +The compiled plugin will be at `target/release/libiggy_connector_s3_sink.{so,dylib,dll}`. diff --git a/core/connectors/sinks/s3_sink/config.toml b/core/connectors/sinks/s3_sink/config.toml new file mode 100644 index 0000000000..bba894c96a --- /dev/null +++ b/core/connectors/sinks/s3_sink/config.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "s3" +enabled = true +version = 0 +name = "S3 sink" +path = "../../target/release/libiggy_connector_s3_sink" +verbose = false + +[[streams]] +stream = "application_logs" +topics = ["api_requests", "errors"] +schema = "json" +batch_length = 1000 +poll_interval = "100ms" +consumer_group = "s3_sink" + +[plugin_config] +bucket = "my-data-lake" +prefix = "iggy/raw" +region = "us-east-1" +# endpoint = "http://localhost:9000" # uncomment for MinIO / S3-compatible stores +# access_key_id = "minioadmin" # omit to use env vars / instance profile +# secret_access_key = "minioadmin" # omit to use env vars / instance profile +path_template = "{stream}/{topic}/{date}/{hour}" +file_rotation = "size" +max_file_size = "8MiB" +# max_messages_per_file = 10000 # used when file_rotation = "messages" +output_format = "json_lines" +include_metadata = true +include_headers = true +max_retries = 3 +retry_delay = "1s" +# path_style = true # auto-enabled when endpoint is set (for MinIO) diff --git a/core/connectors/sinks/s3_sink/src/buffer.rs b/core/connectors/sinks/s3_sink/src/buffer.rs new file mode 100644 index 0000000000..ef2b10a07a --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/buffer.rs @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::FileRotation; + +#[derive(Debug)] +pub struct FileBuffer { + entries: Vec>, + total_size: u64, + message_count: u64, + first_offset: Option, + last_offset: Option, + first_timestamp_micros: u64, +} + +impl FileBuffer { + pub fn new() -> Self { + FileBuffer { + entries: Vec::new(), + total_size: 0, + message_count: 0, + first_offset: None, + last_offset: None, + first_timestamp_micros: 0, + } + } + + pub fn append(&mut self, data: Vec, offset: u64, timestamp_micros: u64) { + self.total_size += data.len() as u64; + self.entries.push(data); + self.message_count += 1; + + if self.first_offset.is_none() { + self.first_offset = Some(offset); + self.first_timestamp_micros = timestamp_micros; + } + self.last_offset = Some(offset); + } + + pub fn should_rotate(&self, rotation: FileRotation, max_size: u64, max_messages: u64) -> bool { + match rotation { + FileRotation::Size => self.total_size >= max_size, + FileRotation::Messages => self.message_count >= max_messages, + } + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + pub fn entries(&self) -> &[Vec] { + &self.entries + } + + pub fn first_offset(&self) -> u64 { + self.first_offset.unwrap_or(0) + } + + pub fn last_offset(&self) -> u64 { + self.last_offset.unwrap_or(0) + } + + pub fn first_timestamp_micros(&self) -> u64 { + self.first_timestamp_micros + } + + pub fn message_count(&self) -> u64 { + self.message_count + } + + pub fn reset(&mut self) { + self.entries.clear(); + self.total_size = 0; + self.message_count = 0; + self.first_offset = None; + self.last_offset = None; + self.first_timestamp_micros = 0; + } +} + +impl Default for FileBuffer { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn new_buffer_is_empty() { + let buf = FileBuffer::new(); + assert!(buf.is_empty()); + assert_eq!(buf.message_count(), 0); + assert_eq!(buf.first_offset(), 0); + assert_eq!(buf.last_offset(), 0); + } + + #[test] + fn append_tracks_offsets() { + let mut buf = FileBuffer::new(); + buf.append(vec![1, 2, 3], 10, 1000); + buf.append(vec![4, 5], 11, 1001); + buf.append(vec![6], 12, 1002); + + assert!(!buf.is_empty()); + assert_eq!(buf.message_count(), 3); + assert_eq!(buf.first_offset(), 10); + assert_eq!(buf.last_offset(), 12); + assert_eq!(buf.first_timestamp_micros(), 1000); + assert_eq!(buf.entries().len(), 3); + } + + #[test] + fn rotation_by_size() { + let mut buf = FileBuffer::new(); + buf.append(vec![0; 500], 0, 100); + assert!(!buf.should_rotate(FileRotation::Size, 1000, 0)); + + buf.append(vec![0; 500], 1, 200); + assert!(buf.should_rotate(FileRotation::Size, 1000, 0)); + + buf.append(vec![0; 100], 2, 300); + assert!(buf.should_rotate(FileRotation::Size, 1000, 0)); + } + + #[test] + fn rotation_by_messages() { + let mut buf = FileBuffer::new(); + buf.append(vec![1], 0, 100); + buf.append(vec![2], 1, 200); + assert!(!buf.should_rotate(FileRotation::Messages, 0, 3)); + + buf.append(vec![3], 2, 300); + assert!(buf.should_rotate(FileRotation::Messages, 0, 3)); + } + + #[test] + fn reset_clears_state() { + let mut buf = FileBuffer::new(); + buf.append(vec![1, 2, 3], 5, 1000); + buf.append(vec![4, 5, 6], 6, 2000); + + buf.reset(); + + assert!(buf.is_empty()); + assert_eq!(buf.message_count(), 0); + assert_eq!(buf.first_offset(), 0); + assert_eq!(buf.last_offset(), 0); + } +} diff --git a/core/connectors/sinks/s3_sink/src/client.rs b/core/connectors/sinks/s3_sink/src/client.rs new file mode 100644 index 0000000000..52d8619cc3 --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/client.rs @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::S3SinkConfig; +use iggy_connector_sdk::Error; +use s3::creds::Credentials; +use s3::{Bucket, Region}; +use tracing::info; + +fn validate_credential_pair(config: &S3SinkConfig) -> Result<(), Error> { + if config.access_key_id.is_some() != config.secret_access_key.is_some() { + return Err(Error::InvalidConfigValue( + "Partially configured credentials. You must provide both access_key_id \ + and secret_access_key, or omit both." + .to_owned(), + )); + } + Ok(()) +} + +pub async fn create_bucket(config: &S3SinkConfig) -> Result, Error> { + validate_credential_pair(config)?; + + let credentials = match (&config.access_key_id, &config.secret_access_key) { + (Some(key), Some(secret)) => { + let redacted_key = key.chars().take(3).collect::(); + info!("Using explicit S3 credentials (access key: {redacted_key}***)"); + Credentials::new(Some(key), Some(secret), None, None, None) + .map_err(|e| Error::InitError(format!("Failed to create S3 credentials: {e}")))? + } + _ => { + info!( + "No explicit credentials provided, using default credential chain (env vars / instance profile)" + ); + Credentials::default().map_err(|e| { + Error::InitError(format!("Failed to load default S3 credentials: {e}")) + })? + } + }; + + let region = match &config.endpoint { + Some(endpoint) => { + info!("Using custom S3 endpoint: {endpoint}"); + Region::Custom { + region: config.region.clone(), + endpoint: endpoint.clone(), + } + } + None => config.region.parse::().map_err(|e| { + Error::InvalidConfigValue(format!("Invalid S3 region '{}': {e}", config.region)) + })?, + }; + + let mut bucket = Bucket::new(&config.bucket, region, credentials) + .map_err(|e| Error::InitError(format!("Failed to create S3 bucket handle: {e}")))?; + + let use_path_style = config.path_style.unwrap_or(config.endpoint.is_some()); + if use_path_style { + bucket.set_path_style(); + } + + Ok(bucket) +} + +pub async fn verify_bucket(bucket: &Bucket) -> Result<(), Error> { + bucket.head_object("/").await.map_err(|e| { + Error::InitError(format!( + "S3 bucket '{}' connectivity check failed: {e}", + bucket.name + )) + })?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + FileRotation, default_max_file_size, default_output_format, default_path_template, + }; + + fn base_config() -> S3SinkConfig { + S3SinkConfig { + bucket: "test".to_string(), + region: "us-east-1".to_string(), + prefix: None, + endpoint: None, + access_key_id: None, + secret_access_key: None, + path_template: default_path_template(), + file_rotation: FileRotation::Size, + max_file_size: default_max_file_size(), + max_messages_per_file: None, + output_format: default_output_format(), + include_metadata: true, + include_headers: false, + max_retries: None, + retry_delay: None, + path_style: None, + } + } + + #[test] + fn validate_both_credentials_present() { + let config = S3SinkConfig { + access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()), + secret_access_key: Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string()), + ..base_config() + }; + assert!(validate_credential_pair(&config).is_ok()); + } + + #[test] + fn validate_no_credentials() { + let config = base_config(); + assert!(validate_credential_pair(&config).is_ok()); + } + + #[test] + fn validate_partial_access_key_only() { + let config = S3SinkConfig { + access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()), + secret_access_key: None, + ..base_config() + }; + assert!(validate_credential_pair(&config).is_err()); + } + + #[test] + fn validate_partial_secret_key_only() { + let config = S3SinkConfig { + access_key_id: None, + secret_access_key: Some("secret".to_string()), + ..base_config() + }; + assert!(validate_credential_pair(&config).is_err()); + } +} diff --git a/core/connectors/sinks/s3_sink/src/formatter.rs b/core/connectors/sinks/s3_sink/src/formatter.rs new file mode 100644 index 0000000000..944bf653c6 --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/formatter.rs @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::OutputFormat; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{ConsumedMessage, MessagesMetadata, Payload, TopicMetadata}; +use serde_json::{Map, Value}; + +pub fn format_message( + message: &ConsumedMessage, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + include_metadata: bool, + include_headers: bool, + format: OutputFormat, +) -> Vec { + match format { + OutputFormat::JsonLines | OutputFormat::JsonArray => format_json_message( + message, + topic_metadata, + messages_metadata, + include_metadata, + include_headers, + ), + OutputFormat::Raw => format_raw_message(message), + } +} + +fn format_json_message( + message: &ConsumedMessage, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + include_metadata: bool, + include_headers: bool, +) -> Vec { + let mut obj = Map::new(); + + if include_metadata { + obj.insert("offset".to_string(), Value::Number(message.offset.into())); + let ts = timestamp_to_rfc3339(message.timestamp); + obj.insert("timestamp".to_string(), Value::String(ts)); + obj.insert( + "stream".to_string(), + Value::String(topic_metadata.stream.clone()), + ); + obj.insert( + "topic".to_string(), + Value::String(topic_metadata.topic.clone()), + ); + obj.insert( + "partition_id".to_string(), + Value::Number(messages_metadata.partition_id.into()), + ); + } + + if include_headers && let Some(headers) = &message.headers { + let mut headers_obj = Map::new(); + for (key, value) in headers { + headers_obj.insert(key.to_string(), Value::String(value.to_string())); + } + obj.insert("headers".to_string(), Value::Object(headers_obj)); + } + + let payload_value = payload_to_json_value(&message.payload); + obj.insert("payload".to_string(), payload_value); + + match serde_json::to_vec(&Value::Object(obj)) { + Ok(bytes) => bytes, + Err(e) => { + tracing::warn!( + "Failed to serialize message at offset {}: {e}", + message.offset + ); + Vec::new() + } + } +} + +fn format_raw_message(message: &ConsumedMessage) -> Vec { + message.payload.try_to_bytes().unwrap_or_default() +} + +fn payload_to_json_value(payload: &Payload) -> Value { + match payload { + Payload::Json(value) => { + let bytes = simd_json::to_vec(value).unwrap_or_default(); + serde_json::from_slice(&bytes).unwrap_or(Value::Null) + } + Payload::Text(text) => Value::String(text.clone()), + Payload::Raw(bytes) => match serde_json::from_slice(bytes) { + Ok(v) => v, + Err(_) => Value::String(base64_encode(bytes)), + }, + Payload::Proto(text) => Value::String(text.clone()), + Payload::FlatBuffer(bytes) => Value::String(base64_encode(bytes)), + } +} + +fn base64_encode(bytes: &[u8]) -> String { + use base64::Engine; + base64::engine::general_purpose::STANDARD.encode(bytes) +} + +fn timestamp_to_rfc3339(micros: u64) -> String { + let secs = (micros / 1_000_000) as i64; + let nanos = ((micros % 1_000_000) * 1_000) as u32; + DateTime::::from_timestamp(secs, nanos) + .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)) + .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()) +} + +pub fn finalize_buffer(data: &[Vec], format: OutputFormat) -> Vec { + match format { + OutputFormat::JsonLines => { + let mut result = Vec::new(); + for entry in data { + result.extend_from_slice(entry); + result.push(b'\n'); + } + result + } + OutputFormat::JsonArray => { + let entries: Vec = data + .iter() + .filter_map(|bytes| serde_json::from_slice(bytes).ok()) + .collect(); + serde_json::to_vec(&entries).unwrap_or_default() + } + OutputFormat::Raw => { + let mut result = Vec::new(); + for entry in data { + result.extend_from_slice(entry); + result.push(b'\n'); + } + result + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_connector_sdk::Schema; + use std::collections::BTreeMap; + + fn make_json_payload(json_str: &str) -> Payload { + let mut bytes = json_str.as_bytes().to_vec(); + let value = simd_json::to_owned_value(&mut bytes).unwrap(); + Payload::Json(value) + } + + fn make_message(offset: u64, payload: Payload) -> ConsumedMessage { + ConsumedMessage { + id: 1, + offset, + checksum: 12345, + timestamp: 1_710_597_751_000_000, + origin_timestamp: 1_710_597_751_000_000, + headers: None, + payload, + } + } + + fn make_topic_metadata() -> TopicMetadata { + TopicMetadata { + stream: "app_logs".to_string(), + topic: "api_requests".to_string(), + } + } + + fn make_messages_metadata() -> MessagesMetadata { + MessagesMetadata { + partition_id: 1, + current_offset: 42, + schema: Schema::Json, + } + } + + #[test] + fn json_lines_with_metadata() { + let payload = make_json_payload(r#"{"method":"GET","status":200}"#); + let msg = make_message(42, payload); + let topic = make_topic_metadata(); + let meta = make_messages_metadata(); + + let bytes = format_message(&msg, &topic, &meta, true, false, OutputFormat::JsonLines); + let value: Value = serde_json::from_slice(&bytes).unwrap(); + + assert_eq!(value["offset"], 42); + assert_eq!(value["stream"], "app_logs"); + assert_eq!(value["topic"], "api_requests"); + assert_eq!(value["partition_id"], 1); + assert_eq!(value["payload"]["method"], "GET"); + assert!(value["timestamp"].is_string()); + } + + #[test] + fn json_lines_without_metadata() { + let payload = make_json_payload(r#"{"key":"value"}"#); + let msg = make_message(10, payload); + let topic = make_topic_metadata(); + let meta = make_messages_metadata(); + + let bytes = format_message(&msg, &topic, &meta, false, false, OutputFormat::JsonLines); + let value: Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(value.get("offset").is_none()); + assert!(value.get("stream").is_none()); + assert_eq!(value["payload"]["key"], "value"); + } + + #[test] + fn json_lines_with_headers() { + let payload = make_json_payload(r#"{"data":1}"#); + let mut msg = make_message(5, payload); + + let mut headers = BTreeMap::new(); + let key = iggy_common::HeaderKey::try_from("content-type").unwrap(); + let value = iggy_common::HeaderValue::try_from("application/json").unwrap(); + headers.insert(key, value); + msg.headers = Some(headers); + + let topic = make_topic_metadata(); + let meta = make_messages_metadata(); + + let bytes = format_message(&msg, &topic, &meta, false, true, OutputFormat::JsonLines); + let value: Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(value["headers"].is_object()); + } + + #[test] + fn raw_format() { + let payload = Payload::Text("hello world".to_string()); + let msg = make_message(1, payload); + let topic = make_topic_metadata(); + let meta = make_messages_metadata(); + + let bytes = format_message(&msg, &topic, &meta, true, false, OutputFormat::Raw); + assert_eq!(bytes, b"hello world"); + } + + #[test] + fn finalize_json_lines() { + let entries = vec![b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]; + let result = finalize_buffer(&entries, OutputFormat::JsonLines); + assert_eq!(result, b"{\"a\":1}\n{\"b\":2}\n"); + } + + #[test] + fn finalize_json_array() { + let entries = vec![b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]; + let result = finalize_buffer(&entries, OutputFormat::JsonArray); + let value: Value = serde_json::from_slice(&result).unwrap(); + assert!(value.is_array()); + assert_eq!(value.as_array().unwrap().len(), 2); + } + + #[test] + fn timestamp_conversion() { + let ts = timestamp_to_rfc3339(1_710_597_751_000_000); + assert!(ts.starts_with("2024-03-16T")); + assert!(ts.ends_with('Z')); + } + + #[test] + fn timestamp_zero() { + let ts = timestamp_to_rfc3339(0); + assert_eq!(ts, "1970-01-01T00:00:00Z"); + } +} diff --git a/core/connectors/sinks/s3_sink/src/lib.rs b/core/connectors/sinks/s3_sink/src/lib.rs new file mode 100644 index 0000000000..3177faf898 --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/lib.rs @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::{Error, sink_connector}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt; + +pub mod buffer; +pub mod client; +pub mod formatter; +pub mod path; +pub mod sink; + +sink_connector!(S3Sink); + +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_FILE_SIZE: &str = "8MiB"; +const DEFAULT_PATH_TEMPLATE: &str = "{stream}/{topic}/{date}/{hour}"; +const DEFAULT_OUTPUT_FORMAT: &str = "json_lines"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct S3SinkConfig { + pub bucket: String, + pub region: String, + #[serde(default)] + pub prefix: Option, + #[serde(default)] + pub endpoint: Option, + #[serde(default)] + pub access_key_id: Option, + #[serde(default)] + pub secret_access_key: Option, + #[serde(default = "default_path_template")] + pub path_template: String, + #[serde(default = "default_file_rotation")] + pub file_rotation: FileRotation, + #[serde(default = "default_max_file_size")] + pub max_file_size: String, + #[serde(default)] + pub max_messages_per_file: Option, + #[serde(default = "default_output_format")] + pub output_format: String, + #[serde(default = "default_true")] + pub include_metadata: bool, + #[serde(default)] + pub include_headers: bool, + #[serde(default)] + pub max_retries: Option, + #[serde(default)] + pub retry_delay: Option, + #[serde(default)] + pub path_style: Option, +} + +fn default_path_template() -> String { + DEFAULT_PATH_TEMPLATE.to_string() +} + +fn default_file_rotation() -> FileRotation { + FileRotation::Size +} + +fn default_max_file_size() -> String { + DEFAULT_MAX_FILE_SIZE.to_string() +} + +fn default_output_format() -> String { + DEFAULT_OUTPUT_FORMAT.to_string() +} + +fn default_true() -> bool { + true +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FileRotation { + Size, + Messages, +} + +impl fmt::Display for FileRotation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FileRotation::Size => write!(f, "size"), + FileRotation::Messages => write!(f, "messages"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OutputFormat { + JsonLines, + JsonArray, + Raw, +} + +impl TryFrom<&str> for OutputFormat { + type Error = Error; + + fn try_from(s: &str) -> Result { + match s.to_lowercase().as_str() { + "json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines), + "json_array" | "json" => Ok(OutputFormat::JsonArray), + "raw" => Ok(OutputFormat::Raw), + other => Err(Error::InvalidConfigValue(format!( + "Unknown output format: '{other}'. Expected: json_lines, json_array, or raw" + ))), + } + } +} + +impl OutputFormat { + pub fn file_extension(&self) -> &'static str { + match self { + OutputFormat::JsonLines => "jsonl", + OutputFormat::JsonArray => "json", + OutputFormat::Raw => "bin", + } + } +} + +#[derive(Debug)] +pub struct S3Sink { + id: u32, + config: S3SinkConfig, + bucket: Option>, + buffers: tokio::sync::Mutex>, + max_file_size_bytes: u64, + output_format: OutputFormat, + state: tokio::sync::Mutex, + retry_delay: std::time::Duration, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BufferKey { + pub stream: String, + pub topic: String, + pub partition_id: u32, +} + +#[derive(Debug)] +struct SinkState { + messages_processed: u64, + uploads_completed: u64, + upload_errors: u64, +} + +impl S3Sink { + pub fn new(id: u32, config: S3SinkConfig) -> Self { + S3Sink { + id, + config, + bucket: None, + buffers: tokio::sync::Mutex::new(HashMap::new()), + max_file_size_bytes: 0, + output_format: OutputFormat::JsonLines, + state: tokio::sync::Mutex::new(SinkState { + messages_processed: 0, + uploads_completed: 0, + upload_errors: 0, + }), + retry_delay: std::time::Duration::from_secs(1), + } + } + + pub fn validate_and_parse_config(&mut self) -> Result<(), Error> { + self.output_format = OutputFormat::try_from(self.config.output_format.as_str())?; + self.max_file_size_bytes = parse_file_size(&self.config.max_file_size)?; + + let delay_str = self + .config + .retry_delay + .as_deref() + .unwrap_or(DEFAULT_RETRY_DELAY); + self.retry_delay = humantime::Duration::from_str(delay_str) + .map(|d| d.into()) + .map_err(|e| { + Error::InvalidConfigValue(format!("Invalid retry_delay '{delay_str}': {e}")) + })?; + + if self.config.file_rotation == FileRotation::Messages + && self.config.max_messages_per_file.is_none() + { + return Err(Error::InvalidConfigValue( + "file_rotation is set to 'messages' but max_messages_per_file is not configured" + .to_owned(), + )); + } + + Ok(()) + } + + fn max_retries(&self) -> u32 { + self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES) + } +} + +use std::str::FromStr; + +fn parse_file_size(s: &str) -> Result { + byte_unit::Byte::from_str(s) + .map(|b| b.as_u64()) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid file size '{s}': {e}"))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_file_size_mib() { + assert_eq!(parse_file_size("8MiB").unwrap(), 8 * 1024 * 1024); + } + + #[test] + fn parse_file_size_mb() { + assert_eq!(parse_file_size("10MB").unwrap(), 10_000_000); + } + + #[test] + fn parse_file_size_invalid() { + assert!(parse_file_size("not_a_size").is_err()); + } + + #[test] + fn output_format_json_lines_variants() { + assert_eq!( + OutputFormat::try_from("json_lines").unwrap(), + OutputFormat::JsonLines + ); + assert_eq!( + OutputFormat::try_from("jsonl").unwrap(), + OutputFormat::JsonLines + ); + assert_eq!( + OutputFormat::try_from("JSONLINES").unwrap(), + OutputFormat::JsonLines + ); + } + + #[test] + fn output_format_json_array() { + assert_eq!( + OutputFormat::try_from("json_array").unwrap(), + OutputFormat::JsonArray + ); + assert_eq!( + OutputFormat::try_from("json").unwrap(), + OutputFormat::JsonArray + ); + } + + #[test] + fn output_format_raw() { + assert_eq!(OutputFormat::try_from("raw").unwrap(), OutputFormat::Raw); + } + + #[test] + fn output_format_invalid() { + assert!(OutputFormat::try_from("xml").is_err()); + } + + #[test] + fn file_extensions() { + assert_eq!(OutputFormat::JsonLines.file_extension(), "jsonl"); + assert_eq!(OutputFormat::JsonArray.file_extension(), "json"); + assert_eq!(OutputFormat::Raw.file_extension(), "bin"); + } + + #[test] + fn file_rotation_display() { + assert_eq!(FileRotation::Size.to_string(), "size"); + assert_eq!(FileRotation::Messages.to_string(), "messages"); + } + + #[test] + fn config_deserialization_defaults() { + let json = r#"{"bucket":"test","region":"us-east-1"}"#; + let config: S3SinkConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.bucket, "test"); + assert_eq!(config.region, "us-east-1"); + assert_eq!(config.path_template, DEFAULT_PATH_TEMPLATE); + assert_eq!(config.max_file_size, DEFAULT_MAX_FILE_SIZE); + assert_eq!(config.output_format, DEFAULT_OUTPUT_FORMAT); + assert!(config.include_metadata); + assert!(!config.include_headers); + assert_eq!(config.file_rotation, FileRotation::Size); + assert!(config.prefix.is_none()); + assert!(config.endpoint.is_none()); + assert!(config.access_key_id.is_none()); + assert!(config.secret_access_key.is_none()); + } + + #[test] + fn config_deserialization_full() { + let json = r#"{ + "bucket": "my-bucket", + "region": "eu-west-1", + "prefix": "data/raw", + "endpoint": "http://localhost:9000", + "access_key_id": "AKIA...", + "secret_access_key": "secret", + "path_template": "{stream}/{topic}", + "file_rotation": "messages", + "max_file_size": "16MiB", + "max_messages_per_file": 5000, + "output_format": "json_array", + "include_metadata": false, + "include_headers": true, + "max_retries": 5, + "retry_delay": "2s", + "path_style": true + }"#; + let config: S3SinkConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.bucket, "my-bucket"); + assert_eq!(config.prefix.as_deref(), Some("data/raw")); + assert_eq!(config.endpoint.as_deref(), Some("http://localhost:9000")); + assert_eq!(config.file_rotation, FileRotation::Messages); + assert_eq!(config.max_messages_per_file, Some(5000)); + assert!(!config.include_metadata); + assert!(config.include_headers); + assert_eq!(config.max_retries, Some(5)); + assert_eq!(config.path_style, Some(true)); + } + + #[test] + fn partial_credentials_detected() { + let config_with_key_only = S3SinkConfig { + bucket: "b".to_string(), + region: "us-east-1".to_string(), + prefix: None, + endpoint: None, + access_key_id: Some("key".to_string()), + secret_access_key: None, + path_template: default_path_template(), + file_rotation: FileRotation::Size, + max_file_size: default_max_file_size(), + max_messages_per_file: None, + output_format: default_output_format(), + include_metadata: true, + include_headers: false, + max_retries: None, + retry_delay: None, + path_style: None, + }; + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(client::create_bucket(&config_with_key_only)); + assert!(result.is_err()); + } +} diff --git a/core/connectors/sinks/s3_sink/src/path.rs b/core/connectors/sinks/s3_sink/src/path.rs new file mode 100644 index 0000000000..76cf9022e9 --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/path.rs @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::OutputFormat; +use chrono::{DateTime, Utc}; + +pub struct PathContext<'a> { + pub stream: &'a str, + pub topic: &'a str, + pub partition_id: u32, + pub first_timestamp_micros: u64, +} + +pub fn render_s3_key( + prefix: Option<&str>, + template: &str, + ctx: &PathContext<'_>, + offset_start: u64, + offset_end: u64, + format: OutputFormat, +) -> String { + let rendered = render_template(template, ctx); + + let filename = format!( + "{:06}-{:06}.{}", + offset_start, + offset_end, + format.file_extension() + ); + + match prefix { + Some(p) => { + let p = p.trim_matches('/'); + if p.is_empty() { + format!("{rendered}/{filename}") + } else { + format!("{p}/{rendered}/{filename}") + } + } + None => format!("{rendered}/{filename}"), + } +} + +fn render_template(template: &str, ctx: &PathContext<'_>) -> String { + let dt = timestamp_to_datetime(ctx.first_timestamp_micros); + let date = dt.format("%Y-%m-%d").to_string(); + let hour = dt.format("%H").to_string(); + let now_millis = Utc::now().timestamp_millis().to_string(); + + template + .replace("{stream}", ctx.stream) + .replace("{topic}", ctx.topic) + .replace("{partition}", &ctx.partition_id.to_string()) + .replace("{date}", &date) + .replace("{hour}", &hour) + .replace("{timestamp}", &now_millis) +} + +fn timestamp_to_datetime(micros: u64) -> DateTime { + let secs = (micros / 1_000_000) as i64; + let nanos = ((micros % 1_000_000) * 1_000) as u32; + DateTime::::from_timestamp(secs, nanos).unwrap_or_else(Utc::now) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_ctx() -> PathContext<'static> { + PathContext { + stream: "app_logs", + topic: "api_requests", + partition_id: 1, + first_timestamp_micros: 1_710_597_600_000_000, // 2024-03-16T14:00:00Z + } + } + + #[test] + fn render_default_template() { + let ctx = test_ctx(); + let key = render_s3_key( + Some("iggy/raw"), + "{stream}/{topic}/{date}/{hour}", + &ctx, + 0, + 99, + OutputFormat::JsonLines, + ); + assert_eq!( + key, + "iggy/raw/app_logs/api_requests/2024-03-16/14/000000-000099.jsonl" + ); + } + + #[test] + fn render_with_partition() { + let ctx = test_ctx(); + let key = render_s3_key( + None, + "{stream}/{topic}/{partition}/{date}", + &ctx, + 100, + 199, + OutputFormat::JsonArray, + ); + assert_eq!(key, "app_logs/api_requests/1/2024-03-16/000100-000199.json"); + } + + #[test] + fn render_no_prefix() { + let ctx = test_ctx(); + let key = render_s3_key(None, "{stream}/{topic}", &ctx, 0, 9, OutputFormat::Raw); + assert_eq!(key, "app_logs/api_requests/000000-000009.bin"); + } + + #[test] + fn render_empty_prefix() { + let ctx = test_ctx(); + let key = render_s3_key(Some(""), "{stream}", &ctx, 0, 0, OutputFormat::JsonLines); + assert_eq!(key, "app_logs/000000-000000.jsonl"); + } + + #[test] + fn render_prefix_with_trailing_slash() { + let ctx = test_ctx(); + let key = render_s3_key( + Some("data/"), + "{topic}", + &ctx, + 5, + 10, + OutputFormat::JsonLines, + ); + assert_eq!(key, "data/api_requests/000005-000010.jsonl"); + } + + #[test] + fn timestamp_to_datetime_zero() { + let dt = timestamp_to_datetime(0); + assert_eq!(dt.format("%Y-%m-%d").to_string(), "1970-01-01"); + } + + #[test] + fn timestamp_to_datetime_known() { + let dt = timestamp_to_datetime(1_710_597_600_000_000); + assert_eq!(dt.format("%Y-%m-%dT%H").to_string(), "2024-03-16T14"); + } +} diff --git a/core/connectors/sinks/s3_sink/src/sink.rs b/core/connectors/sinks/s3_sink/src/sink.rs new file mode 100644 index 0000000000..31f620c959 --- /dev/null +++ b/core/connectors/sinks/s3_sink/src/sink.rs @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::buffer::FileBuffer; +use crate::formatter; +use crate::path::{PathContext, render_s3_key}; +use crate::{BufferKey, S3Sink}; +use async_trait::async_trait; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata}; +use tracing::{debug, error, info, warn}; + +#[async_trait] +impl Sink for S3Sink { + async fn open(&mut self) -> Result<(), Error> { + info!("Opening S3 sink connector with ID: {}", self.id); + + self.validate_and_parse_config()?; + + let bucket = crate::client::create_bucket(&self.config).await?; + + info!( + "S3 sink ID: {} connected to bucket '{}' in region '{}'", + self.id, self.config.bucket, self.config.region + ); + + match crate::client::verify_bucket(&bucket).await { + Ok(()) => { + info!( + "S3 sink ID: {} bucket '{}' connectivity verified", + self.id, self.config.bucket + ); + } + Err(e) => { + warn!( + "S3 sink ID: {} bucket verification returned an error (non-fatal, \ + the bucket may still be accessible): {e}", + self.id + ); + } + } + + self.bucket = Some(bucket); + + info!( + "S3 sink ID: {} opened. format={}, rotation={}, max_file_size={}, template='{}'", + self.id, + self.config.output_format, + self.config.file_rotation, + self.config.max_file_size, + self.config.path_template, + ); + + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let bucket = self + .bucket + .as_ref() + .ok_or_else(|| Error::InitError("S3 client not initialized".to_string()))?; + + let key = BufferKey { + stream: topic_metadata.stream.clone(), + topic: topic_metadata.topic.clone(), + partition_id: messages_metadata.partition_id, + }; + + let max_messages = self.config.max_messages_per_file.unwrap_or(u64::MAX); + + let mut buffers = self.buffers.lock().await; + let buffer = buffers.entry(key.clone()).or_insert_with(FileBuffer::new); + + for message in &messages { + let formatted = formatter::format_message( + message, + topic_metadata, + &messages_metadata, + self.config.include_metadata, + self.config.include_headers, + self.output_format, + ); + buffer.append(formatted, message.offset, message.timestamp); + + if buffer.should_rotate( + self.config.file_rotation, + self.max_file_size_bytes, + max_messages, + ) { + self.flush_buffer(bucket, &key, buffer).await; + } + } + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + + debug!( + "S3 sink ID: {} processed {} messages for {}/{}/{}", + self.id, + messages.len(), + topic_metadata.stream, + topic_metadata.topic, + messages_metadata.partition_id, + ); + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + info!("Closing S3 sink connector with ID: {}", self.id); + + if let Some(bucket) = &self.bucket { + let mut buffers = self.buffers.lock().await; + let keys: Vec = buffers.keys().cloned().collect(); + for key in keys { + if let Some(buffer) = buffers.get_mut(&key) + && !buffer.is_empty() + { + self.flush_buffer(bucket, &key, buffer).await; + } + } + } else { + let buffers = self.buffers.lock().await; + let pending: u64 = buffers.values().map(|b| b.message_count()).sum(); + if pending > 0 { + warn!( + "S3 sink ID: {} closing without S3 client — {pending} buffered messages will be lost", + self.id, + ); + } + } + + let state = self.state.lock().await; + info!( + "S3 sink ID: {} closed. messages_processed={}, uploads_completed={}, upload_errors={}", + self.id, state.messages_processed, state.uploads_completed, state.upload_errors, + ); + + Ok(()) + } +} + +impl S3Sink { + async fn flush_buffer(&self, bucket: &s3::Bucket, key: &BufferKey, buffer: &mut FileBuffer) { + if buffer.is_empty() { + return; + } + + let data = formatter::finalize_buffer(buffer.entries(), self.output_format); + + let ctx = PathContext { + stream: &key.stream, + topic: &key.topic, + partition_id: key.partition_id, + first_timestamp_micros: buffer.first_timestamp_micros(), + }; + + let s3_key = render_s3_key( + self.config.prefix.as_deref(), + &self.config.path_template, + &ctx, + buffer.first_offset(), + buffer.last_offset(), + self.output_format, + ); + + let msg_count = buffer.message_count(); + + match self.upload_with_retry(bucket, &s3_key, &data).await { + Ok(()) => { + debug!( + "S3 sink ID: {} uploaded {} ({} messages, {} bytes)", + self.id, + s3_key, + msg_count, + data.len(), + ); + let mut state = self.state.lock().await; + state.uploads_completed += 1; + drop(state); + buffer.reset(); + } + Err(e) => { + error!( + "S3 sink ID: {} failed to upload {} ({} messages lost): {e}", + self.id, s3_key, msg_count + ); + let mut state = self.state.lock().await; + state.upload_errors += 1; + drop(state); + // Reset buffer even on failure to prevent unbounded growth. + // Messages are lost but offsets will be re-delivered by the + // runtime on next poll since consume() returned Ok. + buffer.reset(); + } + } + } + + async fn upload_with_retry( + &self, + bucket: &s3::Bucket, + s3_key: &str, + data: &[u8], + ) -> Result<(), Error> { + let max_retries = self.max_retries(); + let retry_delay = self.retry_delay; + let mut attempts = 0u32; + + loop { + match bucket.put_object(s3_key, data).await { + Ok(response) => { + let status = response.status_code(); + if (200..300).contains(&status) { + return Ok(()); + } + attempts += 1; + if attempts >= max_retries { + return Err(Error::CannotStoreData(format!( + "S3 PutObject returned status {status} after {attempts} attempts for key '{s3_key}'" + ))); + } + warn!( + "S3 sink ID: {} PutObject status {status} (attempt {attempts}/{max_retries}). Retrying...", + self.id + ); + } + Err(e) => { + attempts += 1; + if attempts >= max_retries { + return Err(Error::CannotStoreData(format!( + "S3 PutObject failed after {attempts} attempts for key '{s3_key}': {e}" + ))); + } + warn!( + "S3 sink ID: {} PutObject error (attempt {attempts}/{max_retries}): {e}. Retrying...", + self.id + ); + } + } + tokio::time::sleep(retry_delay * attempts).await; + } + } +}