Enhance Airflow and Celery integration with improved logging and Docker configuration#34
Conversation
|
/gemini review |
Summary of ChangesHello @LuisJG8, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request focuses on enhancing the robustness and maintainability of the Airflow and Celery integration. It streamlines the Airflow DAG structure, introduces a standardized and configurable logging system for Celery workers, and improves Docker deployment by configuring log rotation. These changes collectively lead to more reliable task execution and better observability of the distributed system. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request enhances the Airflow and Celery integration by improving logging and Docker configuration. The changes include adding a centralized Celery logging configuration, setting up log rotation in Docker, and refactoring the Airflow DAG and Celery worker. My review has identified a few issues: a critical piece of debug code left in the worker that limits processing, a change that breaks the data flow in a Celery chord, inefficient initialization of GitHub clients, and some inconsistencies in logging. I've also pointed out an unused import and hardcoded values that could be improved.
| if counter >= 5: | ||
| break |
| # bind = True allows to get task data, like task id | ||
| @app.task(bind=True) | ||
| def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, github_instance: Github = gh): | ||
| def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500): |
There was a problem hiding this comment.
The return mylist statement was removed from this function. This breaks the Celery chord (build_repo_chord) which relies on this task's return value to pass to aggregate_results. Without it, aggregate_results will receive None and the final result will be incorrect. Please add return mylist at the end of the function.
| api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT") | ||
| auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two) | ||
| gh, gh_two = Github(auth=auth), Github(auth=auth_two) |
There was a problem hiding this comment.
|
|
||
|
|
||
| app.send_task("worker.get_github_data") | ||
| app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500}) |
| @@ -0,0 +1,23 @@ | |||
| import logging | |||
| import os | |||
| from math import log | |||
| except GithubException as ge: | ||
| if ge.status == 403: | ||
| print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | ||
| logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") |
There was a problem hiding this comment.
For consistency with the rest of the file and to leverage Celery's task-specific logging, please use the task logger logger.exception instead of logging.exception.
| logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | |
| logger.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") |
| logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | ||
| else: | ||
| print(f"GitHub API error {ge.status} for repo, skipping...") | ||
| logging.exception(f"GitHub API error {ge.status} for repo, skipping...") |
| continue | ||
| except Exception as e: | ||
| print(f"Error accessing repo data: {e}, skipping...") | ||
| logging.exception(f"Error accessing repo data: {e}, skipping...") |
There was a problem hiding this comment.
Code Review
This pull request introduces several enhancements to the Airflow and Celery integration, primarily focusing on improved logging and Docker configuration. The changes to configure Celery logging and Docker log rotation are excellent improvements. However, I've identified a few issues in the implementation, including a critical bug in worker.py where a change breaks the data aggregation logic. There are also some leftover debugging statements and inconsistencies in logging that should be addressed. My review provides specific suggestions to fix these issues and improve the overall code quality.
|
|
||
|
|
||
| # s3_url = save_to_s3(data=repo_collection, file_directory="github_repos/test.json") | ||
| logger.info(f"Processed {counter} repositories") |
There was a problem hiding this comment.
The get_github_data task no longer returns the mylist of collected repositories because return mylist was removed. This breaks the chord defined in build_repo_chord, as the aggregate_results task will receive a list of None values instead of the repository data. Please restore return mylist at the end of this function to fix the aggregation logic.
| if counter >= 5: | ||
| break |
| rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining") | ||
| max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total") |
|
|
||
|
|
||
| app.send_task("worker.get_github_data") | ||
| app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500}) |
There was a problem hiding this comment.
The arguments start_in_repo_num and batch_size are hardcoded. This reduces the flexibility of the DAG. Consider making these values configurable, for instance, by using Airflow variables or passing them as part of the DAG run configuration. The hardcoded start_in_repo_num: 1000 is particularly concerning as it might not be the intended starting point for all runs.
| @@ -0,0 +1,23 @@ | |||
| import logging | |||
| import os | |||
| from math import log | |||
| except GithubException as ge: | ||
| if ge.status == 403: | ||
| print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | ||
| logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | ||
| else: | ||
| print(f"GitHub API error {ge.status} for repo, skipping...") | ||
| logging.exception(f"GitHub API error {ge.status} for repo, skipping...") | ||
| continue | ||
| except Exception as e: | ||
| print(f"Error accessing repo data: {e}, skipping...") | ||
| logging.exception(f"Error accessing repo data: {e}, skipping...") | ||
| continue |
There was a problem hiding this comment.
You're using logging.exception here, but a task-specific logger logger was obtained via get_task_logger. For consistency and to ensure logs have the proper Celery task context, you should use logger.exception instead. This is already done correctly in other parts of the file (e.g., line 156).
| except GithubException as ge: | |
| if ge.status == 403: | |
| print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | |
| logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | |
| else: | |
| print(f"GitHub API error {ge.status} for repo, skipping...") | |
| logging.exception(f"GitHub API error {ge.status} for repo, skipping...") | |
| continue | |
| except Exception as e: | |
| print(f"Error accessing repo data: {e}, skipping...") | |
| logging.exception(f"Error accessing repo data: {e}, skipping...") | |
| continue | |
| except GithubException as ge: | |
| if ge.status == 403: | |
| logger.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}") | |
| else: | |
| logger.exception(f"GitHub API error {ge.status} for repo, skipping...") | |
| continue | |
| except Exception as e: | |
| logger.exception(f"Error accessing repo data: {e}, skipping...") | |
| continue |
| except Exception as validation_error: | ||
| print(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}") | ||
| print("Skipping this repo and continuing") | ||
| continue |
There was a problem hiding this comment.
These print statements should be converted to logger calls (e.g., logger.error or logger.warning) to be consistent with the logging improvements made elsewhere in this file. This ensures all output is handled by the configured logging system.
| except Exception as validation_error: | |
| print(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}") | |
| print("Skipping this repo and continuing") | |
| continue | |
| except Exception as validation_error: | |
| logger.error(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}") | |
| logger.warning("Skipping this repo and continuing") | |
| continue |
No description provided.