
working on airflow dag #35

Merged: LuisJG8 merged 1 commit into MyProjects from from_json_to_pqt, Feb 19, 2026

Conversation

@LuisJG8 (Owner) commented Feb 19, 2026

No description provided.

@LuisJG8 (Owner, Author) commented Feb 19, 2026

/gemini review

LuisJG8 merged commit 38bddf7 into MyProjects on Feb 19, 2026 (1 check failed)
@gemini-code-assist (Contributor) commented:

Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Airflow DAG responsible for fetching GitHub data by introducing stateful tracking of the starting repository number using Airflow Variables. It also refactors the Celery application integration by importing it from the worker module and adjusts the worker's internal loop termination logic for data processing.

Highlights

  • Dynamic Repository Number Tracking: The Airflow DAG now utilizes Airflow Variables to dynamically track and update the starting repository number for data fetching, ensuring continuous processing across DAG runs.
  • Centralized Celery App Definition: The Celery application instance is now imported from the worker module, centralizing its definition and usage within the Airflow DAG.
  • Worker Loop Control Adjustment: The Celery worker's data fetching loop now increments its internal counter and breaks after 25 iterations, modifying the previous logic, which was based on message count and a limit of 5.


Changelog
  • airflow/dags/run_queue.py
    • Imported Variable from airflow.models and app from worker.
    • Commented out the local Celery app definition in favor of the import (see the sketch after this changelog).
    • Modified send_task_to_celery_worker to retrieve and update the github_repo_number using Airflow Variables for dynamic starting points.
  • worker.py
    • Adjusted the counter increment logic within the get_github_data task.
    • Updated the loop break condition to terminate after 25 iterations instead of 5, and changed the condition's placement.
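
For context, here is a minimal sketch of the centralized app setup the changelog describes. The broker URL is an assumption for illustration, not taken from the project:

```python
# worker.py -- the single place the Celery app is defined (broker URL assumed)
from celery import Celery

app = Celery("worker", broker="redis://localhost:6379/0")
```

```python
# airflow/dags/run_queue.py -- reuse the worker's app instead of a local copy
from airflow.models import Variable  # used for github_repo_number tracking
from worker import app               # centralized Celery app
```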
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.

@gemini-code-assist (Contributor) left a comment:

Code Review

This pull request updates the Airflow DAG to manage repository processing state using Airflow Variables. While this is a good step, the review identifies some critical issues. There are unused imports that should be removed. More importantly, the pagination logic for fetching GitHub repositories is flawed and could lead to data loss or duplication. Additionally, the worker contains leftover debug code that prematurely terminates its execution. These issues need to be addressed to ensure the DAG runs correctly and reliably.

Comment on lines +50 to +55
start_with_repo_number = int(Variable.get("github_repo_number", default_var = "0"))
start_with_repo_number += 500

print("celery_worker")
app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": start_with_repo_number, "batch_size": 500})

Variable.set(key= "github_repo_number", value= str(start_with_repo_number))

Severity: high

The current logic for determining start_with_repo_number is flawed. It increments the value by a fixed batch_size (500) in each run. However, the worker task (get_github_data) does not return the ID of the last repository it processed. This can lead to skipping or reprocessing repositories, especially since the GitHub get_repos(since=...) API returns repositories with an ID greater than the provided value.

To fix this, the get_github_data task should return the ID of the last repository it processed. The DAG should then use this returned value (via XComs) to update the github_repo_number Airflow Variable for the next run.
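
As a rough, hypothetical sketch of that suggestion (not the project's actual code): process_repo, the anonymous GitHub client, and the broker/backend URLs are placeholders, and blocking on the Celery result is used here as a simple stand-in for the XCom handoff the review describes:

```python
# --- worker.py ---
from itertools import islice

from celery import Celery
from github import Github

app = Celery("worker",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")  # result backend needed for .get()

@app.task(name="worker.get_github_data")
def get_github_data(start_in_repo_num: int, batch_size: int) -> int:
    gh = Github()                      # anonymous client; use a token in practice
    last_repo_id = start_in_repo_num
    # get_repos(since=N) yields public repos with ID greater than N
    for repo in islice(gh.get_repos(since=start_in_repo_num), batch_size):
        process_repo(repo)             # hypothetical per-repository handler
        last_repo_id = repo.id         # remember how far we actually got
    return last_repo_id

# --- airflow/dags/run_queue.py ---
from airflow.models import Variable
from worker import app

def send_task_to_celery_worker():
    start = int(Variable.get("github_repo_number", default_var="0"))
    result = app.send_task("worker.get_github_data",
                           kwargs={"start_in_repo_num": start, "batch_size": 500})
    last_repo_id = result.get(timeout=3600)  # wait for the worker's return value
    Variable.set("github_repo_number", str(last_repo_id))  # exact resume point
```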

Comment on lines +154 to +155
if counter >= 25:
    break

Severity: high

This hardcoded break condition if counter >= 25: break will cause the worker to stop after processing only 25 repositories, ignoring the batch_size parameter which is set to 500 by the DAG. This appears to be leftover debugging code and should be removed.
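
A minimal sketch of the loop control once the debug limit is removed (process_repo is hypothetical; the point is that the bound comes from the batch_size argument):

```python
def consume_batch(repos, batch_size):
    """Process at most batch_size repositories from the fetched sequence."""
    counter = 0
    for repo in repos:
        process_repo(repo)          # hypothetical per-repository handler
        counter += 1
        if counter >= batch_size:   # bound by the caller's batch size, not 25
            break
```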

Comment on lines +1 to +4
from email.policy import default
import os
from socket import timeout
from tracemalloc import start

Severity: medium

There are several unused imports in this block: default from email.policy (line 1), timeout from socket (line 3), and start from tracemalloc (line 4). These should be removed to keep the code clean.

@gemini-code-assist (Contributor) left a comment:

Code Review

This pull request updates the Airflow DAG to persist the starting repository number between runs using Airflow Variables, which is a good improvement for making the process stateful. It also includes changes to the Celery worker.

My review has identified a couple of issues:

  • There are unused imports in the DAG file that should be removed for code cleanliness.
  • A hardcoded limit has been introduced in the worker which appears to be leftover from debugging and will prevent it from processing full batches. This is a significant issue that needs to be addressed.

Please see the detailed comments below.

Comment on lines +154 to +155
if counter >= 25:
    break

Severity: high

This condition introduces a hardcoded limit that causes the loop to break after processing only 25 repositories. This seems like a leftover debugging statement that overrides the batch_size parameter. Please remove it to ensure the worker processes the intended number of repositories.

Comment on lines +1 to +4
from email.policy import default
import os
from socket import timeout
from tracemalloc import start

Severity: medium

The imports email.policy.default (line 1) and tracemalloc.start (line 4) are not used in this file and should be removed to maintain code cleanliness.

