
make airflow run the Celery queue #31

Merged
LuisJG8 merged 1 commit into MyProjects from from_json_to_pqt on Feb 13, 2026

Conversation

@LuisJG8
Owner

@LuisJG8 LuisJG8 commented Feb 13, 2026

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Airflow setup to seamlessly orchestrate Celery queue operations for extracting GitHub data. It achieves this by restructuring the Airflow DAG to manage API rate limits and dispatch Celery tasks, ensuring that the Airflow environment can properly access and execute the necessary project code, and standardizing the data storage path for processed information.

Highlights

  • Airflow DAG Refactoring: The Airflow DAG (run_queue.py) has been significantly refactored to integrate Celery queue processing for GitHub data extraction. This includes using XComs for rate limit management, introducing new tasks for queue execution and data saving, and updating task dependencies for a clearer workflow.
  • Airflow Environment Configuration: Airflow Docker services (docker-compose.override.yml) have been configured with volume mounts and PYTHONPATH settings. This ensures that the project's custom code is accessible within the Airflow environment, allowing DAGs to import and execute necessary modules.
  • Data Storage Standardization: The client.py file now uses a defined DIRECT path for data storage, standardizing where extracted GitHub data (in parquet format) is saved within the project structure.
  • Celery Worker Optimization: The Celery worker (worker.py) has been streamlined by removing an unused task and import. Additionally, the GitHub API rate limit threshold at which data collection stops has been lowered from 100 to 20 remaining calls, so the worker keeps collecting until closer to the limit instead of stopping prematurely.


Changelog
  • airflow/dags/run_queue.py
    • Imported get_data_from_queue, timedelta, and time.
    • Removed global GitHub API token and Github object initialization.
    • Modified check_rate_limit task to push multiple outputs via XComs and initialize GitHub API clients locally.
    • Added a new run_queue task to retrieve rate limit information from XComs, check it, and dispatch a Celery task (worker.get_github_data).
    • Added a new save_data_from_queue task to invoke the get_data_from_queue function.
    • Updated the task dependency chain to check_rate_limit() >> run_queue() >> save_data_from_queue().
    • Removed the run_the_queue task.
  • airflow/docker-compose.override.yml
    • Added volume mounts for the project directory (/usr/local/airflow/project) to api-server, dag-processor, scheduler, and triggerer services.
    • Set the PYTHONPATH environment variable to /usr/local/airflow/project for the same services.
  • client.py
    • Defined a DIRECT constant for the base data storage path.
    • Updated the save_to_parquet function to use the DIRECT path for creating daily directories and saving parquet files.
    • Removed the timezone import from datetime.
  • worker.py
    • Removed the import of get_data_from_queue.
    • Deleted the run_queue_and_save task.
    • Lowered the GitHub API rate limit threshold from 100 to 20 for stopping the worker.
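The docker-compose.override.yml changes described above can be sketched as follows. This is a hypothetical fragment, not the PR's actual file: the service name is taken from the changelog, and a relative mount path is assumed instead of the absolute one the review later flags. The same stanza would repeat for api-server, dag-processor, and triggerer.

```yaml
# Hypothetical docker-compose.override.yml fragment (one of four services).
# A relative path keeps the mount portable across machines.
services:
  scheduler:
    volumes:
      - ./:/usr/local/airflow/project
    environment:
      # Lets DAGs import the project's custom modules.
      PYTHONPATH: /usr/local/airflow/project
```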
Activity
  • No specific activity (comments, reviews, or progress updates) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@LuisJG8
Owner Author

LuisJG8 commented Feb 13, 2026

/gemini review

@LuisJG8 LuisJG8 merged commit 692ac9f into MyProjects Feb 13, 2026
1 check failed
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request aims to integrate a Celery queue with an Airflow DAG for fetching GitHub data. A medium-severity vulnerability was identified due to hardcoded absolute paths in the airflow/docker-compose.override.yml configuration, which compromises portability and exposes local filesystem information. Additionally, the implementation contains a critical error handling bug that could crash the DAG, and introduces Airflow anti-patterns such as using time.sleep() and flawed DAG logic for rate-limiting. Detailed comments and suggestions have been provided to address these issues and enhance code quality and robustness.


    if rate_limit["remaining"] > 4900:
        app.send_task("worker.get_data_from_queue", args=[100, 500])
    get_data_from_queue()

critical

The called function get_data_from_queue() contains a critical bug. In client.py, if response.get() raises an exception (e.g., a timeout), the the_data variable will not be assigned. The except block catches the exception, but execution then proceeds to return save_to_parquet(the_data), which will raise an UnboundLocalError and crash the task. The error handling in get_data_from_queue must be improved to handle this case gracefully.

Here is a suggested fix for client.py:

# In client.py
def get_data_from_queue():
    the_data = None
    try:
        print("Getting the result")
        response = build_repo_chord(total=5000, batch_size=500)
        the_data = response.get(timeout=3600)
        print(f"Result: {the_data}")
    except Exception as e:
        print(f"Error: {e}")

    if the_data is not None:
        return save_to_parquet(the_data)

Comment on lines +43 to +55

def run_queue(**context):
    rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
    max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")

    if rate_limit > 100:
        print("IT WORKS")
        print(rate_limit)

        celery_worker = app.send_task("worker.get_github_data")

        print(celery_worker)

        time.sleep(500)

high

The run_queue task has several significant issues:

  • Blocking Sleep: time.sleep(500) is an Airflow anti-pattern. It blocks a worker slot for over 8 minutes without doing useful work, which can lead to worker starvation. For delays, please use Airflow native concepts like Sensors or Deferrable Operators.
  • Ineffective Logic: The rate limit check inside the if statement does not prevent the downstream save_data_from_queue task from running, which appears to trigger the main workload. This makes the rate limit check ineffective.
  • Unused Variable: max_total_api_calls is fetched from XCom but is never used.
  • Debug Prints: The print() statements should be replaced with proper logging for use in production.

This task's logic is unclear and inefficient. It should be refactored to properly gate the data fetching pipeline based on the rate limit, perhaps using a BranchPythonOperator.
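As a minimal sketch of the suggested gate (names are hypothetical, not from the PR), the branching decision can live in a plain function that a BranchPythonOperator or @task.branch-decorated task returns from:

```python
# Hypothetical sketch: the rate-limit gate as a plain function. A
# BranchPythonOperator (or @task.branch task) would return its result;
# the task_ids "run_queue" and "skip_queue" are illustrative only.
def choose_branch(remaining: int, threshold: int = 100) -> str:
    """Return the task_id to follow: dispatch work only with budget left."""
    if remaining > threshold:
        return "run_queue"
    return "skip_queue"
```

When a branch task returns a task_id, Airflow skips the other branch entirely, so the downstream save step only runs when the budget check passes and no time.sleep is needed.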

from rb_queue.rabbitmq import consume_repos
import polars as pl

DIRECT = Path("/usr/local/airflow/project/data")

high

The data directory path is hardcoded. This makes the application less flexible and harder to configure for different environments. It is a best practice to source this kind of configuration from an environment variable, for example by using os.getenv(). This would require importing the os module at the top of the file.
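A minimal sketch of the env-var approach, assuming a hypothetical PROJECT_DATA_DIR variable name; the default mirrors the path currently hardcoded in client.py:

```python
import os
from pathlib import Path

# Hypothetical: PROJECT_DATA_DIR is an assumed variable name, not part
# of the PR; the fallback is the path currently hardcoded in client.py.
DIRECT = Path(os.getenv("PROJECT_DATA_DIR", "/usr/local/airflow/project/data"))
```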

networks:
- etl-shared
volumes:
- /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project

security-medium

A medium-severity vulnerability exists due to a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...) in docker-compose.override.yml. This makes the configuration non-portable, causes failures in other environments, and leaks local filesystem information. This issue affects all services defined in this file (api-server, dag-processor, scheduler, triggerer) and should be replaced with a relative path.


dag-processor:
volumes:
- /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project

security-medium

The docker-compose.override.yml file contains a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...). This makes the configuration non-portable and will cause it to fail on any other developer's machine or in a CI/CD environment. It also leaks information about the developer's local filesystem.

Comment on lines +30 to +32
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)

medium

The GitHub client gh_two and its related authentication variables (api_token_two, auth_two) are initialized but never used within this task. This adds unnecessary overhead and should be removed to improve code clarity.

Suggested change
-api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
-auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
-gh, gh_two = Github(auth=auth), Github(auth=auth_two)
+api_token = os.getenv("GITHUB_API_TOKEN")
+auth = Auth.Token(api_token)
+gh = Github(auth=auth)

Comment on lines +58 to 63
def save_data_from_queue():

if rate_limit["remaining"] > 4900:
app.send_task("worker.get_data_from_queue", args=[100, 500])
get_data_from_queue()




medium

This function definition includes unnecessary blank lines and incorrect indentation, which harms readability. Please remove them.

Suggested change
-def save_data_from_queue():
-
-    if rate_limit["remaining"] > 4900:
-        app.send_task("worker.get_data_from_queue", args=[100, 500])
-    get_data_from_queue()
+def save_data_from_queue():
+    get_data_from_queue()

Comment on lines +14 to +15
# if not Path(f"data/{today}/").exists():
# Path(f"data/{today}").mkdir(parents=True, exist_ok=True)

medium

This commented-out code should be removed to keep the codebase clean.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request integrates a Celery queue with an Airflow DAG for asynchronous processing. However, it introduces critical security and operational concerns. The Docker configuration leaks sensitive local path information (PII) and uses absolute paths, compromising portability. Additionally, the Airflow DAG contains a long blocking time.sleep call, risking resource exhaustion and Denial of Service. The implementation also includes inefficient patterns and potentially redundant logic. Addressing these issues is crucial for the pipeline's security, stability, and maintainability.

Comment on lines 57 to +60

-@task
-def run_the_queue(rate_limit: str):
-    print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')
+def save_data_from_queue():
+
+    if rate_limit["remaining"] > 4900:
+        app.send_task("worker.get_data_from_queue", args=[100, 500])
+    get_data_from_queue()

high

The save_data_from_queue task calls get_data_from_queue(), which contains a blocking call response.get(timeout=3600). This will occupy an Airflow worker for up to an hour, which is inefficient and can lead to worker starvation. For long-running asynchronous operations like this, it's better to use deferrable operators or sensors to free up worker slots while waiting.
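The sensor approach can be sketched as follows (a hypothetical poke callable, not code from the PR): instead of blocking on response.get(timeout=3600), a PythonSensor in "reschedule" mode re-checks the Celery result on each poke and releases the worker slot in between.

```python
# Hypothetical poke callable for an Airflow PythonSensor. The sensor
# calls this repeatedly; returning True ends the wait. Between pokes
# (in "reschedule" mode) the worker slot is freed.
def result_is_ready(async_result) -> bool:
    """Return True once the Celery chord result is available."""
    return bool(async_result.ready())
```

Wired up as, e.g., PythonSensor(task_id="wait_for_queue", python_callable=lambda: result_is_ready(response), mode="reschedule", poke_interval=60) (task_id and interval illustrative), the hour-long blocking wait disappears from the worker.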

from rb_queue.rabbitmq import consume_repos
import polars as pl

DIRECT = Path("/usr/local/airflow/project/data")

high

The DIRECT constant is defined with a hardcoded absolute path. This reduces portability. Consider making this path configurable, for example, by reading it from an environment variable. You will also need to add import os at the top of the file.

Suggested change
-DIRECT = Path("/usr/local/airflow/project/data")
+DIRECT = Path(os.getenv("PROJECT_DATA_DIR", "/usr/local/airflow/project/data"))

networks:
- etl-shared
volumes:
- /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project

security-medium

The volume mount in docker-compose.override.yml uses a hardcoded absolute path to a local directory, which leaks sensitive PII (e.g., developer's username and local file system structure). This also severely impacts portability, causing services to fail in other environments. This issue is repeated for dag-processor, scheduler, and triggerer services. It is recommended to use a relative path.

       - ./:/usr/local/airflow/project


print(celery_worker)

time.sleep(500)

security-medium

The run_queue task includes a time.sleep(500) call, which blocks an Airflow worker slot for over 8 minutes. This can quickly exhaust the worker pool if the DAG is triggered multiple times, posing a resource exhaustion Denial of Service (DoS) risk. Additionally, this pattern is highly inefficient, and the task itself appears redundant as its triggered Celery task's result is unused, and subsequent tasks seem to cover its functionality. The logic and necessity of this task should be reconsidered.

Comment on lines +14 to +15
# if not Path(f"data/{today}/").exists():
# Path(f"data/{today}").mkdir(parents=True, exist_ok=True)

medium

This block of code is commented out. If it's no longer needed, it should be removed to keep the codebase clean.

