Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions airflow/dags/run_queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import os
from airflow.sdk import dag, task
from pendulum import datetime
from celery import Celery
import os
from github import Auth, Github, GithubException
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The GithubException import is not used in this file. It's good practice to remove unused imports to keep the code clean and reduce unnecessary dependencies.

Suggested change
from github import Auth, Github, GithubException
from github import Auth, Github



api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)
Comment on lines +8 to +10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The variables api_token_two, auth_two, and gh_two are defined but not used within this DAG file. Unused variables can lead to confusion and unnecessary resource allocation. Consider removing them if they are not intended for use here, or add a comment explaining their future purpose if they are placeholders.

Suggested change
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)
api_token = os.getenv("GITHUB_API_TOKEN")
auth = Auth.Token(api_token)
gh = Github(auth=auth)

Comment on lines +8 to +10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Initializing the GitHub client at the top level of the DAG file is an Airflow anti-pattern. This code is executed on every DAG parsing cycle, which is inefficient and can lead to performance issues. This logic should be moved into the check_rate_limit task. Also, the variables suffixed with _two are unused and should be removed. Please see my other comment on the check_rate_limit task for a suggested implementation. These lines should be removed.


app = Celery(
'airflow_client',
broker = os.getenv('CELERY_BROKER_URL'),
backend = os.getenv('CELERY_BACKEND_URL')
)


@dag(
schedule="@hourly",
start_date=datetime(2026, 2, 3),
Expand All @@ -17,14 +24,28 @@
tags=["celery_queue"],
max_consecutive_failed_dag_runs=3,
)
def run_queue():
def run_github_data_queue():

@task
def check_rate_limit():
rate_limit = gh.rate_limiting
Comment on lines +30 to +31
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

As mentioned in my other comment, the GitHub client should be initialized within the task to follow Airflow best practices. This avoids running authentication logic during DAG parsing. For better security, it's also recommended to use Airflow Connections to manage secrets like API tokens.

    def check_rate_limit():
        api_token = os.getenv("GITHUB_API_TOKEN")
        auth = Auth.Token(api_token)
        gh = Github(auth=auth)
        rate_limit = gh.rate_limiting

print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using print statements directly within Airflow tasks is not ideal for production environments. Airflow provides a robust logging system that should be utilized for better observability and integration with monitoring tools. Please replace print with logging.info() or logging.debug() after importing the logging module.


return {
"remaining": rate_limit[0],
"total": rate_limit[1]
}


@task
def run_the_queue():
app.send_task("worker.get_data_from_queue", args=[100, 500])
def run_the_queue(rate_limit: str):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The type hint for rate_limit is str, but the check_rate_limit task returns a dictionary. This type hint is incorrect and could lead to confusion or runtime errors if strict type checking were enforced. Please update the type hint to dict to accurately reflect the expected input.

Suggested change
def run_the_queue(rate_limit: str):
def run_the_queue(rate_limit: dict):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint for rate_limit is str, but the check_rate_limit task returns a dictionary. This is misleading and could cause issues with static analysis tools or future maintenance. It should be dict.

    def run_the_queue(rate_limit: dict):

print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the check_rate_limit task, using print statements directly within Airflow tasks is not ideal for production environments. Please replace print with logging.info() or logging.debug() for better logging practices.


if rate_limit["remaining"] > 4900:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The number 4900 is a magic number. It's better to define such thresholds as named constants at the top of the file for clarity, easier modification, and improved maintainability. For example, MIN_RATE_LIMIT_THRESHOLD = 4900.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The number 4900 is a magic number, which makes the code harder to understand and maintain. It should be defined as a constant with a descriptive name at the module level, for example: MIN_API_CALLS_THRESHOLD = 4900.

        if rate_limit["remaining"] > MIN_API_CALLS_THRESHOLD:

app.send_task("worker.get_data_from_queue", args=[100, 500])

run_the_queue()
val = check_rate_limit()
run_the_queue(rate_limit=val)


run_queue()
run_github_data_queue()
Loading