Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ea_airflow_util/callables/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def s3_to_postgres(
truncate: bool = False,
delete_qry: Optional[str] = None,
metadata_qry: Optional[str] = None,
column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to use column order detection (i.e. '\t')
**context
):
if column_customization is None:
Expand All @@ -85,6 +86,11 @@ def s3_to_postgres(
print("File at this s3 key is empty.")
raise AirflowSkipException

if column_detection_delimiter:
file_string = s3_hook.read_key(s3_key, s3_bucket)
header = file_string.split('\n')[0].split(column_detection_delimiter)
column_customization = ', '.join(header)

if truncate and not delete_qry:
with conn.cursor() as cur:
logging.info('Truncating table')
Expand Down Expand Up @@ -130,13 +136,19 @@ def s3_dir_to_postgres(
truncate: bool = False,
delete_s3_dir: bool = False,
metadata_qry: Optional[str] = None,
column_detection_delimiter: Optional[str] = None, # specify the column delimiter character to use column order detection (i.e. '\t')
**context
):
s3_hook = S3Hook(s3_conn_id)
s3_creds = s3_hook.get_connection(s3_hook.aws_conn_id)
s3_bucket = s3_creds.schema
s3_keys = s3.list_s3_keys(s3_hook, s3_bucket, s3_key)

if column_detection_delimiter:
file_string = s3_hook.read_key(s3_keys[0], s3_bucket)
header = file_string.split('\n')[0].split(column_detection_delimiter)
column_customization = ', '.join(header)

if truncate:
conn = PostgresHook(pg_conn_id).get_conn()
with conn.cursor() as cur:
Expand Down