Merged
25 changes: 15 additions & 10 deletions airflow/dags/run_queue.py
@@ -1,20 +1,22 @@
from email.policy import default
import os
from socket import timeout
from tracemalloc import start
Comment on lines +1 to +4

Contributor

medium

There are several unused imports in this block: default from email.policy (line 1), timeout from socket (line 3), and start from tracemalloc (line 4). These should be removed to keep the code clean.

Comment on lines +1 to +4

Contributor

medium

The imports email.policy.default (line 1) and tracemalloc.start (line 4) are not used in this file and should be removed to maintain code cleanliness.
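As a rough illustration of what the two comments above are flagging, unused imports can be detected mechanically. This is a minimal stdlib-only sketch (a real project would use a linter such as flake8 or pyflakes instead); the function name and sample source are illustrative, not part of the PR:

```python
import ast

def unused_imports(source: str) -> list[str]:
    """Rough check: top-level imported names never referenced in the module."""
    tree = ast.parse(source)
    imported, used = [], set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                # "import a.b" binds the name "a"
                imported.append(alias.asname or alias.name.split(".")[0])
        elif isinstance(node, ast.ImportFrom):
            for alias in node.names:
                imported.append(alias.asname or alias.name)
        elif isinstance(node, ast.Name):
            used.add(node.id)
    return [name for name in imported if name not in used]

src = "from email.policy import default\nimport os\nprint(os.getcwd())\n"
print(unused_imports(src))  # → ['default']
```

This catches the simple case in the flagged block; it does not handle re-exports or string-based references, which is why a dedicated linter is preferable.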

from airflow.sdk import dag, task
from pendulum import datetime
from celery import Celery
from github import Auth, Github, GithubException
from client import get_data_from_queue
from datetime import timedelta
import time
from airflow.models import Variable
from worker import app


app = Celery(
'airflow_client',
broker = os.getenv('CELERY_BROKER_URL'),
backend = os.getenv('CELERY_BACKEND_URL')
)

# app = Celery(
# 'airflow_client',
# broker = os.getenv('CELERY_BROKER_URL'),
# backend = os.getenv('CELERY_BACKEND_URL')
# )

@dag(
schedule="@hourly",
@@ -45,16 +47,19 @@ def send_task_to_celery_worker(**context):
rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")

app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500})
start_with_repo_number = int(Variable.get("github_repo_number", default_var = "0"))
start_with_repo_number += 500

print("celery_worker")
app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": start_with_repo_number, "batch_size": 500})

Variable.set(key= "github_repo_number", value= str(start_with_repo_number))
Comment on lines +50 to +55

Contributor

high

The current logic for determining start_with_repo_number is flawed. It increments the value by a fixed batch_size (500) in each run. However, the worker task (get_github_data) does not return the ID of the last repository it processed. This can lead to skipping or reprocessing repositories, especially since the GitHub get_repos(since=...) API returns repositories with an ID greater than the provided value.

To fix this, the get_github_data task should return the ID of the last repository it processed. The DAG should then use this returned value (via XComs) to update the github_repo_number Airflow Variable for the next run.
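The shape of the suggested fix can be sketched in pure Python. All names here are simulated stand-ins (in the real DAG the worker would return the last ID through the Celery result backend or an XCom, and the cursor would be persisted with Variable.set); the key point is that the cursor advances to the last *processed* ID rather than by a fixed step, which matters because GitHub repo IDs are sparse:

```python
def get_github_data(start_in_repo_num, batch_size, repo_ids):
    """Simulated worker: processes up to batch_size repos with ID greater than
    start_in_repo_num (mirroring get_repos(since=...)) and returns the last ID."""
    processed = [rid for rid in repo_ids if rid > start_in_repo_num][:batch_size]
    return processed[-1] if processed else start_in_repo_num

# Sparse, non-contiguous IDs, as returned by the GitHub API.
repo_ids = [3, 10, 512, 600, 601, 1400, 2200]

cursor = 0          # would come from Variable.get("github_repo_number")
seen = []
for _ in range(3):  # three simulated DAG runs
    last = get_github_data(cursor, batch_size=2, repo_ids=repo_ids)
    seen.extend(rid for rid in repo_ids if cursor < rid <= last)
    cursor = last   # would be saved with Variable.set for the next run

print(seen)  # → [3, 10, 512, 600, 601, 1400]: no repo skipped or repeated
```

Advancing by a fixed 500 instead would desynchronize the cursor from the actual IDs after the first run, producing exactly the skips and repeats the comment describes.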

print("celery_worker")

@task
def save_data_from_queue():

get_data_from_queue()



check_rate_limit() >> send_task_to_celery_worker() >> save_data_from_queue()

8 changes: 5 additions & 3 deletions worker.py
@@ -116,7 +116,6 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500):
properties=pika.BasicProperties(delivery_mode=2)
)

counter += 1
logger.info(github_data_points)

except Exception as validation_error:
@@ -127,8 +126,6 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500):
remaining_api_calls = gh.rate_limiting
remaining = remaining_api_calls[0]

if counter >= 5:
break

if counter >= batch_size:
logger.info(f"Reached batch size of {batch_size}")
@@ -152,6 +149,11 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500):
logger.info("Count")
logger.info(counter)

counter += 1

if counter >= 25:
break
Comment on lines +154 to +155

Contributor

high

This hardcoded break condition if counter >= 25: break will cause the worker to stop after processing only 25 repositories, ignoring the batch_size parameter which is set to 500 by the DAG. This appears to be leftover debugging code and should be removed.

Comment on lines +154 to +155

Contributor

high

This condition introduces a hardcoded limit that causes the loop to break after processing only 25 repositories. This seems like a leftover debugging statement that overrides the batch_size parameter. Please remove it to ensure the worker processes the intended number of repositories.
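The intended loop shape after removing the debug cap can be sketched as follows. This is a simplified stand-in (the repo iterable and function name are illustrative, not the worker's actual code); the only exit condition tied to volume is the configured batch_size:

```python
def drain_batch(repos, batch_size=500):
    """Process items until the configured batch size is reached."""
    processed = []
    for counter, repo in enumerate(repos, start=1):
        processed.append(repo)
        # Break only on the configured batch size -- no hardcoded
        # debug limit like "if counter >= 25: break".
        if counter >= batch_size:
            break
    return processed

print(len(drain_batch(range(1000))))      # → 500, the full configured batch
print(len(drain_batch(range(10), 500)))   # → 10, source exhausted first
```

With the hardcoded cap in place, the first call would stop at 25 regardless of the batch_size=500 passed by the DAG, which is the behavior both comments flag.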


except Exception as e:
logger.exception("Error", e)
