Skip to content

Commit 2976c02

Browse files
Luis GonzalezLuis Gonzalez
authored andcommitted
added new task to dag that gets remaining api call
1 parent f7c8cbc commit 2976c02

1 file changed

Lines changed: 28 additions & 7 deletions

File tree

airflow/dags/run_queue.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import os
12
from airflow.sdk import dag, task
23
from pendulum import datetime
34
from celery import Celery
4-
import os
5+
from github import Auth, Github, GithubException
6+
7+
8+
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
9+
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
10+
gh, gh_two = Github(auth=auth), Github(auth=auth_two)
511

612
app = Celery(
713
'airflow_client',
814
broker = os.getenv('CELERY_BROKER_URL'),
915
backend = os.getenv('CELERY_BACKEND_URL')
1016
)
1117

18+
1219
@dag(
1320
schedule="@hourly",
1421
start_date=datetime(2026, 2, 3),
@@ -17,14 +24,28 @@
1724
tags=["celery_queue"],
1825
max_consecutive_failed_dag_runs=3,
1926
)
20-
def run_queue():
27+
def run_github_data_queue():
28+
29+
@task
30+
def check_rate_limit():
31+
rate_limit = gh.rate_limiting
32+
print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
33+
34+
return {
35+
"remaining": rate_limit[0],
36+
"total": rate_limit[1]
37+
}
38+
2139

2240
@task
23-
def run_the_queue():
24-
app.send_task("worker.get_data_from_queue", args=[100, 500])
41+
def run_the_queue(rate_limit: str):
42+
print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')
43+
44+
if rate_limit["remaining"] > 4900:
45+
app.send_task("worker.get_data_from_queue", args=[100, 500])
2546

26-
27-
run_the_queue()
47+
val = check_rate_limit()
48+
run_the_queue(rate_limit=val)
2849

2950

30-
run_queue()
51+
run_github_data_queue()

0 commit comments

Comments
 (0)