From 08153c58a72738e66f9fd25ac925f33249725e6f Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Wed, 29 Apr 2026 13:07:18 -0500 Subject: [PATCH] fix(postgres): unify extract_column_value into single free function --- Cargo.lock | 1 + .../sources/postgres_source/Cargo.toml | 1 + .../sources/postgres_source/src/lib.rs | 243 +++++++++--------- 3 files changed, 127 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c213ee6513..a42f4a5959 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6742,6 +6742,7 @@ version = "0.4.0" dependencies = [ "async-trait", "base64 0.22.1", + "chrono", "dashmap", "futures", "humantime", diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index 0490b1a30d..2923c9f901 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -42,6 +42,7 @@ cdc_pg_replicate = [] [dependencies] async-trait = { workspace = true } base64 = { workspace = true } +chrono = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } humantime = { workspace = true } diff --git a/core/connectors/sources/postgres_source/src/lib.rs b/core/connectors/sources/postgres_source/src/lib.rs index f01bcba3f2..640b2d30d5 100644 --- a/core/connectors/sources/postgres_source/src/lib.rs +++ b/core/connectors/sources/postgres_source/src/lib.rs @@ -495,7 +495,7 @@ impl PostgresSource { continue; } - let value = self.extract_column_value(&row, i)?; + let value = extract_column_value(&row, i)?; data.insert(column_name.clone(), value.clone()); if column.name() == tracking_column { @@ -885,124 +885,131 @@ impl PostgresSource { } } } +} - fn extract_column_value( - &self, - row: &sqlx::postgres::PgRow, - column_index: usize, - ) -> Result { - let column = &row.columns()[column_index]; - let type_name = column.type_info().name(); - - match type_name { - "BOOL" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(serde_json::Value::Bool) - .unwrap_or(serde_json::Value::Null)) - } - "INT2" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|v| serde_json::Value::from(v as i64)) - .unwrap_or(serde_json::Value::Null)) - } - "INT4" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|v| serde_json::Value::from(v as i64)) - .unwrap_or(serde_json::Value::Null)) - } - "INT8" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(serde_json::Value::from) - .unwrap_or(serde_json::Value::Null)) - } - "FLOAT4" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|v| serde_json::Value::from(v as f64)) - .unwrap_or(serde_json::Value::Null)) - } - "FLOAT8" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(serde_json::Value::from) - .unwrap_or(serde_json::Value::Null)) - } - "NUMERIC" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .and_then(|s| s.parse::().ok()) - .map(serde_json::Value::from) - .unwrap_or(serde_json::Value::Null)) - } - "VARCHAR" | "TEXT" | "CHAR" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(serde_json::Value::String) - .unwrap_or(serde_json::Value::Null)) - } - "TIMESTAMP" | "TIMESTAMPTZ" => { - let value: Option> = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|dt| serde_json::Value::String(dt.to_rfc3339())) - .unwrap_or(serde_json::Value::Null)) - } - "UUID" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|u| serde_json::Value::String(u.to_string())) - .unwrap_or(serde_json::Value::Null)) - } - "JSON" | "JSONB" => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value.unwrap_or(serde_json::Value::Null)) - } - "BYTEA" => { - let value: Option> = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(|bytes| { - use base64::Engine; - serde_json::Value::String( - base64::engine::general_purpose::STANDARD.encode(&bytes), - ) - }) - .unwrap_or(serde_json::Value::Null)) - } - _ => { - let value: Option = row - .try_get(column_index) - .map_err(|_| Error::InvalidRecord)?; - Ok(value - .map(serde_json::Value::String) - .unwrap_or(serde_json::Value::Null)) - } +fn extract_column_value( + row: &sqlx::postgres::PgRow, + column_index: usize, +) -> Result { + let column = &row.columns()[column_index]; + let type_name = column.type_info().name(); + + match type_name { + "BOOL" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::Bool) + .unwrap_or(serde_json::Value::Null)) + } + "INT2" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as i64)) + .unwrap_or(serde_json::Value::Null)) + } + "INT4" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as i64)) + .unwrap_or(serde_json::Value::Null)) + } + "INT8" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "FLOAT4" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as f64)) + .unwrap_or(serde_json::Value::Null)) + } + "FLOAT8" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "NUMERIC" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .and_then(|s| s.parse::().ok()) + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "DATE" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|d| serde_json::Value::String(d.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "VARCHAR" | "TEXT" | "CHAR" | "BPCHAR" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null)) + } + "TIMESTAMP" | "TIMESTAMPTZ" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|dt| serde_json::Value::String(dt.to_rfc3339())) + .unwrap_or(serde_json::Value::Null)) + } + "UUID" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|u| serde_json::Value::String(u.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "JSON" | "JSONB" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value.unwrap_or(serde_json::Value::Null)) + } + "BYTEA" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|bytes| { + use base64::Engine; + serde_json::Value::String( + base64::engine::general_purpose::STANDARD.encode(&bytes), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + _ => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null)) } } }