Skip to content

Commit f7c8cbc

Browse files
Luis Gonzalez
authored and committed
trying to run celery queue with airflow
1 parent 7e16544 commit f7c8cbc

3 files changed

Lines changed: 29 additions & 15 deletions

File tree

airflow/dags/run_queue.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ def run_queue():
2121

2222
@task
2323
def run_the_queue():
24-
app.send_task("worker.get_github_data", args=[0, 500])
24+
app.send_task("worker.get_data_from_queue", args=[100, 500])
2525

26+
2627
run_the_queue()
2728

2829

client.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,9 @@
77
import polars as pl
88

99

10-
today = datetime.now().strftime("%Y-%m-%d")
10+
def save_to_parquet(the_data):
11+
today = datetime.now().strftime("%Y-%m-%d")
1112

12-
print("Waiting for Celery task to complete")
13-
14-
try:
15-
print("Getting the result")
16-
response = build_repo_chord(total=5000, batch_size=500)
17-
the_data = response.get(timeout=3600) # 1 hour timeout
18-
print(f"Result: {the_data}")
19-
20-
except Exception as e:
21-
print(f"Error: {e}")
22-
23-
else:
2413
if not Path(f"data/{today}/").exists():
2514
Path(f"data/{today}").mkdir(parents=True, exist_ok=True)
2615

@@ -30,3 +19,20 @@
3019
df = pl.DataFrame(the_data)
3120
df.write_parquet(f"data/{today}/github_data.parquet", compression="zstd")
3221
print("Valid Parquet data")
22+
23+
24+
def get_data_from_queue(total: int = 5000, batch_size: int = 500):
    """Run the repo chord, wait for its result, and save it to Parquet.

    Args:
        total: Forwarded to ``build_repo_chord`` as ``total``. Defaults to
            5000, the value that used to be hard-coded here.
        batch_size: Forwarded to ``build_repo_chord`` as ``batch_size``.
            Defaults to 500, the value that used to be hard-coded here.

    Returns:
        Whatever ``save_to_parquet`` returns on success, or ``None`` when
        building the chord or waiting for its result failed.
    """
    try:
        print("Getting the result")
        response = build_repo_chord(total=total, batch_size=batch_size)
        the_data = response.get(timeout=3600)  # 1 hour timeout
        print(f"Result: {the_data}")
    except Exception as e:
        # Best-effort boundary: report the failure and skip the save step.
        # Without this early return, `the_data` would be unbound below and
        # the real error would be masked by an UnboundLocalError.
        print(f"Error: {e}")
        return None

    return save_to_parquet(the_data)
35+
36+
37+
# Script entry point: call get_data_from_queue() only when this module is
# executed directly, not when it is imported.
if __name__ == "__main__":
38+
get_data_from_queue()

worker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from pathlib import Path
44
import json
55
import boto3
6-
import time
76
from celery import Celery, group, chord
87
from celery.utils.log import get_task_logger
98
from datetime import datetime
109
from github import Auth, Github, GithubException
1110
from dotenv import load_dotenv
11+
from client import get_data_from_queue
1212
from pydantic_models.github import RabbitMQ_Data_Validation
1313
from rb_queue.rabbitmq import get_connection, QUEUE_NAME
1414
load_dotenv()
@@ -183,6 +183,13 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500):
183183
return chord(header)(aggregate_results.s())
184184

185185

186+
# Task registered on `app`: forwards `total` and `batch_size` to
# get_data_from_queue (imported from client) and returns its result.
# NOTE(review): this call only works if get_data_from_queue accepts `total`
# and `batch_size` keyword arguments — confirm against its definition in
# client.py.
# NOTE(review): get_data_from_queue waits on response.get(timeout=3600);
# Celery discourages blocking on another task's result from inside a task —
# confirm this is permitted by the worker setup before relying on it.
@app.task
187+
def run_queue_and_save(total: int = 5000, batch_size: int = 500):
188+
return get_data_from_queue(total=total, batch_size=batch_size)
189+
190+
191+
192+
186193
# old code that did not work
187194
# @app.task
188195
# def distribute_tasks():

0 commit comments

Comments
 (0)