Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions airflow/dags/run_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
tags=["celery_queue"],
max_consecutive_failed_dag_runs=3
)
def run_github_data_queue():
def run_queue():

@task(do_xcom_push=True, multiple_outputs=True)
def check_rate_limit(**context):
api_token = os.getenv("GITHUB_API_TOKEN")
Expand All @@ -38,15 +38,14 @@ def check_rate_limit(**context):
return {
"remaining": rate_limit[0],
"total": rate_limit[1]
}
}

@task
def run_queue(**context):
def send_task_to_celery_worker(**context):
rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")
Comment on lines 45 to 46
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These variables are fetched from XCom but are never used within the send_task_to_celery_worker task. They should be removed to improve code clarity and avoid confusion.



app.send_task("worker.get_github_data")
app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The values for start_in_repo_num and batch_size are hardcoded. This makes the task less flexible and harder to configure. Consider defining these as constants at the top of the file or loading them from a configuration source like Airflow variables.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The arguments start_in_repo_num and batch_size are hardcoded. This reduces the flexibility of the DAG. Consider making these values configurable, for instance, by using Airflow variables or passing them as part of the DAG run configuration. The hardcoded start_in_repo_num: 1000 is particularly concerning as it might not be the intended starting point for all runs.


print("celery_worker")

Expand All @@ -57,7 +56,7 @@ def save_data_from_queue():



check_rate_limit() >> run_queue() >> save_data_from_queue()
check_rate_limit() >> send_task_to_celery_worker() >> save_data_from_queue()


run_github_data_queue()
run_queue()
23 changes: 23 additions & 0 deletions celery_logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging
import os
from math import log
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log function from the math module is imported but never used in this file. It should be removed to keep the code clean.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log function from the math module is imported but is not used. Unused imports should be removed to keep the code clean.

from celery.signals import after_setup_logger, after_setup_task_logger

def _configure(logger: logging.Logger) -> None:
logger.handlers.clear()
logger.setLevel(os.getenv("CELERY_LOG_LEVEL", "INFO").upper())
logger.propagate = False

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)s %(name)s %(message)s"
))
logger.addHandler(handler)

@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    # Celery signal hook: apply our single-handler configuration to the
    # logger Celery sets up at worker start.
    _configure(logger)

@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    # Celery signal hook: apply the same configuration to per-task loggers
    # so task output matches the root worker logging format.
    _configure(logger)
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ services:
- ./data:/app/data
command: celery -A worker worker --loglevel=info --concurrency=8
env_file: ./config/.env
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
deploy:
resources:
limits:
Expand Down
Empty file removed fluent-bit/fluent-bit.conf
Empty file.
2 changes: 1 addition & 1 deletion rb_queue/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def consume_repos(callback):
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)

# properties is needed becayse channel.basic_consume expects 4 parameters, properties does not have
# properties is needed because channel.basic_consume expects 4 parameters, properties does not have
# any value but it is required to meet the 4 parameters requirement
def on_message(ch, method, properties, body):
try:
Expand Down
68 changes: 28 additions & 40 deletions worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import pika
from pathlib import Path
Expand All @@ -10,6 +11,7 @@
from dotenv import load_dotenv
from pydantic_models.github import RabbitMQ_Data_Validation
from rb_queue.rabbitmq import get_connection, QUEUE_NAME
import celery_logging_config # registers Celery logging signal handlers
load_dotenv()


Expand All @@ -25,46 +27,43 @@
region_name=os.getenv("AWS_REGION", "us-east-1")
)


def save_to_s3(data, file_directory):
    """Serialize *data* as JSON and upload it to S3 under *file_directory*.

    Values JSON cannot encode natively (e.g. datetimes) are stringified
    via ``default=str``. Relies on the module-level ``s3_client`` and
    ``S3_BUCKET_NAME``.
    """
    payload = json.dumps(data, default=str)
    s3_client.put_object(
        Bucket=S3_BUCKET_NAME,
        Key=file_directory,
        Body=payload,
    )


app = Celery(
'github_repos',
broker = os.getenv('CELERY_BROKER_URL'),
backend = os.getenv('CELERY_BACKEND_URL')
)


api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)


# bind = True allows to get task data, like task id
@app.task(bind=True)
def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, github_instance: Github = gh):
def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The return mylist statement was removed from this function. This breaks the Celery chord (build_repo_chord) which relies on this task's return value to pass to aggregate_results. Without it, aggregate_results will receive None and the final result will be incorrect. Please add return mylist at the end of the function.

counter = 0
connection = None
channel = None
mylist = []

repositories = github_instance.get_repos(since=start_in_repo_num)
rate_limit = github_instance.rate_limiting
print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)
Comment on lines +52 to +54
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The GitHub client instances (gh, gh_two) are initialized inside the get_github_data task. This is inefficient as it will happen on every task execution. These clients should be initialized once at the module level to be reused across task runs, as it was in the previous version of the code.


repositories = gh.get_repos(since=start_in_repo_num)
rate_limit = gh.rate_limiting
logger.info(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")

try:
connection = get_connection()
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)

for repo in repositories:
print(f"This is the repo printing: {repo}")
logger.info(f"This is the repo printing: {repo}")

try:
github_data_points = {
Expand Down Expand Up @@ -99,12 +98,12 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git

except GithubException as ge:
if ge.status == 403:
print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the rest of the file and to leverage Celery's task-specific logging, please use the task logger logger.exception instead of logging.exception.

Suggested change
logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
logger.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")

else:
print(f"GitHub API error {ge.status} for repo, skipping...")
logging.exception(f"GitHub API error {ge.status} for repo, skipping...")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency, please use logger.exception instead of logging.exception.

Suggested change
logging.exception(f"GitHub API error {ge.status} for repo, skipping...")
logger.exception(f"GitHub API error {ge.status} for repo, skipping...")

continue
except Exception as e:
print(f"Error accessing repo data: {e}, skipping...")
logging.exception(f"Error accessing repo data: {e}, skipping...")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency, please use logger.exception instead of logging.exception.

Suggested change
logging.exception(f"Error accessing repo data: {e}, skipping...")
logger.exception(f"Error accessing repo data: {e}, skipping...")

continue
Comment on lines 99 to 107
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

You're using logging.exception here, but a task-specific logger logger was obtained via get_task_logger. For consistency and to ensure logs have the proper Celery task context, you should use logger.exception instead. This is already done correctly in other parts of the file (e.g., line 156).

Suggested change
except GithubException as ge:
if ge.status == 403:
print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
else:
print(f"GitHub API error {ge.status} for repo, skipping...")
logging.exception(f"GitHub API error {ge.status} for repo, skipping...")
continue
except Exception as e:
print(f"Error accessing repo data: {e}, skipping...")
logging.exception(f"Error accessing repo data: {e}, skipping...")
continue
except GithubException as ge:
if ge.status == 403:
logger.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
else:
logger.exception(f"GitHub API error {ge.status} for repo, skipping...")
continue
except Exception as e:
logger.exception(f"Error accessing repo data: {e}, skipping...")
continue


try:
Expand All @@ -118,22 +117,25 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
)

counter += 1
print(github_data_points)
logger.info(github_data_points)

except Exception as validation_error:
print(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}")
print("Skipping this repo and continuing")
continue
Comment on lines 122 to 125
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These print statements should be converted to logger calls (e.g., logger.error or logger.warning) to be consistent with the logging improvements made elsewhere in this file. This ensures all output is handled by the configured logging system.

Suggested change
except Exception as validation_error:
print(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}")
print("Skipping this repo and continuing")
continue
except Exception as validation_error:
logger.error(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}")
logger.warning("Skipping this repo and continuing")
continue


remaining_api_calls = github_instance.rate_limiting
remaining_api_calls = gh.rate_limiting
remaining = remaining_api_calls[0]

if counter >= 5:
break
Comment on lines +130 to +131
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block unconditionally breaks the loop after 5 iterations. This appears to be leftover debugging code and will prevent the task from processing the full batch_size. It should be removed.

Comment on lines +130 to +131
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This condition if counter >= 5: break appears to be a debugging remnant. It will cause the loop to terminate after processing only 5 repositories, making the batch_size parameter ineffective. This should be removed to ensure the full batch is processed.


if counter >= batch_size:
print(f"Reached batch size of {batch_size}")
logger.info(f"Reached batch size of {batch_size}")
break

if remaining < 20:
print(f"Rate limit approaching ({remaining}). Stopping worker.")
logger.info(f"Rate limit approaching ({remaining}). Stopping worker.")
break

# # raise self.retry(countdown=3600)
Expand All @@ -144,27 +146,25 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
# # github account and it's credentials to have 1000 more API calls

else:
print("Remaining api calls")
print(remaining)
logger.info("Remaining api calls")
logger.info(remaining)

print("Count")
print(counter)
logger.info("Count")
logger.info(counter)

except Exception as e:
print("Error", e)
logger.exception("Error", e)

finally:
if connection:
connection.close()
else:
print("The connection does not exist")
logger.info("The connection does not exist")


# s3_url = save_to_s3(data=repo_collection, file_directory="github_repos/test.json")
logger.info(f"Processed {counter} repositories")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The get_github_data task no longer returns the mylist of collected repositories because return mylist was removed. This breaks the chord defined in build_repo_chord, as the aggregate_results task will receive a list of None values instead of the repository data. Please restore return mylist at the end of this function to fix the aggregation logic.


return mylist


@app.task
def aggregate_results(results):
Expand All @@ -179,16 +179,4 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500):
header = [
get_github_data.s(start, batch_size) for start in range(0, total, batch_size)
]
return chord(header)(aggregate_results.s())


# old code that did not work
# @app.task
# def distribute_tasks():

# jobs = group([
# get_github_data.s(start, 500)
# for start in range(0, 5000, 500)
# ])

# return chord(jobs)()
return chord(header)(aggregate_results.s())
Loading