Skip to content

Commit ef8d790

Browse files
committed
feat(connectors): fix state & memory leak, test all plugins, enrich sinks
1 parent ff2da26 commit ef8d790

73 files changed

Lines changed: 3456 additions & 383 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 9 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/common/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,8 @@ pub use utils::hash::*;
105105
pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
106106
pub use utils::text;
107107
pub use utils::timestamp::*;
108+
109+
// Re-export chrono types for connectors and other crates that need DateTime
110+
pub use chrono::{DateTime, Duration as ChronoDuration, Utc};
108111
pub use utils::topic_size::MaxTopicSize;
109112
pub use utils::versioning::SemanticVersion;

core/common/src/utils/timestamp.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ impl IggyTimestamp {
6464
self.0.duration_since(UNIX_EPOCH).unwrap().as_micros() as u64
6565
}
6666

67+
pub fn as_millis(&self) -> u64 {
68+
self.0.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
69+
}
70+
71+
pub fn as_nanos(&self) -> u128 {
72+
self.0.duration_since(UNIX_EPOCH).unwrap().as_nanos()
73+
}
74+
6775
pub fn to_rfc3339_string(&self) -> String {
6876
DateTime::<Utc>::from(self.0).to_rfc3339()
6977
}

core/connectors/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,26 @@ Please refer to the **[Sink documentation](https://github.com/apache/iggy/tree/m
5252
When implementing `Sink`, make sure to use the `sink_connector!` macro to expose the FFI interface and allow the connector runtime to register the sink with the runtime. The macro also exports the connector's version (from `Cargo.toml`) which is reported in the runtime's `/stats` endpoint.
5353
Each sink should have its own, custom configuration, which is passed along with the unique plugin ID via expected `new()` method.
5454
55+
### Available Sinks
56+
57+
- **Elasticsearch Sink** - sends messages to Elasticsearch indices
58+
- **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog
59+
- **PostgreSQL Sink** - stores messages in PostgreSQL database tables
60+
- **Quickwit Sink** - indexes messages in Quickwit search engine
61+
- **Stdout Sink** - prints messages to standard output (useful for debugging/development)
62+
5563
## Source
5664
5765
Sources are responsible for producing the messages to the configured stream(s) and topic(s). For example, the Test source connector will generate the random messages that will be then sent to the configured stream and topic.
5866
5967
Please refer to the **[Source documentation](https://github.com/apache/iggy/tree/master/core/connectors/sources)** for the details about the configuration and the sample implementation.
6068
69+
### Available Sources
70+
71+
- **Elasticsearch Source** - polls documents from Elasticsearch indices
72+
- **PostgreSQL Source** - reads rows from PostgreSQL tables with multiple consumption strategies (delete after read, mark as processed, timestamp tracking)
73+
- **Random Source** - generates random test messages (useful for testing/development)
74+
6175
## Building the connectors
6276
6377
New connector can be built simply by implementing either `Sink` or `Source` trait. Please check the **[sink](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** or **[source](https://github.com/apache/iggy/tree/master/core/connectors/sources)** documentation, as well as the existing examples under `/sinks` and `/sources` directories.

core/connectors/runtime/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ readme = "README.md"
3232
async-trait = { workspace = true }
3333
axum = { workspace = true }
3434
axum-server = { workspace = true }
35-
chrono = { workspace = true }
3635
clap = { workspace = true }
3736
configs = { workspace = true }
3837
configs_derive = { workspace = true }
@@ -47,7 +46,7 @@ flume = { workspace = true }
4746
futures = { workspace = true }
4847
iggy = { workspace = true }
4948
iggy_common = { workspace = true }
50-
iggy_connector_sdk = { workspace = true }
49+
iggy_connector_sdk = { workspace = true, features = ["api"] }
5150
mimalloc = { workspace = true }
5251
once_cell = { workspace = true }
5352
opentelemetry = { workspace = true }

core/connectors/runtime/src/api/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use auth::resolve_api_key;
2323
use axum::{Json, Router, extract::State, middleware, routing::get};
2424
use axum_server::tls_rustls::RustlsConfig;
2525
use config::{HttpConfig, configure_cors};
26+
use iggy_connector_sdk::api::ConnectorRuntimeStats;
2627
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
2728
use tokio::spawn;
2829
use tracing::{error, info};
@@ -126,8 +127,6 @@ async fn get_metrics(State(context): State<Arc<RuntimeContext>>) -> String {
126127
context.metrics.get_formatted_output()
127128
}
128129

129-
async fn get_stats(
130-
State(context): State<Arc<RuntimeContext>>,
131-
) -> Json<stats::ConnectorRuntimeStats> {
130+
async fn get_stats(State(context): State<Arc<RuntimeContext>>) -> Json<ConnectorRuntimeStats> {
132131
Json(stats::get_runtime_stats(&context).await)
133132
}

core/connectors/runtime/src/api/models.rs

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,13 @@
1818
*/
1919

2020
use crate::configs::connectors::{
21-
ConfigFormat, SinkConfig, SourceConfig, StreamConsumerConfig, StreamProducerConfig,
22-
};
23-
use crate::manager::{
24-
sink::SinkInfo,
25-
source::SourceInfo,
26-
status::{ConnectorError, ConnectorStatus},
21+
SinkConfig, SourceConfig, StreamConsumerConfig, StreamProducerConfig,
2722
};
23+
use crate::manager::{sink::SinkInfo, source::SourceInfo};
24+
pub use iggy_connector_sdk::api::{SinkInfoResponse, SourceInfoResponse};
2825
use iggy_connector_sdk::transforms::TransformType;
2926
use serde::{Deserialize, Serialize};
3027

31-
#[derive(Debug, Serialize, Deserialize)]
32-
pub struct SinkInfoResponse {
33-
pub id: u32,
34-
pub key: String,
35-
pub name: String,
36-
pub path: String,
37-
pub enabled: bool,
38-
pub status: ConnectorStatus,
39-
#[serde(skip_serializing_if = "Option::is_none")]
40-
pub last_error: Option<ConnectorError>,
41-
pub plugin_config_format: Option<ConfigFormat>,
42-
}
43-
4428
#[derive(Debug, Serialize, Deserialize)]
4529
pub struct SinkDetailsResponse {
4630
#[serde(flatten)]
@@ -55,19 +39,6 @@ pub struct SinkConfigResponse {
5539
pub active: bool,
5640
}
5741

58-
#[derive(Debug, Serialize, Deserialize)]
59-
pub struct SourceInfoResponse {
60-
pub id: u32,
61-
pub key: String,
62-
pub name: String,
63-
pub path: String,
64-
pub enabled: bool,
65-
pub status: ConnectorStatus,
66-
#[serde(skip_serializing_if = "Option::is_none")]
67-
pub last_error: Option<ConnectorError>,
68-
pub plugin_config_format: Option<ConfigFormat>,
69-
}
70-
7142
#[derive(Debug, Serialize, Deserialize)]
7243
pub struct SourceDetailsResponse {
7344
#[serde(flatten)]
@@ -98,7 +69,7 @@ impl From<SinkInfo> for SinkInfoResponse {
9869
enabled: sink.enabled,
9970
status: sink.status,
10071
last_error: sink.last_error,
101-
plugin_config_format: sink.plugin_config_format,
72+
plugin_config_format: sink.plugin_config_format.map(|f| f.to_string()),
10273
}
10374
}
10475
}
@@ -113,7 +84,7 @@ impl From<SourceInfo> for SourceInfoResponse {
11384
enabled: source.enabled,
11485
status: source.status,
11586
last_error: source.last_error,
116-
plugin_config_format: source.plugin_config_format,
87+
plugin_config_format: source.plugin_config_format.map(|f| f.to_string()),
11788
}
11889
}
11990
}

core/connectors/runtime/src/configs/connectors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::configs::connectors::local_provider::LocalConnectorsConfigProvider;
2525
use crate::configs::runtime::ConnectorsConfig as RuntimeConnectorsConfig;
2626
use crate::error::RuntimeError;
2727
use async_trait::async_trait;
28-
use chrono::{DateTime, Utc};
2928
use configs_derive::ConfigEnv;
29+
use iggy_common::{DateTime, Utc};
3030
use iggy_connector_sdk::Schema;
3131
use iggy_connector_sdk::transforms::TransformType;
3232
use serde::{Deserialize, Serialize};

core/connectors/runtime/src/configs/connectors/local_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ use crate::configs::connectors::{
2424
use crate::error::RuntimeError;
2525
use ::configs::{ConfigProvider, FileConfigProvider, TypedEnvProvider};
2626
use async_trait::async_trait;
27-
use chrono::{DateTime, Utc};
2827
use dashmap::DashMap;
2928
use figment::value::Dict;
3029
use figment::{Metadata, Profile, Provider};
30+
use iggy_common::{DateTime, Utc};
3131
use serde::{Deserialize, Serialize};
3232
use std::collections::HashMap;
3333
use std::path::Path;

core/connectors/runtime/src/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818
*/
1919
use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
2020
use crate::configs::runtime::ConnectorsRuntimeConfig;
21-
use crate::manager::status::ConnectorError;
2221
use crate::metrics::Metrics;
2322
use crate::{
2423
SinkConnectorWrapper, SourceConnectorWrapper,
2524
manager::{
2625
sink::{SinkDetails, SinkInfo, SinkManager},
2726
source::{SourceDetails, SourceInfo, SourceManager},
28-
status::ConnectorStatus,
2927
},
3028
};
3129
use iggy_common::IggyTimestamp;
30+
use iggy_connector_sdk::api::ConnectorError;
31+
use iggy_connector_sdk::api::ConnectorStatus;
3232
use std::collections::HashMap;
3333
use std::sync::Arc;
3434
use tracing::error;

0 commit comments

Comments
 (0)