Skip to content

Commit 9adcb86

Browse files
Luis Gonzalez
authored and committed
Fixed a Polars data type issue where the data would not save properly because a datetime variable was a str instead of a datetime
1 parent e6b971f commit 9adcb86

7 files changed

Lines changed: 79 additions & 27 deletions

File tree

airflow/dags/my_dag.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

airflow/dags/run_queue.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
from airflow.sdk import dag, task
from pendulum import datetime


@dag(
    schedule="@hourly",
    start_date=datetime(2026, 2, 2),
    # Implicit string concatenation instead of a backslash continuation:
    # a backslash *inside* the string literal would embed the next line's
    # leading indentation into the description shown in the Airflow UI.
    description=(
        "Run Celery queue with RabbitMQ as the broker "
        "in order to get GitHub data from the GitHub API"
    ),
    tags=["celery_queue"],
    max_consecutive_failed_dag_runs=3,
)
def run_queue():
    """Hourly DAG that drives the Celery/RabbitMQ queue pulling GitHub API data."""

    @task
    def run_the_queue():
        # Placeholder body — the real queue-kick logic is expected to go here.
        # TODO(review): replace the print with the actual Celery task dispatch.
        print("hello")

    run_the_queue()


# Module-level call instantiates the DAG so Airflow's DAG processor registers it.
run_queue()

client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import polars as pl
88

99

10-
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
10+
today = datetime.now().strftime("%Y-%m-%d")
1111

1212
print("Waiting for Celery task to complete")
1313

data/analytics/ducky.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import duckdb

# Raise DuckDB's row-display cap so large result sets are not truncated.
duckdb.sql("SET display_max_rows=10000")

# Load the day's GitHub export; DuckDB can query the DataFrame-like relation
# `df` directly by name inside SQL strings.
df = duckdb.read_parquet("../2026-02-02/github_data.parquet")

# Inspect the schema of the parquet file (column names and inferred types).
duckdb.sql("DESCRIBE SELECT * FROM df").show()

docker-compose.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,50 @@ services:
8484
celery_worker:
8585
condition: service_healthy
8686

87+
88+
# airflow-init:
89+
# image: apache/airflow:2.9.2
90+
# environment:
91+
# AIRFLOW__CORE__EXECUTOR: LocalExecutor
92+
# AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
93+
# AIRFLOW__CORE__LOAD_EXAMPLES: "false"
94+
# volumes:
95+
# - ./airflow/dags:/opt/airflow/dags
96+
# - ./:/opt/airflow/project
97+
# - airflow_data:/opt/airflow
98+
# command: ["airflow", "db", "init"]
99+
100+
# airflow-scheduler:
101+
# image: apache/airflow:2.9.2
102+
# depends_on:
103+
# - airflow-init
104+
# environment:
105+
# AIRFLOW__CORE__EXECUTOR: LocalExecutor
106+
# AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
107+
# AIRFLOW__CORE__LOAD_EXAMPLES: "false"
108+
# volumes:
109+
# - ./airflow/dags:/opt/airflow/dags
110+
# - ./:/opt/airflow/project
111+
# - airflow_data:/opt/airflow
112+
# command: ["airflow", "scheduler"]
113+
114+
# airflow-webserver:
115+
# image: apache/airflow:2.9.2
116+
# depends_on:
117+
# - airflow-init
118+
# environment:
119+
# AIRFLOW__CORE__EXECUTOR: LocalExecutor
120+
# AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
121+
# AIRFLOW__CORE__LOAD_EXAMPLES: "false"
122+
# volumes:
123+
# - ./airflow/dags:/opt/airflow/dags
124+
# - ./:/opt/airflow/project
125+
# - airflow_data:/opt/airflow
126+
# ports:
127+
# - "8080:8080"
128+
# command: ["airflow", "webserver"]
129+
130+
87131
volumes:
88132
rabbitmq_data:
89133
redis_data:

pydantic_models/github.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
class RabbitMQ_Data_Validation(BaseModel):
77
# ===== MESSAGE METADATA =====
88
message_id: str
9-
got_data_in: str = Field(default_factory=lambda: datetime.now().isoformat())
9+
got_data_in: datetime = Field(default_factory=lambda: datetime.now())
1010

1111
# ===== BASIC INFO =====
1212
repo_id: int

worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616

1717
logger = get_task_logger(__name__)
18-
todays_date = datetime.now().isoformat()
18+
todays_date = datetime.now()
1919
S3_BUCKET_NAME = "github-etl-data-bucket"
2020

2121

@@ -70,7 +70,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
7070
try:
7171
github_data_points = {
7272
"message_id": self.request.id,
73-
"got_data_in": todays_date if todays_date else datetime.now().isoformat(),
73+
"got_data_in": todays_date if todays_date else datetime.now(),
7474
"repo_id": repo.id if repo.id is not None else 0,
7575
"name": repo.name if repo.name else "unknown",
7676
"full_name": repo.full_name if repo.full_name else "unknown/unknown",

0 commit comments

Comments
 (0)