From 5c7e5001ff136ef526cc3adc86df232e1b3bdbde Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Wed, 29 Apr 2026 11:47:29 -0500 Subject: [PATCH 1/2] Add unwrap_envelope transform and envelope detection to fix source-sink format mismatch --- .../connectors/sdk/src/transforms/json/mod.rs | 1 + .../src/transforms/json/unwrap_envelope.rs | 213 ++++++++++++++++++ core/connectors/sdk/src/transforms/mod.rs | 8 + .../sdk/src/transforms/unwrap_envelope.rs | 62 +++++ .../sinks/iceberg_sink/src/router/mod.rs | 20 ++ 5 files changed, 304 insertions(+) create mode 100644 core/connectors/sdk/src/transforms/json/unwrap_envelope.rs create mode 100644 core/connectors/sdk/src/transforms/unwrap_envelope.rs diff --git a/core/connectors/sdk/src/transforms/json/mod.rs b/core/connectors/sdk/src/transforms/json/mod.rs index 34a48dd10f..0deca91916 100644 --- a/core/connectors/sdk/src/transforms/json/mod.rs +++ b/core/connectors/sdk/src/transforms/json/mod.rs @@ -25,6 +25,7 @@ use super::ComputedValue; pub mod add_fields; pub mod delete_fields; pub mod filter_fields; +pub mod unwrap_envelope; pub mod update_fields; /// Computes a JSON value based on the specified computed value type diff --git a/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs b/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs new file mode 100644 index 0000000000..6bbdc72f53 --- /dev/null +++ b/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs @@ -0,0 +1,213 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{ + DecodedMessage, Error, Payload, TopicMetadata, transforms::unwrap_envelope::UnwrapEnvelope, +}; +use simd_json::OwnedValue; +use tracing::warn; + +impl UnwrapEnvelope { + pub(crate) fn transform_json( + &self, + _metadata: &TopicMetadata, + mut message: DecodedMessage, + ) -> Result, Error> { + let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else { + return Ok(Some(message)); + }; + + let Some(inner) = map.remove(self.field.as_str()) else { + warn!( + "unwrap_envelope: field '{}' not found in payload, passing through unchanged", + self.field + ); + return Ok(Some(message)); + }; + + message.payload = Payload::Json(inner); + Ok(Some(message)) + } +} + +#[cfg(test)] +mod tests { + use crate::transforms::Transform; + use crate::transforms::json::test_utils::{ + create_raw_test_message, create_test_message, create_test_topic_metadata, + extract_json_object, + }; + use crate::transforms::unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig}; + use crate::{DecodedMessage, Payload}; + use simd_json::OwnedValue; + + #[test] + fn should_extract_data_field_from_database_record_envelope() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "data".to_string(), + }); + let msg = create_test_message( + r#"{ + "table_name": "tpch.lineitem", + "operation_type": "SELECT", + "timestamp": "2026-04-24T19:57:00Z", + "data": {"id": 1, "l_orderkey": 123, "l_partkey": 456}, + "old_data": null + }"#, + ); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + let json_obj = extract_json_object(&result).unwrap(); + assert_eq!(json_obj.len(), 3); + assert!(json_obj.contains_key("id")); + assert!(json_obj.contains_key("l_orderkey")); + assert!(json_obj.contains_key("l_partkey")); + assert!(!json_obj.contains_key("table_name")); + assert!(!json_obj.contains_key("operation_type")); + } + + #[test] + fn should_handle_missing_field_gracefully() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "data".to_string(), + }); + let msg = create_test_message(r#"{"table_name": "users", "operation_type": "INSERT"}"#); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + let json_obj = extract_json_object(&result).unwrap(); + assert_eq!(json_obj.len(), 2); + assert!(json_obj.contains_key("table_name")); + } + + #[test] + fn should_handle_scalar_inner_value() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "payload".to_string(), + }); + let msg = create_test_message(r#"{"payload": "just a string", "meta": 1}"#); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + match &result.payload { + Payload::Json(OwnedValue::String(s)) => assert_eq!(s.as_str(), "just a string"), + other => panic!("Expected Json(String), got {other:?}"), + } + } + + #[test] + fn should_handle_null_inner_value() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "old_data".to_string(), + }); + let msg = create_test_message(r#"{"data": {"id": 1}, "old_data": null}"#); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + match &result.payload { + Payload::Json(OwnedValue::Static(simd_json::StaticNode::Null)) => {} + other => panic!("Expected Json(Null), got {other:?}"), + } + } + + #[test] + fn should_pass_through_non_json_payload() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "data".to_string(), + }); + let msg = create_raw_test_message(vec![1, 2, 3, 4]); + let result = transform + .transform(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + if let Payload::Raw(bytes) = &result.payload { + assert_eq!(*bytes, vec![1u8, 2, 3, 4]); + } else { + panic!("Expected Raw payload"); + } + } + + #[test] + fn should_pass_through_non_object_json() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "data".to_string(), + }); + let msg = DecodedMessage { + id: None, + offset: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + headers: None, + payload: Payload::Json(OwnedValue::Array(Box::new(vec![ + OwnedValue::Static(simd_json::StaticNode::I64(1)), + OwnedValue::Static(simd_json::StaticNode::I64(2)), + ]))), + }; + let result = transform + .transform(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + match &result.payload { + Payload::Json(OwnedValue::Array(arr)) => assert_eq!(arr.len(), 2), + other => panic!("Expected Json(Array), got {other:?}"), + } + } + + #[test] + fn should_unwrap_nested_array_field() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "items".to_string(), + }); + let msg = create_test_message(r#"{"items": [1, 2, 3], "count": 3}"#); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + match &result.payload { + Payload::Json(OwnedValue::Array(arr)) => assert_eq!(arr.len(), 3), + other => panic!("Expected Json(Array), got {other:?}"), + } + } + + #[test] + fn should_work_with_deeply_nested_data() { + let transform = UnwrapEnvelope::new(UnwrapEnvelopeConfig { + field: "data".to_string(), + }); + let msg = create_test_message( + r#"{ + "data": {"user": {"name": "John", "age": 30}, "active": true}, + "meta": "ignored" + }"#, + ); + let result = transform + .transform_json(&create_test_topic_metadata(), msg) + .unwrap() + .unwrap(); + let json_obj = extract_json_object(&result).unwrap(); + assert_eq!(json_obj.len(), 2); + assert!(json_obj.contains_key("user")); + assert!(json_obj.contains_key("active")); + } +} diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs index 56acc009b2..8cfeb4699a 100644 --- a/core/connectors/sdk/src/transforms/mod.rs +++ b/core/connectors/sdk/src/transforms/mod.rs @@ -23,6 +23,7 @@ mod filter_fields; pub mod flatbuffer_convert; pub mod json; pub mod proto_convert; +mod unwrap_envelope; mod update_fields; use crate::{DecodedMessage, Error, TopicMetadata}; pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField}; @@ -38,6 +39,7 @@ use serde::{Deserialize, Serialize}; use simd_json::OwnedValue; use std::sync::Arc; use strum_macros::{Display, IntoStaticStr}; +pub use unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig}; pub use update_fields::{Field as UpdateField, UpdateCondition, UpdateFields, UpdateFieldsConfig}; /// The value of a field, either static or computed at runtime @@ -89,6 +91,7 @@ pub enum TransformType { ProtoConvert, FlatBufferConvert, AvroConvert, + UnwrapEnvelope, } pub fn from_config( @@ -131,5 +134,10 @@ pub fn from_config( serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?; Ok(Arc::new(AvroConvert::new(cfg))) } + TransformType::UnwrapEnvelope => { + let cfg: UnwrapEnvelopeConfig = + serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?; + Ok(Arc::new(UnwrapEnvelope::new(cfg))) + } } } diff --git a/core/connectors/sdk/src/transforms/unwrap_envelope.rs b/core/connectors/sdk/src/transforms/unwrap_envelope.rs new file mode 100644 index 0000000000..4541632ece --- /dev/null +++ b/core/connectors/sdk/src/transforms/unwrap_envelope.rs @@ -0,0 +1,62 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::{Transform, TransformType}; +use crate::{DecodedMessage, Error, Payload, TopicMetadata}; +use serde::{Deserialize, Serialize}; + +/// Configuration for the UnwrapEnvelope transform. +/// +/// Extracts a nested JSON field from an envelope object and promotes it +/// to the top-level payload. For example, given a Postgres source +/// `DatabaseRecord` with shape `{ table_name, operation_type, timestamp, +/// data: { ... }, old_data }`, setting `field = "data"` replaces the +/// entire payload with the contents of `data`. +#[derive(Debug, Serialize, Deserialize)] +pub struct UnwrapEnvelopeConfig { + pub field: String, +} + +/// Transform that extracts a nested field from a JSON envelope and +/// promotes it as the top-level payload. +pub struct UnwrapEnvelope { + pub field: String, +} + +impl UnwrapEnvelope { + pub fn new(cfg: UnwrapEnvelopeConfig) -> Self { + Self { field: cfg.field } + } +} + +impl Transform for UnwrapEnvelope { + fn r#type(&self) -> TransformType { + TransformType::UnwrapEnvelope + } + + fn transform( + &self, + metadata: &TopicMetadata, + message: DecodedMessage, + ) -> Result, Error> { + match &message.payload { + Payload::Json(_) => self.transform_json(metadata, message), + _ => Ok(Some(message)), + } + } +} diff --git a/core/connectors/sinks/iceberg_sink/src/router/mod.rs b/core/connectors/sinks/iceberg_sink/src/router/mod.rs index 8e60beedfc..0b21c55584 100644 --- a/core/connectors/sinks/iceberg_sink/src/router/mod.rs +++ b/core/connectors/sinks/iceberg_sink/src/router/mod.rs @@ -170,6 +170,19 @@ async fn write_data( }) .collect(); + if let Some(first) = msgs.first() + && looks_like_envelope(first) + { + error!( + "Incoming JSON appears to be wrapped in a source envelope \ + (detected 'table_name' + 'data' fields). The Iceberg sink \ + expects flat JSON matching the target table schema. Add an \ + 'unwrap_envelope' transform with field = \"data\" to your \ + connector config to extract the inner payload." + ); + return Err(Error::InvalidRecord); + } + let cursor = JsonArrowReader::new(msgs.as_slice()); let reader = ReaderBuilder::new(Arc::new( schema_to_arrow_schema(&table.metadata().current_schema().clone()).map_err(|err| { @@ -230,6 +243,13 @@ async fn write_data( Ok(()) } +fn looks_like_envelope(value: &simd_json::OwnedValue) -> bool { + let simd_json::OwnedValue::Object(obj) = value else { + return false; + }; + obj.contains_key("table_name") && obj.contains_key("data") +} + #[async_trait] pub trait Router: std::fmt::Debug + Sync + Send { async fn route_data( From 7c923611ecc8e617a5f0c7d4302b16ca46a324c5 Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Wed, 29 Apr 2026 13:05:24 -0500 Subject: [PATCH 2/2] Trigger CI