Redevil10/airflow-postgres-csv

airflow-postgres-csv


Airflow operators for bulk PostgreSQL <-> CSV transfers using COPY. Supports Airflow 2.9+ and Airflow 3.

Listed on the Apache Airflow Ecosystem page.

Operators

  • PostgresToCsvOperator - Run a SQL query and export results to a CSV file
  • CsvToPostgresOperator - Load a CSV file into a PostgreSQL table

Both use PostgreSQL's COPY command for maximum throughput.
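To illustrate the mechanism (a hypothetical sketch, not the library's actual code), a COPY-based export wraps the query in a server-side `COPY ... TO STDOUT` statement and streams the result to a file object:

```python
def build_copy_to_sql(sql: str, has_header: bool = True) -> str:
    """Wrap a SELECT in a COPY ... TO STDOUT statement.

    Hypothetical helper for illustration only; the operator's real
    internals are not shown here.
    """
    options = "CSV HEADER" if has_header else "CSV"
    # Strip any trailing semicolon: COPY (...) rejects one inside the parentheses.
    inner = sql.strip().rstrip(";").strip()
    return f"COPY ({inner}) TO STDOUT WITH {options}"
```

With psycopg2, such a statement would typically be passed to `cursor.copy_expert(copy_sql, file_object)`, which streams rows to the file without materializing them in Python.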

Installation

```bash
pip install airflow-postgres-csv
```

Usage

```python
from airflow_postgres_csv import PostgresToCsvOperator, CsvToPostgresOperator

# Export query results to CSV
export_task = PostgresToCsvOperator(
    task_id="export_users",
    conn_id="my_postgres",
    sql="SELECT * FROM users WHERE active = %(active)s",
    parameters={"active": True},
    csv_file_path="/tmp/users.csv",
)

# Load CSV into a table (with truncate)
import_task = CsvToPostgresOperator(
    task_id="import_users",
    conn_id="my_postgres",
    table_name="staging.users",
    csv_file_path="/tmp/users.csv",
    truncate=True,
)
```

SQL from file

The sql parameter supports multiple formats:

```python
# Inline SQL
PostgresToCsvOperator(sql="SELECT * FROM users", ...)

# Relative path (loaded by Airflow from DAG folder or template_searchpath)
PostgresToCsvOperator(sql="sql/export_users.sql", ...)

# Absolute path (loaded directly)
PostgresToCsvOperator(sql="/opt/airflow/sql/export_users.sql", ...)
```

Gzip compression

Both operators support gzip compression for large files:

```python
# Export to gzip
PostgresToCsvOperator(
    sql="SELECT * FROM large_table",
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)

# Import from gzip
CsvToPostgresOperator(
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)
```

Parameters

PostgresToCsvOperator

| Parameter | Description | Default |
| --- | --- | --- |
| `conn_id` | Airflow Postgres connection ID | required |
| `csv_file_path` | Output file path (templated) | required |
| `sql` | SQL query string, or path to a `.sql` file | required |
| `parameters` | Dict passed to `cursor.mogrify` | `{}` |
| `has_header` | Include CSV header row | `True` |
| `compression` | Compression format (`"gzip"` or `None`) | `None` |
| `timeout` | Query timeout in minutes | `60` |
| `count_lines` | Count and log the number of lines in the CSV after writing | `True` |

CsvToPostgresOperator

| Parameter | Description | Default |
| --- | --- | --- |
| `conn_id` | Airflow Postgres connection ID | required |
| `table_name` | Target table (templated, supports `schema.table`) | required |
| `csv_file_path` | Input file path (templated) | required |
| `columns` | Explicit column list | `None` |
| `has_header` | CSV has header row | `True` |
| `truncate` | Truncate table before loading | `False` |
| `compression` | Compression format (`"gzip"` or `None`) | `None` |
| `delimiter` | CSV delimiter | `","` |
| `quote_char` | CSV quote character | `'"'` |
| `null_string` | String representing NULL | `""` |
| `timeout` | Query timeout in minutes | `60` |
| `count_lines` | Count and log the number of lines in the CSV before loading | `True` |
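The CSV options above map onto PostgreSQL's `COPY ... FROM` syntax. A hypothetical sketch of how such a statement could be assembled (the operator's real SQL may differ):

```python
from typing import List, Optional


def build_copy_from_sql(table: str, columns: Optional[List[str]] = None,
                        has_header: bool = True, delimiter: str = ",",
                        quote_char: str = '"', null_string: str = "") -> str:
    """Build a COPY ... FROM STDIN statement from the options above.

    Hypothetical helper for illustration; not the operator's actual code.
    """
    cols = f" ({', '.join(columns)})" if columns else ""
    opts = ["FORMAT csv", f"DELIMITER '{delimiter}'",
            f"QUOTE '{quote_char}'", f"NULL '{null_string}'"]
    if has_header:
        opts.insert(1, "HEADER")
    return f"COPY {table}{cols} FROM STDIN WITH ({', '.join(opts)})"
```

As with the export side, the resulting statement would typically be executed via psycopg2's `cursor.copy_expert`, with the CSV file as the source stream.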

Development

Running tests

Tests can be run against both supported Airflow versions using tox:

```bash
pip install tox

tox -e airflow2   # test against Airflow 2.x
tox -e airflow3   # test against Airflow 3.x
tox               # run both
```

Each environment installs the correct Airflow and provider versions automatically; no manual dependency management is needed.

Requirements

| | Airflow 2 | Airflow 3 |
| --- | --- | --- |
| apache-airflow | >=2.9, <3.0 | >=3.0 |
| apache-airflow-providers-postgres | >=5.0, <6.0 | >=6.0 |
| Python | 3.10 – 3.13 | 3.10 – 3.13 |

License

Apache 2.0
