
added new task to dag that gets remaining api calls to github api #30

Merged
LuisJG8 merged 1 commit into MyProjects from from_json_to_pqt
Feb 12, 2026

Conversation

@LuisJG8 LuisJG8 commented Feb 12, 2026

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the run_queue Airflow DAG by introducing robust GitHub API rate limit management. It ensures that data retrieval tasks are only dispatched when sufficient API calls are available, preventing service interruptions due to rate limiting. This change improves the reliability and stability of the data ingestion pipeline by proactively managing external API constraints.

Highlights

  • GitHub API Integration: Integrated the PyGithub library to interact with the GitHub API, including authentication with multiple tokens loaded from environment variables.
  • Rate Limit Monitoring: Added a new Airflow task check_rate_limit to fetch and report the current GitHub API rate limits, returning the remaining and total calls.
  • Conditional Task Dispatch: Modified the run_the_queue task to only dispatch data retrieval tasks to the Celery worker if the remaining GitHub API calls exceed a specified threshold (4900).
  • DAG Renaming: Renamed the main DAG function from run_queue to run_github_data_queue for improved clarity and specificity.


Changelog
  • airflow/dags/run_queue.py
    • Imported Auth, Github, and GithubException from the github library.
    • Initialized two GitHub API client instances using tokens retrieved from environment variables.
    • Defined a new @task named check_rate_limit to retrieve and return the current GitHub API rate limit status.
    • Modified the @task run_the_queue to accept the rate limit information and conditionally send tasks to the Celery worker based on the remaining API calls.
    • Updated the main DAG function, renaming it from run_queue to run_github_data_queue, to orchestrate the check_rate_limit and run_the_queue tasks.
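Stripped of the Airflow decorators and the Celery client, the dispatch gate described in this changelog reduces to the sketch below. The threshold value and dict keys come from the PR itself; the `dispatch` callable and the return value are illustrative stand-ins, not code from the branch:

```python
# Minimum remaining GitHub API calls required before work is dispatched (value from the PR).
MIN_API_CALLS_THRESHOLD = 4900

def run_the_queue(rate_limit: dict, dispatch) -> bool:
    """Dispatch queued work only when enough GitHub API calls remain.

    `dispatch` stands in for app.send_task("worker.get_data_from_queue", ...).
    Returns True when the task was dispatched, False when it was skipped.
    """
    if rate_limit["remaining"] > MIN_API_CALLS_THRESHOLD:
        dispatch()
        return True
    return False

print(run_the_queue({"remaining": 4950, "total": 5000}, lambda: None))  # True: above threshold
print(run_the_queue({"remaining": 120, "total": 5000}, lambda: None))   # False: below threshold
```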


LuisJG8 commented Feb 12, 2026

/gemini review

LuisJG8 changed the title from "added new task to dag that gets remaining api call" to "added new task to dag that gets remaining api calls to github api" on Feb 12, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

The pull request introduces a new task to an Airflow DAG to check the GitHub API rate limit before initiating a Celery task. This enhancement improves the DAG's resilience by preventing unnecessary API calls when the rate limit is low. However, there are several areas for improvement, including correcting type hints, using Airflow's logging system instead of print statements, and addressing unused imports and variables for better code hygiene. Additionally, a magic number used for the rate limit threshold should be replaced with a named constant for clarity.

 @task
-def run_the_queue():
-    app.send_task("worker.get_data_from_queue", args=[100, 500])
+def run_the_queue(rate_limit: str):


high

The type hint for rate_limit is str, but the check_rate_limit task returns a dictionary. This type hint is incorrect and could lead to confusion or runtime errors if strict type checking were enforced. Please update the type hint to dict to accurately reflect the expected input.

Suggested change
-def run_the_queue(rate_limit: str):
+def run_the_queue(rate_limit: dict):
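Going one step further than a bare `dict`, a `TypedDict` would also document which keys the payload carries. A sketch; the key names match the snippets in this review, while `RateLimit` and the return value are illustrative:

```python
from typing import TypedDict

class RateLimit(TypedDict):
    remaining: int
    total: int

def run_the_queue(rate_limit: RateLimit) -> str:
    # Static checkers can now verify that the keys used below actually exist.
    return f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}'

print(run_the_queue({"remaining": 4950, "total": 5000}))  # rate limit: 5000, remaining 4950
```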

 from pendulum import datetime
 from celery import Celery
 import os
+from github import Auth, Github, GithubException


medium

The GithubException import is not used in this file. It's good practice to remove unused imports to keep the code clean and reduce unnecessary dependencies.

Suggested change
-from github import Auth, Github, GithubException
+from github import Auth, Github

Comment on lines +8 to +10
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)


medium

The variables api_token_two, auth_two, and gh_two are defined but not used within this DAG file. Unused variables can lead to confusion and unnecessary resource allocation. Consider removing them if they are not intended for use here, or add a comment explaining their future purpose if they are placeholders.

Suggested change
-api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
-auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
-gh, gh_two = Github(auth=auth), Github(auth=auth_two)
+api_token = os.getenv("GITHUB_API_TOKEN")
+auth = Auth.Token(api_token)
+gh = Github(auth=auth)

@task
def check_rate_limit():
    rate_limit = gh.rate_limiting
    print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")


medium

Using print statements directly within Airflow tasks is not ideal for production environments. Airflow provides a robust logging system that should be utilized for better observability and integration with monitoring tools. Please replace print with logging.info() or logging.debug() after importing the logging module.
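A minimal version of that change could look like the sketch below. Airflow captures records emitted through the standard `logging` module into the task log; the dict returned here is illustrative, mirroring the keys the PR's `run_the_queue` consumes:

```python
import logging

log = logging.getLogger(__name__)

def check_rate_limit(remaining: int, total: int) -> dict:
    # Lazy %-formatting defers string building until a handler actually emits the record.
    log.info("Rate limit: %d remaining / %d total", remaining, total)
    return {"remaining": remaining, "total": total}

print(check_rate_limit(4950, 5000))  # {'remaining': 4950, 'total': 5000}
```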

-def run_the_queue():
-    app.send_task("worker.get_data_from_queue", args=[100, 500])
+def run_the_queue(rate_limit: str):
+    print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')


medium

Similar to the check_rate_limit task, using print statements directly within Airflow tasks is not ideal for production environments. Please replace print with logging.info() or logging.debug() for better logging practices.

def run_the_queue(rate_limit: str):
    print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')

    if rate_limit["remaining"] > 4900:


medium

The number 4900 is a magic number. It's better to define such thresholds as named constants at the top of the file for clarity, easier modification, and improved maintainability. For example, MIN_RATE_LIMIT_THRESHOLD = 4900.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request adds a new task to an Airflow DAG to check the GitHub API rate limit before processing data, which is a good proactive measure. However, the implementation has some issues that should be addressed. The GitHub client is initialized in the global scope of the DAG file, which is an Airflow anti-pattern. This should be moved into the task that uses it. There's also an incorrect type hint and a magic number that should be refactored for better code quality and maintainability. I've provided specific suggestions to resolve these points.

Comment on lines +8 to +10
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)


high

Initializing the GitHub client at the top level of the DAG file is an Airflow anti-pattern. This code is executed on every DAG parsing cycle, which is inefficient and can lead to performance issues. This logic should be moved into the check_rate_limit task. Also, the variables suffixed with _two are unused and should be removed. Please see my other comment on the check_rate_limit task for a suggested implementation. These lines should be removed.

Comment on lines +30 to +31
def check_rate_limit():
    rate_limit = gh.rate_limiting

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

As mentioned in my other comment, the GitHub client should be initialized within the task to follow Airflow best practices. This avoids running authentication logic during DAG parsing. For better security, it's also recommended to use Airflow Connections to manage secrets like API tokens.

    def check_rate_limit():
        api_token = os.getenv("GITHUB_API_TOKEN")
        auth = Auth.Token(api_token)
        gh = Github(auth=auth)
        rate_limit = gh.rate_limiting
        # gh.rate_limiting is a (remaining, limit) tuple; return the dict run_the_queue expects
        return {"remaining": rate_limit[0], "total": rate_limit[1]}

 @task
-def run_the_queue():
-    app.send_task("worker.get_data_from_queue", args=[100, 500])
+def run_the_queue(rate_limit: str):


medium

The type hint for rate_limit is str, but the check_rate_limit task returns a dictionary. This is misleading and could cause issues with static analysis tools or future maintenance. It should be dict.

    def run_the_queue(rate_limit: dict):

def run_the_queue(rate_limit: str):
    print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')

    if rate_limit["remaining"] > 4900:


medium

The number 4900 is a magic number, which makes the code harder to understand and maintain. It should be defined as a constant with a descriptive name at the module level, for example: MIN_API_CALLS_THRESHOLD = 4900.

        if rate_limit["remaining"] > MIN_API_CALLS_THRESHOLD:

@LuisJG8 LuisJG8 merged commit dba7d60 into MyProjects Feb 12, 2026
1 check failed
