From 5a6edc250db200840a90fb2e3390e9a49a838d9c Mon Sep 17 00:00:00 2001 From: sleblanc23 Date: Wed, 26 Mar 2025 15:27:38 -0500 Subject: [PATCH 1/3] add option to detect column names to avoid order issues --- ea_airflow_util/callables/sql.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ea_airflow_util/callables/sql.py b/ea_airflow_util/callables/sql.py index 1d07c97..26bfe79 100644 --- a/ea_airflow_util/callables/sql.py +++ b/ea_airflow_util/callables/sql.py @@ -130,6 +130,7 @@ def s3_dir_to_postgres( truncate: bool = False, delete_s3_dir: bool = False, metadata_qry: Optional[str] = None, + column_detection_delimiter: Optional[str] = None, **context ): s3_hook = S3Hook(s3_conn_id) @@ -137,6 +138,11 @@ def s3_dir_to_postgres( s3_bucket = s3_creds.schema s3_keys = s3.list_s3_keys(s3_hook, s3_bucket, s3_key) + if column_detection_delimiter: + file_string = s3_hook.read_key(s3_keys[0], s3_bucket) + header = file_string.split('\n')[0].split(column_detection_delimiter) + column_customization = ', '.join(header) + if truncate: conn = PostgresHook(pg_conn_id).get_conn() with conn.cursor() as cur: From 3dd2f1ad162a6098c64741c9f9ccd97a9f644f15 Mon Sep 17 00:00:00 2001 From: sleblanc23 Date: Wed, 26 Mar 2025 15:42:10 -0500 Subject: [PATCH 2/3] add the same option to the single key function --- ea_airflow_util/callables/sql.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ea_airflow_util/callables/sql.py b/ea_airflow_util/callables/sql.py index 26bfe79..b13f345 100644 --- a/ea_airflow_util/callables/sql.py +++ b/ea_airflow_util/callables/sql.py @@ -63,6 +63,7 @@ def s3_to_postgres( truncate: bool = False, delete_qry: Optional[str] = None, metadata_qry: Optional[str] = None, + column_detection_delimiter: Optional[str] = None, **context ): if column_customization is None: @@ -85,6 +86,11 @@ def s3_to_postgres( print("File at this s3 key is empty.") raise AirflowSkipException + if column_detection_delimiter: + file_string = s3_hook.read_key(s3_key, s3_bucket) + 
header = file_string.split('\n')[0].split(column_detection_delimiter) + column_customization = ', '.join(header) + if truncate and not delete_qry: with conn.cursor() as cur: logging.info('Truncating table') From 1e5ee9a097e3f6f5504af8e60203dec17d09297d Mon Sep 17 00:00:00 2001 From: sleblanc23 Date: Wed, 26 Mar 2025 15:49:55 -0500 Subject: [PATCH 3/3] add comment to hopefully clarify config --- ea_airflow_util/callables/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/callables/sql.py b/ea_airflow_util/callables/sql.py index b13f345..6b65e58 100644 --- a/ea_airflow_util/callables/sql.py +++ b/ea_airflow_util/callables/sql.py @@ -63,7 +63,7 @@ def s3_to_postgres( truncate: bool = False, delete_qry: Optional[str] = None, metadata_qry: Optional[str] = None, - column_detection_delimiter: Optional[str] = None, + column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to enable column order detection (e.g. '\t') **context ): if column_customization is None: @@ -136,7 +136,7 @@ def s3_dir_to_postgres( truncate: bool = False, delete_s3_dir: bool = False, metadata_qry: Optional[str] = None, - column_detection_delimiter: Optional[str] = None, + column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to enable column order detection (e.g. '\t') **context ): s3_hook = S3Hook(s3_conn_id)