Apache Airflow is a great tool for orchestrating and monitoring data pipelines. One of its most valuable features is the ability to see the logs of the tasks that have been executed.
You can inspect the output of every command that ran and debug any issues that came up during a run, and these logs stay accessible for every past execution of the pipeline, going back as far as you want.
The problem
But this feature comes at a cost. The logs are stored on disk, and they can end up taking up a lot of space.
The solution
We can solve this in a pleasingly self-referential way: to deal with the disk space problem I was facing, I wrote an Airflow DAG that deletes old Airflow logs.
Finding the Airflow logs directory
The directory that contains all of Airflow’s working files can be configured by setting the AIRFLOW_HOME environment variable. By default it is set to ~/airflow, but it can be changed to any other directory.
Airflow provides a convenient function to get this path.
from pathlib import Path
import airflow.configuration

# Resolves to the AIRFLOW_HOME environment variable if set, otherwise ~/airflow.
AIRFLOW_HOME = airflow.configuration.get_airflow_home()
From here, it’s easy to get the path to the logs directory.
AIRFLOW_LOGS = Path(AIRFLOW_HOME) / 'logs'
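One caveat: the log location is itself configurable, so the logs are not guaranteed to live under AIRFLOW_HOME/logs. If you have changed base_log_folder in airflow.cfg, you can read the configured path instead. A minimal sketch, assuming Airflow 2.x, where the option sits in the [logging] section:
from pathlib import Path
from airflow.configuration import conf

# Use the configured base_log_folder rather than assuming AIRFLOW_HOME/logs.
AIRFLOW_LOGS = Path(conf.get("logging", "base_log_folder"))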
Cleaning old log files
The straightforward way to save disk space and keep the directory under control over time is to delete files that are older than a certain cutoff. We can implement this logic as follows.
import time

LOG_TTL_DAYS = 7

for path in AIRFLOW_LOGS.glob("**/*"):
    if not path.is_file():
        continue
    # Age of the file in seconds, based on its last modification time.
    file_age = time.time() - path.stat().st_mtime
    if file_age > 60 * 60 * 24 * LOG_TTL_DAYS:
        path.unlink()
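Before scheduling this on a real deployment, it can be reassuring to see what would be removed first. Here is a small dry-run sketch (the report_candidates helper is just for illustration, not part of the cleanup itself) that only reports the files the sweep above would delete and how much space that would free:
def report_candidates(logs_dir: Path, ttl_days: int = LOG_TTL_DAYS) -> None:
    """Print the files the cleanup would delete and the space it would free."""
    cutoff = time.time() - 60 * 60 * 24 * ttl_days
    total_bytes = 0
    for path in logs_dir.glob("**/*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            total_bytes += path.stat().st_size
            print(f"would delete {path}")
    print(f"total: {total_bytes / 1024 / 1024:.1f} MiB")

report_candidates(AIRFLOW_LOGS)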
The sweep above is enough to keep the logs directory from growing indefinitely. It does leave empty directories behind, but that is also easy to fix.
Deleting empty log directories
We can recursively delete empty directories with the following code.
def is_path_empty(path: Path) -> bool:
    return not any(path.iterdir())

def recursive_delete_empty_dirs(path: Path) -> int:
    """Delete empty directories under (and including) path; return how many were removed."""
    if not path.is_dir():
        return 0
    if is_path_empty(path):
        path.rmdir()
        return 1
    num_deleted = 0
    # Materialize the listing so deletions don't interfere with iteration.
    for p in list(path.iterdir()):
        num_deleted += recursive_delete_empty_dirs(p)
    return num_deleted
# Deleting a child can leave its parent empty, so keep making passes over the
# children of AIRFLOW_LOGS (never the logs root itself) until nothing is removed.
while True:
    deleted = sum(recursive_delete_empty_dirs(p) for p in AIRFLOW_LOGS.iterdir())
    if deleted == 0:
        break
With this, we have everything we need for a DAG that keeps the logs directory clean of both old log files and empty directories.
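To actually run the cleanup on a schedule, the pieces above can be bundled into a single task. The following is only a minimal sketch, assuming Airflow 2.x with the TaskFlow API; the DAG id, the daily schedule and the clean_airflow_logs task name are placeholders, and it reuses the recursive_delete_empty_dirs helper defined earlier.
import time
from datetime import datetime
from pathlib import Path

from airflow.configuration import get_airflow_home
from airflow.decorators import dag, task

LOG_TTL_DAYS = 7

@dag(dag_id="airflow_log_cleanup", schedule_interval="@daily",
     start_date=datetime(2023, 1, 1), catchup=False)
def airflow_log_cleanup():

    @task
    def clean_airflow_logs() -> None:
        logs_dir = Path(get_airflow_home()) / "logs"
        cutoff = time.time() - 60 * 60 * 24 * LOG_TTL_DAYS
        # Delete log files older than the TTL.
        for path in logs_dir.glob("**/*"):
            if path.is_file() and path.stat().st_mtime < cutoff:
                path.unlink()
        # Prune directories left empty, repeating until a pass removes nothing.
        while True:
            if sum(recursive_delete_empty_dirs(p) for p in logs_dir.iterdir()) == 0:
                break

    clean_airflow_logs()

dag = airflow_log_cleanup()
A nice side effect of scheduling it this way is that the cleanup shows up as just another pipeline in the Airflow UI, logs and all.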