Merged
airflow/dags/run_queue.py (3 changes: 2 additions & 1 deletion)

@@ -21,8 +21,9 @@ def run_queue():

    @task
    def run_the_queue():
-        app.send_task("worker.get_github_data", args=[0, 500])
+        app.send_task("worker.get_data_from_queue", args=[100, 500])
Contributor (critical):
You are trying to send a task named worker.get_data_from_queue, but there is no Celery task with this name. The function get_data_from_queue in client.py is not decorated as a task. I believe you intended to call the run_queue_and_save task defined in worker.py.

Suggested change:
-        app.send_task("worker.get_data_from_queue", args=[100, 500])
+        app.send_task("worker.run_queue_and_save", args=[100, 500])
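This fails only at runtime because `app.send_task` does no client-side validation: it just serializes the given name onto the broker, and only the receiving worker checks it against its task registry. A toy dict-based stand-in for that worker-side lookup (the names and lambda bodies here are illustrative, not the project's real tasks):

```python
# Toy model of a Celery worker's task registry: routing is by name only,
# so an unregistered name is accepted by send_task but rejected on receipt.
registry = {
    "worker.get_github_data": lambda start, size: "fetched",
    "worker.run_queue_and_save": lambda total, batch_size: "saved",
}

def deliver(name, args):
    # What a worker does when the message arrives: look the name up, or
    # reject the message with an "unregistered task" error.
    fn = registry.get(name)
    if fn is None:
        raise KeyError(f"unregistered task {name!r}")
    return fn(*args)

ok = deliver("worker.run_queue_and_save", [100, 500])
try:
    # get_data_from_queue is a plain function in client.py, never registered
    deliver("worker.get_data_from_queue", [100, 500])
    rejected = False
except KeyError:
    rejected = True
```

The client-side call succeeds either way; the error surfaces only in the worker's logs, which is why a wrong name is easy to miss.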



    run_the_queue()


client.py (32 changes: 19 additions & 13 deletions)

@@ -7,20 +7,9 @@
import polars as pl


-today = datetime.now().strftime("%Y-%m-%d")
-
-print("Waiting for Celery task to complete")
-
-try:
-    print("Getting the result")
-    response = build_repo_chord(total=5000, batch_size=500)
-    the_data = response.get(timeout=3600)  # 1 hour timeout
-    print(f"Result: {the_data}")
-
-except Exception as e:
-    print(f"Error: {e}")
-
-else:
+def save_to_parquet(the_data):
+    today = datetime.now().strftime("%Y-%m-%d")
+
+    if not Path(f"data/{today}/").exists():
+        Path(f"data/{today}").mkdir(parents=True, exist_ok=True)

@@ -30,3 +19,20 @@
    df = pl.DataFrame(the_data)
    df.write_parquet(f"data/{today}/github_data.parquet", compression="zstd")
    print("Valid Parquet data")


+def get_data_from_queue():
+    try:
+        print("Getting the result")
+        response = build_repo_chord(total=5000, batch_size=500)
+        the_data = response.get(timeout=3600)  # 1 hour timeout
Contributor (security-medium):
The call to response.get(timeout=3600) in the get_data_from_queue function is a blocking operation that waits for the result of a Celery chord. This is a dangerous anti-pattern as it blocks the worker process, preventing it from performing other work, and can lead to worker deadlocks and Denial of Service in a distributed environment. Refactor the code to avoid blocking on task results within a worker; consider using Celery's callback mechanism (e.g., making save_to_parquet a task and linking it as a callback to the chord) for asynchronous processing. Additionally, there's a potential UnboundLocalError if an exception occurs within the try block, as the_data might not be assigned before save_to_parquet is called. Ensure save_to_parquet is only called when the_data is successfully populated, perhaps by using a try...except...else block.

+        print(f"Result: {the_data}")
+
+    except Exception as e:
+        print(f"Error: {e}")
+
+    return save_to_parquet(the_data)


+if __name__ == "__main__":
+    get_data_from_queue()
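The `try...except...else` shape that comment asks for can be shown with stand-ins. Here `fetch_result` plays the role of `build_repo_chord(...)` plus `response.get(...)` and always fails, to exercise the error path; in the code above, reaching `save_to_parquet(the_data)` after a failed fetch would raise `UnboundLocalError` because `the_data` was never bound:

```python
def fetch_result():
    # Stand-in for build_repo_chord(...) + response.get(...); always
    # raises here, to simulate a broker or result-backend failure.
    raise TimeoutError("result backend timed out")

def save_to_parquet(the_data):
    return f"wrote {len(the_data)} rows"  # stand-in for the parquet write

def get_data_from_queue():
    try:
        the_data = fetch_result()
    except Exception as e:
        print(f"Error: {e}")
        return None  # the_data was never bound; do NOT touch it here
    else:
        # only runs when the try block succeeded, so the_data exists
        return save_to_parquet(the_data)

print(get_data_from_queue())
```

On failure this prints the error and returns `None` instead of crashing; the `else` branch guarantees `save_to_parquet` only ever sees a populated `the_data`.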
worker.py (9 changes: 8 additions & 1 deletion)

@@ -3,12 +3,12 @@
from pathlib import Path
import json
import boto3
import time
from celery import Celery, group, chord
from celery.utils.log import get_task_logger
from datetime import datetime
from github import Auth, Github, GithubException
from dotenv import load_dotenv
+from client import get_data_from_queue
Contributor (critical):
This import creates a circular dependency. worker.py imports get_data_from_queue from client.py, while client.py imports build_repo_chord from worker.py (at line 5). This will cause an ImportError at runtime. To fix this, you should restructure your code to avoid circular imports. For example, you could move shared functions into a separate utils.py file that both client.py and worker.py can import from.
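The failure mode is reproducible in miniature: two scratch modules that import from each other at the top level, the same shape as worker.py and client.py (the `mod_a`/`mod_b` names and file contents are placeholders):

```python
import os
import sys
import tempfile

# mod_a plays client.py, mod_b plays worker.py: each does a top-level
# from-import of the other, exactly the cycle flagged above.
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "mod_a.py"), "w") as f:
        f.write("from mod_b import g\n\ndef f():\n    return g()\n")
    with open(os.path.join(d, "mod_b.py"), "w") as f:
        f.write("from mod_a import f\n\ndef g():\n    return 1\n")
    sys.path.insert(0, d)
    failed = False
    try:
        # mod_b runs midway through mod_a's import and asks for
        # mod_a.f before it has been defined.
        import mod_a
    except ImportError as e:
        failed = True
        print(f"ImportError: {e}")
    finally:
        sys.path.remove(d)
```

Moving the shared function into a third module that neither side imports back from (the utils.py suggestion above) removes one edge of the cycle and makes both imports succeed.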

from pydantic_models.github import RabbitMQ_Data_Validation
from rb_queue.rabbitmq import get_connection, QUEUE_NAME
load_dotenv()
@@ -183,6 +183,13 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500):
    return chord(header)(aggregate_results.s())


+@app.task
+def run_queue_and_save(total: int = 5000, batch_size: int = 500):
+    return get_data_from_queue(total=total, batch_size=batch_size)
Comment on lines +187 to +188
Contributor (critical):
This task calls get_data_from_queue with total and batch_size arguments. However, the get_data_from_queue function in client.py is defined without any parameters, which will cause a TypeError when this task is executed. You need to modify get_data_from_queue in client.py to accept these arguments and use them when calling build_repo_chord.
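The mismatch is reproducible without Celery: a zero-parameter stand-in for `get_data_from_queue` rejects the keyword arguments the task passes, and the fix is to accept and forward them (the `_fixed` name is ours, and the `build_repo_chord` call is left as a comment since it needs the real worker):

```python
def get_data_from_queue():  # zero-parameter signature, as in client.py
    return "data"

try:
    # what run_queue_and_save does when the task executes
    get_data_from_queue(total=5000, batch_size=500)
    err = ""
except TypeError as e:
    err = str(e)
    print(f"TypeError: {e}")

# Fix sketch: accept the arguments and thread them through.
def get_data_from_queue_fixed(total: int = 5000, batch_size: int = 500):
    # response = build_repo_chord(total=total, batch_size=batch_size)
    # ...
    return (total, batch_size)

print(get_data_from_queue_fixed(total=1000, batch_size=100))
```

With the parameters threaded through, the defaults in `run_queue_and_save` and `get_data_from_queue` stay in one place each and the task call no longer raises.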





# old code that did not work
# @app.task
# def distribute_tasks():