From 43d08eaf6b1359524343cca5478a30ebcdef177d Mon Sep 17 00:00:00 2001 From: ryankert01 Date: Wed, 6 May 2026 01:50:34 +0800 Subject: [PATCH 1/6] feat(connectors): add Apache Doris sink connector (#3112) Sink connector that writes Iggy messages to Apache Doris via the HTTP Stream Load API. v1 scope: JSON payloads only, HTTP Basic auth, pre-created tables only (no DDL). Behaviour: - Manual 307/308 redirect following (capped at 5) so the Authorization header survives the FE -> BE hop, which reqwest strips by default. - Deterministic per-batch label ({prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset}) so replays are deduplicated by Doris within label_keep_max_second. - Response body Status field drives error classification: Success and "Label Already Exists" -> Ok; Publish Timeout -> CannotStoreData (transient); Fail or any unknown status -> PermanentHttpError so the runtime DLQs the batch instead of looping. - Optional columns / where / max_filter_ratio / batch_size / timeout forwarded as Stream Load headers. - Password held as secrecy::SecretString; auth header wrapped in SecretString so Debug derivation never leaks the base64 credential. - Client built in open() with InitError on failure; fe_url validated there too so a bad config fails at startup rather than first batch. Tests: 6 integration tests under core/integration/tests/connectors/doris backed by an apache/doris all-in-one testcontainer (FE HTTP + FE MySQL). Coverage includes happy path, 1k-row bulk, max_filter_ratio skip path, label-replay dedupe, missing-target-table (proves no auto-create), and the columns derived-expression header. The container must bind host:8040 1:1 because the FE 307-redirects to 127.0.0.1:8040; tests are serialized via a 'doris' nextest test-group (max-threads = 1) so concurrent test processes don't race for that port. --- .config/nextest.toml | 12 + Cargo.lock | 15 + Cargo.toml | 2 + core/connectors/README.md | 1 + .../example_config/connectors/doris_sink.toml | 43 ++ core/connectors/sinks/README.md | 1 + core/connectors/sinks/doris_sink/Cargo.toml | 43 ++ core/connectors/sinks/doris_sink/README.md | 74 +++ core/connectors/sinks/doris_sink/config.toml | 43 ++ core/connectors/sinks/doris_sink/src/lib.rs | 565 ++++++++++++++++ .../tests/connectors/doris/doris_sink.rs | 498 ++++++++++++++ .../integration/tests/connectors/doris/mod.rs | 20 + .../tests/connectors/doris/sink.toml | 20 + .../connectors/fixtures/doris/container.rs | 611 ++++++++++++++++++ .../tests/connectors/fixtures/doris/mod.rs | 25 + .../tests/connectors/fixtures/mod.rs | 5 + core/integration/tests/connectors/mod.rs | 1 + 17 files changed, 1979 insertions(+) create mode 100644 core/connectors/runtime/example_config/connectors/doris_sink.toml create mode 100644 core/connectors/sinks/doris_sink/Cargo.toml create mode 100644 core/connectors/sinks/doris_sink/README.md create mode 100644 core/connectors/sinks/doris_sink/config.toml create mode 100644 core/connectors/sinks/doris_sink/src/lib.rs create mode 100644 core/integration/tests/connectors/doris/doris_sink.rs create mode 100644 core/integration/tests/connectors/doris/mod.rs create mode 100644 core/integration/tests/connectors/doris/sink.toml create mode 100644 core/integration/tests/connectors/fixtures/doris/container.rs create mode 100644 core/integration/tests/connectors/fixtures/doris/mod.rs diff --git a/.config/nextest.toml b/.config/nextest.toml index fb488b6948..47f6b5723d 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -21,6 +21,18 @@ filter = 'package(integration) and test(cli::system::test_cli_session_scenario::should_be_successful)' threads-required = "num-cpus" +# Doris testcontainer must bind host:8040 1:1 (FE→BE 307 redirects to +# 127.0.0.1:8040; BE's priority_networks pins the advertise address) and the +# all-in-one image is heavy enough that running it alongside other +# container-backed tests (e.g. elasticsearch) starves their startup timers. +# threads-required = "num-cpus" forces each doris test to run alone, matching +# the existing pattern used elsewhere in this file. The longer slow-timeout +# accounts for cold container boot (~60-90s on Linux CI) plus the test body. +[[profile.default.overrides]] +filter = 'package(integration) and test(/connectors::doris::/)' +threads-required = "num-cpus" +slow-timeout = { period = "60s", terminate-after = 8 } + [profile.default] slow-timeout = { period = "30s", terminate-after = 4 } diff --git a/Cargo.lock b/Cargo.lock index 3a48443372..f5080e3de0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6563,6 +6563,21 @@ dependencies = [ "url", ] +[[package]] +name = "iggy_connector_doris_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "base64 0.22.1", + "iggy_connector_sdk", + "reqwest 0.13.3", + "secrecy", + "serde", + "serde_json", + "simd-json", + "tracing", +] + [[package]] name = "iggy_connector_elasticsearch_sink" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 949a498f2a..fe2413af0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "core/connectors/runtime", "core/connectors/sdk", "core/connectors/sinks/delta_sink", + "core/connectors/sinks/doris_sink", "core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/http_sink", "core/connectors/sinks/iceberg_sink", @@ -268,6 +269,7 @@ socket2 = "0.6.3" sqlx = { version = "0.8.6", features = [ "runtime-tokio-rustls", "postgres", + "mysql", "chrono", "uuid", "json", diff --git a/core/connectors/README.md b/core/connectors/README.md index cdef67789a..c36624b1a8 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -78,6 +78,7 @@ Each sink should have its own, custom configuration, which is passed along with ### Available Sinks +- **Doris Sink** - loads JSON messages into Apache Doris tables via the Stream Load HTTP API - **Elasticsearch Sink** - sends messages to Elasticsearch indices - **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog - **PostgreSQL Sink** - stores messages in PostgreSQL database tables diff --git a/core/connectors/runtime/example_config/connectors/doris_sink.toml b/core/connectors/runtime/example_config/connectors/doris_sink.toml new file mode 100644 index 0000000000..bbc167397d --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/doris_sink.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "doris" +enabled = true +version = 0 +name = "Doris sink" +path = "/target/release/libiggy_connector_doris_sink" +plugin_config_format = "toml" +verbose = false + +[[streams]] +stream = "events" +topics = ["doris_events"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "doris_sink" + +[plugin_config] +fe_url = "http://localhost:8030" +database = "iggy_demo" +table = "events" +username = "root" +password = "replace_with_secret" +label_prefix = "iggy" +batch_size = 1000 +timeout_secs = 30 diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 55c639c336..fc0c83e1db 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -8,6 +8,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | Sink | Description | | ---- | ----------- | +| **doris_sink** | Loads JSON messages into Apache Doris tables via the Stream Load HTTP API | | **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics | | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage | | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | diff --git a/core/connectors/sinks/doris_sink/Cargo.toml b/core/connectors/sinks/doris_sink/Cargo.toml new file mode 100644 index 0000000000..f2795b7ffe --- /dev/null +++ b/core/connectors/sinks/doris_sink/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_doris_sink" +version = "0.1.0" +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +iggy_connector_sdk = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/doris_sink/README.md b/core/connectors/sinks/doris_sink/README.md new file mode 100644 index 0000000000..d3633337f2 --- /dev/null +++ b/core/connectors/sinks/doris_sink/README.md @@ -0,0 +1,74 @@ +# Apache Doris Sink + +The Doris sink connector consumes JSON messages from Iggy streams and writes them to a pre-created Apache Doris table via Doris's [Stream Load HTTP API](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual). + +## Requirements + +- The target Doris **database and table must be pre-created** before enabling the sink. The connector never issues DDL. +- Messages must arrive with `Payload::Json` (i.e. the configured stream schema is `json`). Other payload types are skipped with a warning. +- The Iggy message JSON shape must match the target table columns. Use the optional `columns` plugin setting if the column order differs from the JSON keys. + +## How it works + +1. For each batch of messages, the connector serializes the JSON payloads into a JSON array. +2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset}`. Doris dedupes loads by label inside its `label_keep_max_second` window, so a replayed batch (after restart, retry, etc.) is silently absorbed instead of producing duplicates. +3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `format: json`, `strip_outer_array: true`, `label: