diff --git a/core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs b/core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs
index 857a312e85..97df6786c4 100644
--- a/core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs
+++ b/core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs
@@ -23,6 +23,7 @@ use iceberg::Catalog;
 use iceberg::table::Table;
 use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload};
 use simd_json::base::ValueAsObject;
+use simd_json::prelude::ValueAsScalar;
 use std::collections::HashMap;
 use tracing::{info, warn};
 
@@ -86,10 +87,7 @@ impl DynamicRouter {
 
     fn extract_route_field(&self, message: &ConsumedMessage) -> Option<String> {
         match &message.payload {
-            Payload::Json(payload) => payload
-                .as_object()
-                .and_then(|obj| obj.get(&self.route_field))
-                .map(|val| val.to_string()),
+            Payload::Json(payload) => extract_route_field_value(payload, &self.route_field),
             _ => {
                 warn!("Unsupported format for iceberg connector");
                 None
@@ -98,6 +96,22 @@ impl DynamicRouter {
     }
 }
 
+/// Extracts the routing key stored under `route_field` in a JSON payload.
+///
+/// Route fields are table identifiers, so only JSON string values are valid here.
+/// Returns `None` when the payload is not an object, the field is missing, or the
+/// value is not a string.
+fn extract_route_field_value(payload: &simd_json::OwnedValue, route_field: &str) -> Option<String> {
+    // Calling `to_string()` on a JSON value serializes it, turning "tpch.lineitem"
+    // into a string containing literal quote characters. `as_str()` returns the
+    // underlying string instead, which can be validated and used as the lookup key.
+    payload
+        .as_object()
+        .and_then(|obj| obj.get(route_field))
+        .and_then(|val| val.as_str())
+        .map(ToString::to_string)
+}
+
 #[async_trait]
 impl Router for DynamicRouter {
     async fn route_data(
@@ -167,3 +181,31 @@ impl Router for DynamicRouter {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    // Brings the production `extract_route_field_value` into scope so the tests
+    // exercise the real routing helper rather than a local copy.
+    use super::*;
+
+    #[test]
+    fn extracts_string_route_field_without_json_quotes() {
+        let payload = simd_json::json!({
+            "table": "tpch.lineitem"
+        });
+
+        assert_eq!(
+            extract_route_field_value(&payload, "table"),
+            Some("tpch.lineitem".to_string())
+        );
+    }
+
+    #[test]
+    fn ignores_non_string_route_field() {
+        let payload = simd_json::json!({
+            "table": 42
+        });
+
+        assert_eq!(extract_route_field_value(&payload, "table"), None);
+    }
+}