diff --git a/core/connectors/sdk/src/transforms/json/mod.rs b/core/connectors/sdk/src/transforms/json/mod.rs
index 34a48dd10f..0deca91916 100644
--- a/core/connectors/sdk/src/transforms/json/mod.rs
+++ b/core/connectors/sdk/src/transforms/json/mod.rs
@@ -25,6 +25,7 @@ use super::ComputedValue;
pub mod add_fields;
pub mod delete_fields;
pub mod filter_fields;
+pub mod unwrap_envelope;
pub mod update_fields;
/// Computes a JSON value based on the specified computed value type
diff --git a/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs b/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs
new file mode 100644
index 0000000000..6bbdc72f53
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/unwrap_envelope.rs
@@ -0,0 +1,213 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+ DecodedMessage, Error, Payload, TopicMetadata, transforms::unwrap_envelope::UnwrapEnvelope,
+};
+use simd_json::OwnedValue;
+use tracing::warn;
+
+impl UnwrapEnvelope {
+    /// Replaces a JSON object payload with the value stored under `self.field`.
+    ///
+    /// * If the payload is not a JSON object, the message is returned unchanged.
+    /// * If the object has no entry for `self.field`, a warning is logged and
+    ///   the message is returned unchanged.
+    /// * Otherwise the extracted value (whatever JSON type it is) becomes the
+    ///   whole payload and the other envelope keys are dropped.
+    ///
+    /// Every path returns `Ok(Some(_))`: this transform never filters a
+    /// message out and never produces an error.
+    pub(crate) fn transform_json(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
+            return Ok(Some(message));
+        };
+
+        // `remove` hands back the inner value by ownership, so nothing is
+        // cloned; on a miss the map is untouched and the message passes
+        // through as it arrived.
+        let Some(inner) = map.remove(self.field.as_str()) else {
+            warn!(
+                "unwrap_envelope: field '{}' not found in payload, passing through unchanged",
+                self.field
+            );
+            return Ok(Some(message));
+        };
+
+        message.payload = Payload::Json(inner);
+        Ok(Some(message))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::transforms::Transform;
+    use crate::transforms::json::test_utils::{
+        create_raw_test_message, create_test_message, create_test_topic_metadata,
+        extract_json_object,
+    };
+    use crate::transforms::unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig};
+    use crate::{DecodedMessage, Payload};
+    use simd_json::OwnedValue;
+
+    /// Builds a transform configured to unwrap `field`.
+    fn unwrapper(field: &str) -> UnwrapEnvelope {
+        UnwrapEnvelope::new(UnwrapEnvelopeConfig {
+            field: field.to_string(),
+        })
+    }
+
+    /// Runs the JSON-specific entry point and unwraps both result layers.
+    fn apply_json(field: &str, input: DecodedMessage) -> DecodedMessage {
+        unwrapper(field)
+            .transform_json(&create_test_topic_metadata(), input)
+            .unwrap()
+            .unwrap()
+    }
+
+    /// Runs the generic `Transform::transform` entry point and unwraps both
+    /// result layers.
+    fn apply(field: &str, input: DecodedMessage) -> DecodedMessage {
+        unwrapper(field)
+            .transform(&create_test_topic_metadata(), input)
+            .unwrap()
+            .unwrap()
+    }
+
+    #[test]
+    fn should_extract_data_field_from_database_record_envelope() {
+        let envelope = create_test_message(
+            r#"{
+ "table_name": "tpch.lineitem",
+ "operation_type": "SELECT",
+ "timestamp": "2026-04-24T19:57:00Z",
+ "data": {"id": 1, "l_orderkey": 123, "l_partkey": 456},
+ "old_data": null
+ }"#,
+        );
+        let output = apply_json("data", envelope);
+        let fields = extract_json_object(&output).unwrap();
+        assert_eq!(fields.len(), 3);
+        // Only the inner record's keys survive...
+        for kept in ["id", "l_orderkey", "l_partkey"] {
+            assert!(fields.contains_key(kept));
+        }
+        // ...and the envelope's own keys are gone.
+        for dropped in ["table_name", "operation_type"] {
+            assert!(!fields.contains_key(dropped));
+        }
+    }
+
+    #[test]
+    fn should_handle_missing_field_gracefully() {
+        let input = create_test_message(r#"{"table_name": "users", "operation_type": "INSERT"}"#);
+        let output = apply_json("data", input);
+        let fields = extract_json_object(&output).unwrap();
+        assert_eq!(fields.len(), 2);
+        assert!(fields.contains_key("table_name"));
+    }
+
+    #[test]
+    fn should_handle_scalar_inner_value() {
+        let input = create_test_message(r#"{"payload": "just a string", "meta": 1}"#);
+        let output = apply_json("payload", input);
+        match &output.payload {
+            Payload::Json(OwnedValue::String(text)) => assert_eq!(text.as_str(), "just a string"),
+            other => panic!("Expected Json(String), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn should_handle_null_inner_value() {
+        let input = create_test_message(r#"{"data": {"id": 1}, "old_data": null}"#);
+        let output = apply_json("old_data", input);
+        match &output.payload {
+            Payload::Json(OwnedValue::Static(simd_json::StaticNode::Null)) => {}
+            other => panic!("Expected Json(Null), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn should_pass_through_non_json_payload() {
+        let input = create_raw_test_message(vec![1, 2, 3, 4]);
+        let output = apply("data", input);
+        let Payload::Raw(bytes) = &output.payload else {
+            panic!("Expected Raw payload");
+        };
+        assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+    }
+
+    #[test]
+    fn should_pass_through_non_object_json() {
+        let elements = vec![
+            OwnedValue::Static(simd_json::StaticNode::I64(1)),
+            OwnedValue::Static(simd_json::StaticNode::I64(2)),
+        ];
+        let input = DecodedMessage {
+            id: None,
+            offset: None,
+            checksum: None,
+            timestamp: None,
+            origin_timestamp: None,
+            headers: None,
+            payload: Payload::Json(OwnedValue::Array(Box::new(elements))),
+        };
+        let output = apply("data", input);
+        match &output.payload {
+            Payload::Json(OwnedValue::Array(items)) => assert_eq!(items.len(), 2),
+            other => panic!("Expected Json(Array), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn should_unwrap_nested_array_field() {
+        let input = create_test_message(r#"{"items": [1, 2, 3], "count": 3}"#);
+        let output = apply_json("items", input);
+        match &output.payload {
+            Payload::Json(OwnedValue::Array(items)) => assert_eq!(items.len(), 3),
+            other => panic!("Expected Json(Array), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn should_work_with_deeply_nested_data() {
+        let input = create_test_message(
+            r#"{
+ "data": {"user": {"name": "John", "age": 30}, "active": true},
+ "meta": "ignored"
+ }"#,
+        );
+        let output = apply_json("data", input);
+        let fields = extract_json_object(&output).unwrap();
+        assert_eq!(fields.len(), 2);
+        for kept in ["user", "active"] {
+            assert!(fields.contains_key(kept));
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs
index 56acc009b2..8cfeb4699a 100644
--- a/core/connectors/sdk/src/transforms/mod.rs
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -23,6 +23,7 @@ mod filter_fields;
pub mod flatbuffer_convert;
pub mod json;
pub mod proto_convert;
+mod unwrap_envelope;
mod update_fields;
use crate::{DecodedMessage, Error, TopicMetadata};
pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
@@ -38,6 +39,7 @@ use serde::{Deserialize, Serialize};
use simd_json::OwnedValue;
use std::sync::Arc;
use strum_macros::{Display, IntoStaticStr};
+pub use unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig};
pub use update_fields::{Field as UpdateField, UpdateCondition, UpdateFields, UpdateFieldsConfig};
/// The value of a field, either static or computed at runtime
@@ -89,6 +91,7 @@ pub enum TransformType {
ProtoConvert,
FlatBufferConvert,
AvroConvert,
+ UnwrapEnvelope,
}
pub fn from_config(
@@ -131,5 +134,10 @@ pub fn from_config(
serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
Ok(Arc::new(AvroConvert::new(cfg)))
}
+ TransformType::UnwrapEnvelope => {
+ let cfg: UnwrapEnvelopeConfig =
+ serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+ Ok(Arc::new(UnwrapEnvelope::new(cfg)))
+ }
}
}
diff --git a/core/connectors/sdk/src/transforms/unwrap_envelope.rs b/core/connectors/sdk/src/transforms/unwrap_envelope.rs
new file mode 100644
index 0000000000..4541632ece
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/unwrap_envelope.rs
@@ -0,0 +1,62 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use serde::{Deserialize, Serialize};
+
+/// Configuration for the UnwrapEnvelope transform.
+///
+/// Extracts a nested JSON field from an envelope object and promotes it
+/// to the top-level payload. For example, given a Postgres source
+/// `DatabaseRecord` with shape `{ table_name, operation_type, timestamp,
+/// data: { ... }, old_data }`, setting `field = "data"` replaces the
+/// entire payload with the contents of `data`.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UnwrapEnvelopeConfig {
+ /// Name of the top-level key whose value becomes the new payload.
+ pub field: String,
+}
+
+/// Transform that extracts a nested field from a JSON envelope and
+/// promotes it as the top-level payload.
+pub struct UnwrapEnvelope {
+ /// Name of the top-level key to extract; copied from
+ /// `UnwrapEnvelopeConfig::field` by `new`.
+ pub field: String,
+}
+
+impl UnwrapEnvelope {
+    /// Creates the transform from its configuration, taking ownership of the
+    /// configured field name.
+    pub fn new(cfg: UnwrapEnvelopeConfig) -> Self {
+        let UnwrapEnvelopeConfig { field } = cfg;
+        Self { field }
+    }
+}
+
+impl Transform for UnwrapEnvelope {
+    /// Identifies this transform as [`TransformType::UnwrapEnvelope`].
+    fn r#type(&self) -> TransformType {
+        TransformType::UnwrapEnvelope
+    }
+
+    /// Hands JSON payloads to `transform_json`; a message carrying any other
+    /// payload variant is returned untouched as `Ok(Some(message))`.
+    fn transform(
+        &self,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        match &message.payload {
+            Payload::Json(_) => self.transform_json(metadata, message),
+            _ => Ok(Some(message)),
+        }
+    }
+}
diff --git a/core/connectors/sinks/iceberg_sink/src/router/mod.rs b/core/connectors/sinks/iceberg_sink/src/router/mod.rs
index 8e60beedfc..0b21c55584 100644
--- a/core/connectors/sinks/iceberg_sink/src/router/mod.rs
+++ b/core/connectors/sinks/iceberg_sink/src/router/mod.rs
@@ -170,6 +170,19 @@ async fn write_data(
})
.collect();
+ if let Some(first) = msgs.first()
+ && looks_like_envelope(first)
+ {
+ error!(
+ "Incoming JSON appears to be wrapped in a source envelope \
+ (detected 'table_name' + 'data' fields). The Iceberg sink \
+ expects flat JSON matching the target table schema. Add an \
+ 'unwrap_envelope' transform with field = \"data\" to your \
+ connector config to extract the inner payload."
+ );
+ return Err(Error::InvalidRecord);
+ }
+
let cursor = JsonArrowReader::new(msgs.as_slice());
let reader = ReaderBuilder::new(Arc::new(
schema_to_arrow_schema(&table.metadata().current_schema().clone()).map_err(|err| {
@@ -230,6 +243,13 @@ async fn write_data(
Ok(())
}
+/// Heuristic check for a source-envelope-shaped record.
+///
+/// Returns `true` only when `value` is a JSON object that has both a
+/// `table_name` key and a `data` key. Only key presence is inspected; the
+/// values stored under those keys are not examined. Any non-object value
+/// yields `false`.
+fn looks_like_envelope(value: &simd_json::OwnedValue) -> bool {
+    match value {
+        simd_json::OwnedValue::Object(fields) => {
+            fields.contains_key("table_name") && fields.contains_key("data")
+        }
+        _ => false,
+    }
+}
+
#[async_trait]
pub trait Router: std::fmt::Debug + Sync + Send {
async fn route_data(