diff --git a/ea_airflow_util/callables/sql.py b/ea_airflow_util/callables/sql.py
index 1d07c97..6b65e58 100644
--- a/ea_airflow_util/callables/sql.py
+++ b/ea_airflow_util/callables/sql.py
@@ -63,6 +63,7 @@ def s3_to_postgres(
     truncate: bool = False,
     delete_qry: Optional[str] = None,
     metadata_qry: Optional[str] = None,
+    column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to use column order detection (i.e. '\t')
     **context
 ):
     if column_customization is None:
@@ -85,6 +86,11 @@
         print("File at this s3 key is empty.")
         raise AirflowSkipException
 
+    if column_detection_delimiter:
+        file_string = s3_hook.read_key(s3_key, s3_bucket)
+        header = file_string.split('\n')[0].split(column_detection_delimiter)
+        column_customization = ', '.join(header)
+
     if truncate and not delete_qry:
         with conn.cursor() as cur:
             logging.info('Truncating table')
@@ -130,6 +136,7 @@ def s3_dir_to_postgres(
     truncate: bool = False,
     delete_s3_dir: bool = False,
     metadata_qry: Optional[str] = None,
+    column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to use column order detection (i.e. '\t')
     **context
 ):
     s3_hook = S3Hook(s3_conn_id)
@@ -137,6 +144,11 @@
     s3_bucket = s3_creds.schema
     s3_keys = s3.list_s3_keys(s3_hook, s3_bucket, s3_key)
 
+    if column_detection_delimiter:
+        file_string = s3_hook.read_key(s3_keys[0], s3_bucket)
+        header = file_string.split('\n')[0].split(column_detection_delimiter)
+        column_customization = ', '.join(header)
+
     if truncate:
         conn = PostgresHook(pg_conn_id).get_conn()
         with conn.cursor() as cur: