"""Airflow DAG that prunes empty log files and empty log directories."""

import logging
import os
from typing import Optional


class RotateLogsDAG:
    """
    A DAG for rotating Airflow logs.

    Currently, this DAG only deletes empty log files and empty directories in
    the Airflow logs folder. It does not archive logs, and it does not delete
    log files that have content. The logs root directory itself is never
    removed, even when everything beneath it is empty.

    If located somewhere other than `$AIRFLOW_HOME/logs`, set the `logs_dir`
    parameter to the path to the Airflow logs root directory.

    Example entry in an airflow_config.yml file:

    ```yaml
    rotate_logs_dag:
      schedule_interval: "@weekly"
      default_args: *default_task_args
      logs_dir: /home/airflow/airflow/logs
    ```
    """

    def __init__(self, *, logs_dir: Optional[str] = None, **kwargs) -> None:
        """
        Resolve the logs root and build the DAG.

        :param logs_dir: Root of the Airflow logs tree. When omitted (or
            empty), `$AIRFLOW_HOME/logs` is used; if `AIRFLOW_HOME` is unset
            as well, Airflow's own default home `~/airflow` is assumed.
        :param kwargs: Forwarded unchanged to `EACustomDAG`.
        """
        if not logs_dir:
            airflow_home = os.environ.get("AIRFLOW_HOME") or os.path.expanduser(
                "~/airflow"
            )
            logs_dir = os.path.join(airflow_home, "logs")

        self.logs_dir = logs_dir
        self.dag = self.build_dag(**kwargs)

    def get_empty_directories(self) -> list[str]:
        """
        Return the directories under `logs_dir` that are safe to remove.

        A directory qualifies when it contains no files and every one of its
        subdirectories qualifies too, so a chain of nested empty directories
        is reported in a single scan. Paths are ordered children-first, which
        means they can be passed to `os.rmdir` in the order returned. The
        logs root itself is never included.
        """
        top = os.fspath(self.logs_dir)
        removable: set[str] = set()
        ordered: list[str] = []

        # Bottom-up walk: every child is classified before its parent.
        for root, dirs, files in os.walk(top, topdown=False):
            if root == top or files:
                continue
            # Symlinked or unreadable subdirectories are never yielded by
            # os.walk, so they never become removable and keep the parent.
            if all(os.path.join(root, name) in removable for name in dirs):
                removable.add(root)
                ordered.append(root)

        return ordered

    def get_empty_files(self) -> list[str]:
        """
        Return the paths of all zero-byte files under `logs_dir`.

        Files whose size cannot be read are skipped with a warning.
        """
        empty_files = []
        for root, _, files in os.walk(self.logs_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    size = os.path.getsize(filepath)
                except OSError:
                    logging.warning("Could not access file: %s", filepath)
                    continue
                if size == 0:
                    empty_files.append(filepath)
        return empty_files

    def _delete_empty_files(self) -> int:
        """Remove every zero-byte file under `logs_dir`; return how many were removed."""
        empty_files = self.get_empty_files()
        logging.info("Deleting %d empty files in %s", len(empty_files), self.logs_dir)

        removed = 0
        for filepath in empty_files:
            logging.info("Removing empty file %s", filepath)
            try:
                os.remove(filepath)
            except FileNotFoundError:
                # Already gone (e.g. cleaned up concurrently): nothing to do.
                continue
            except OSError as err:
                logging.warning("Could not remove file %s: %s", filepath, err)
                continue
            removed += 1
        return removed

    def _delete_empty_directories(self) -> int:
        """Remove every empty directory below `logs_dir`; return how many were removed."""
        empty_dirs = self.get_empty_directories()
        logging.info(
            "Deleting %d empty directories in %s", len(empty_dirs), self.logs_dir
        )

        removed = 0
        for path in empty_dirs:
            logging.info("Removing empty directory %s", path)
            try:
                os.rmdir(path)
            except OSError as err:
                # The tree is live: a directory may have gained a file since
                # the scan. Skip it rather than failing the whole cleanup.
                logging.warning("Could not remove directory %s: %s", path, err)
                continue
            removed += 1
        return removed

    def build_dag(self, **kwargs):
        """
        Build the DAG holding the single cleanup task.

        :param kwargs: Forwarded unchanged to `EACustomDAG`.
        :return: The DAG object produced by the `EACustomDAG` context manager.
        """
        # Imported here rather than at module scope so the filesystem helpers
        # above can be imported and exercised without an Airflow installation.
        from airflow.decorators import task
        from ea_airflow_util import EACustomDAG

        @task
        def delete_empty_files_and_directories():
            # Some remote logging providers (CloudWatch) leave behind empty log
            # files. First we delete those, then delete empty directories, so
            # that directories holding only empty files are cleaned up too.
            self._delete_empty_files()
            self._delete_empty_directories()

        with EACustomDAG(**kwargs) as dag:
            delete_empty_files_and_directories()

        return dag