From json to pqt #32

Merged

LuisJG8 merged 2 commits into MyProjects from from_json_to_pqt on Feb 15, 2026

Conversation

Owner

LuisJG8 commented Feb 15, 2026

No description provided.

Owner Author

LuisJG8 commented Feb 15, 2026

/gemini review

LuisJG8 merged commit ce90257 into MyProjects on Feb 15, 2026
1 check failed
@gemini-code-assist
Contributor

Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors an existing Airflow DAG to streamline GitHub API interactions and Celery task execution. It also introduces a new Fluent Bit configuration for handling and routing log data, setting up dummy inputs and defining outputs to standard output and a file.

Highlights

  • Airflow DAG Refactoring: Simplified the 'check_rate_limit' task in 'run_queue.py' to use a single GitHub API token instead of two, reducing complexity.
  • Celery Task Dispatch: Removed conditional logic and a fixed 'time.sleep' from the 'run_queue' task, allowing the 'worker.get_github_data' Celery task to be dispatched directly.
  • Fluent Bit Configuration: Introduced a new 'fluent-bit.yml' file, defining dummy log inputs and configuring outputs to 'stdout' and a specific log file ('workshop-INFO.log').
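
The single-token rate-limit check described above depends on the shape of GitHub's REST GET /rate_limit response. A minimal, self-contained sketch of how the remaining/total values might be extracted before being pushed to XCom (the function and sample names are illustrative, not taken from the PR):

```python
def parse_rate_limit(payload: dict) -> tuple[int, int]:
    """Return (remaining, total) from a GitHub /rate_limit response body.

    GitHub's REST API reports per-resource quotas under "resources";
    the "core" entry covers most ordinary API calls.
    """
    core = payload["resources"]["core"]
    return core["remaining"], core["limit"]

# Example body shaped like the GitHub REST API's GET /rate_limit response
sample = {"resources": {"core": {"limit": 5000, "remaining": 4999, "reset": 1700000000}}}
remaining, total = parse_rate_limit(sample)
print(remaining, total)  # 4999 5000
```

In the DAG, check_rate_limit would push these two values to XCom under the "remaining" and "total" keys that the run_queue task previously pulled.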


Changelog
  • airflow/dags/run_queue.py
    • Imported 'timeout' from the 'socket' module.
    • Refactored the 'check_rate_limit' task to use a single GitHub API token.
    • Removed conditional execution and a 'time.sleep' call from the 'run_queue' task, directly invoking the Celery task.
    • Removed 'celery_worker' variable assignment and associated print statements.
  • fluent-bit/fluent-bit.yml
    • Added a new Fluent Bit configuration file.
    • Defined two dummy input sources for INFO and ERROR messages.
    • Configured outputs to 'stdout' for all messages and to 'workshop-INFO.log' for messages tagged '*.info'.
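
Putting the changelog entries together, the new fluent-bit.yml plausibly takes the following shape. This is a sketch reconstructed from the review snippets, with the path/file fix and the workshop.error tag that the reviewer suggests already applied; it is not the exact file from the PR:

```yaml
pipeline:
  inputs:
    # Dummy inputs emit a fixed JSON record on an interval; useful for testing routing.
    - name: dummy
      tag: workshop.info
      dummy: '{"message": "INFO message"}'
    - name: dummy
      tag: workshop.error        # the PR tags this workshop.info; the review flags it
      dummy: '{"message": "ERROR message"}'
  outputs:
    # Echo every record to stdout.
    - name: stdout
      match: '*'
    # Write INFO records to /data/workshop-INFO.log
    # ("path" is the output directory, "file" the file name).
    - name: file
      path: /data
      match: '*.info'
      file: workshop-INFO.log
```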
Activity
  • No human activity has been recorded on this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature              | Command             | Description
Code Review          | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment              | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help                 | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


gemini-code-assist bot left a comment


Code Review

This pull request refactors the data processing pipeline, changing the final output format from JSON to Parquet. It also modifies the Airflow DAG to trigger the Celery worker and adds a Fluent Bit configuration for logging.

My review has identified a few issues:

  • In airflow/dags/run_queue.py, there's an unused import and some dead code that should be removed for clarity. The logging of the Celery task creation could also be more informative.
  • The new fluent-bit/fluent-bit.yml file contains a critical configuration error with a duplicate key that will cause logs to be written to the wrong location. It also has a potentially misconfigured dummy input for testing purposes.

I've provided specific suggestions to address these points.

Comment on lines +17 to +19
file: /data
match: '*.info'
file: workshop-INFO.log

critical

There is a duplicate file key in this output configuration. The first key, on line 17, should be path, which specifies the directory for the log file. With the current configuration, file: /data is overwritten by the subsequent file key, causing the log file to be created in Fluent Bit's current working directory instead of the intended /data directory.

      path: /data
      match: '*.info'
      file: workshop-INFO.log

@@ -1,4 +1,5 @@
import os
from socket import timeout

medium

The timeout object from the socket module is imported but never used in this file. It's best practice to remove unused imports to keep the code clean and avoid potential confusion.

Comment on lines 45 to 46
rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")

medium

The variables rate_limit and max_total_api_calls are assigned values from XComs but are not used anywhere in the run_queue task. This is dead code and should be removed to improve clarity and maintainability.

Comment on lines +49 to +51
app.send_task("worker.get_github_data")

print(celery_worker)

time.sleep(500)
print("celery_worker")

medium

The statement print("celery_worker") is not very informative for logging or debugging. Previously, the Celery task object was printed. Consider capturing the result of app.send_task and logging the task ID for better traceability. Also, it's recommended to use Airflow's logging mechanism instead of print for better integration with the Airflow UI.

Suggested change
app.send_task("worker.get_github_data")
print(celery_worker)
time.sleep(500)
print("celery_worker")
celery_task = app.send_task("worker.get_github_data")
print(f"Celery task sent with ID: {celery_task.id}")
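
The suggestion above can be taken one step further by using the standard logging module, which Airflow's task runner captures into the per-task log with levels and timestamps that bare print output lacks. A sketch, where dispatch_worker and the logger name are illustrative rather than code from the PR:

```python
import logging

log = logging.getLogger("airflow.task")

def dispatch_worker(app) -> str:
    """Send the Celery task and log its id for traceability."""
    celery_task = app.send_task("worker.get_github_data")
    log.info("Celery task sent with ID: %s", celery_task.id)
    return celery_task.id
```

Returning the id also lets downstream tasks pick it up via XCom if the DAG later needs to poll the Celery result backend.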

Comment on lines +7 to +9
- name: dummy
tag: workshop.info
dummy: '{"message": "ERROR message"}'

medium

This dummy input generates a record with the message 'ERROR message' but tags it as workshop.info. This will cause it to be routed to the workshop-INFO.log file, which is likely not the intended behavior for an error message. Consider changing the tag to something like workshop.error to allow for separate routing of error messages.

    - name: dummy
      tag: workshop.error
      dummy: '{"message": "ERROR message"}'


gemini-code-assist bot left a comment


Code Review

This pull request refactors the data processing pipeline, seemingly to switch from JSON to Parquet format as the title suggests. The changes in the Airflow DAG simplify task triggering but introduce some code quality issues like an unused import, unused variables, and debug print statements. A new Fluent Bit configuration file is added, but it contains a critical misconfiguration with a duplicate key and a logical issue with input tags. My review includes specific suggestions to address these points and improve the code quality and correctness of the configuration.

Comment on lines +17 to +19
file: /data
match: '*.info'
file: workshop-INFO.log

critical

This output block has a duplicate file key. To specify the output directory, you should use path. The current configuration is incorrect as the key on line 19 will override line 17.

      path: /data
      match: '*.info'
      file: workshop-INFO.log

@@ -1,4 +1,5 @@
import os
from socket import timeout

medium

The timeout import from socket is unused in this file and should be removed to improve code clarity and reduce clutter.

print(rate_limit)

celery_worker = app.send_task("worker.get_github_data")
app.send_task("worker.get_github_data")

medium

With the removal of the if condition that used rate_limit, the variables rate_limit and max_total_api_calls (fetched on lines 45-46) are no longer used in this task. Please remove the XCom pulls for these variables to clean up the code and avoid unnecessary operations.

Comment on lines 50 to +51

print(celery_worker)

time.sleep(500)
print("celery_worker")

medium

This print statement and the preceding blank line appear to be for debugging. For production code, it's better to use Airflow's logging mechanism. Please remove these lines if they are not needed for production.

dummy: '{"message": "INFO message"}'

- name: dummy
tag: workshop.info

medium

This dummy input generates an 'ERROR message' (on line 9) but uses the tag workshop.info. To distinguish between message types for routing and processing, it's best to use a distinct tag, such as workshop.error.

      tag: workshop.error
