From 663a64a6309d1ccc85f5e446339a5e8d7dfadd06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Forr=C3=B3?= Date: Thu, 25 Jun 2026 16:35:38 +0200 Subject: [PATCH 1/4] Add async task multiplexing for concurrent task processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nikola Forró Assisted-by: Claude Opus 4.6 via Claude Code --- compose.yaml | 1 + openshift/configmap-agents-env.yml | 1 + ymir/agents/backport_agent.py | 23 +++++------ ymir/agents/rebase_agent.py | 23 +++++------ ymir/agents/rebuild_agent.py | 25 +++++------- ymir/agents/triage_agent.py | 41 +++++++------------ ymir/common/base_utils.py | 65 +++++++++++++++++++++++++++++- 7 files changed, 112 insertions(+), 67 deletions(-) diff --git a/compose.yaml b/compose.yaml index 5ac55acdc..407bc7c40 100644 --- a/compose.yaml +++ b/compose.yaml @@ -7,6 +7,7 @@ x-beeai-env: &beeai-env COLLECTOR_ENDPOINT: http://otel-collector:4318/v1/traces GIT_REPO_BASEPATH: /git-repos MAX_RETRIES: 3 + MAX_CONCURRENT_TASKS: ${MAX_CONCURRENT_TASKS:-1} DRY_RUN: ${DRY_RUN:-false} JIRA_DRY_RUN: ${JIRA_DRY_RUN:-false} JIRA_ALLOW_STATUS_CHANGES: ${JIRA_ALLOW_STATUS_CHANGES:-false} diff --git a/openshift/configmap-agents-env.yml b/openshift/configmap-agents-env.yml index 82f494a9a..66d7274d1 100644 --- a/openshift/configmap-agents-env.yml +++ b/openshift/configmap-agents-env.yml @@ -1,6 +1,7 @@ apiVersion: v1 data: MAX_RETRIES: "3" + MAX_CONCURRENT_TASKS: "1" GIT_REPO_BASEPATH: /git-repos # The maximum number of retries for a single step in the agent execution. BEEAI_MAX_RETRIES_PER_STEP: "5" diff --git a/ymir/agents/backport_agent.py b/ymir/agents/backport_agent.py index c73cb8684..44c0b202f 100644 --- a/ymir/agents/backport_agent.py +++ b/ymir/agents/backport_agent.py @@ -44,7 +44,7 @@ run_tool, wrap_details, ) -from ymir.common.base_utils import fix_await, redis_client +from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues from ymir.common.logging_setup import configure_logging from ymir.common.mock_repos import get_mock_local_tool_env @@ -802,6 +802,7 @@ async def main() -> None: return logger.info("Starting backport agent in queue mode") + max_concurrent_tasks = int(os.getenv("MAX_CONCURRENT_TASKS", 1)) async with redis_client(os.environ["REDIS_URL"]) as redis: max_retries = int(os.getenv("MAX_RETRIES", 3)) # Determine which backport queue to listen to based on container version @@ -818,18 +819,7 @@ async def main() -> None: f"listening to queues: [{backport_queue_todo}, {backport_queue}]" ) - while True: - redis_logger.info( - f"Waiting for tasks from [{backport_queue_todo}, {backport_queue}] (timeout: 30s)..." - ) - element = await fix_await(redis.brpop([backport_queue_todo, backport_queue], timeout=30)) - if element is None: - redis_logger.info("No tasks received, continuing to wait...") - continue - - _, payload = element - redis_logger.info("Received task from queue.") - + async def process_task(payload): task = Task.model_validate_json(payload) triage_state = task.metadata backport_data = BackportData.model_validate(triage_state["triage_result"]["data"]) @@ -969,6 +959,13 @@ async def retry( ).model_dump_json(), ) + await run_task_loop( + redis, + [backport_queue_todo, backport_queue], + process_task, + max_concurrent=max_concurrent_tasks, + ) + if __name__ == "__main__": try: diff --git a/ymir/agents/rebase_agent.py b/ymir/agents/rebase_agent.py index 7b932b8d5..7d6589281 100644 --- a/ymir/agents/rebase_agent.py +++ b/ymir/agents/rebase_agent.py @@ -39,7 +39,7 @@ resolve_chat_model_override, wrap_details, ) -from ymir.common.base_utils import fix_await, redis_client +from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues from ymir.common.logging_setup import configure_logging from ymir.common.mock_repos import get_mock_local_tool_env @@ -427,6 +427,7 @@ async def comment_in_jira(state): return logger.info("Starting rebase agent in queue mode") + max_concurrent_tasks = int(os.getenv("MAX_CONCURRENT_TASKS", 1)) async with redis_client(os.environ["REDIS_URL"]) as redis: max_retries = int(os.getenv("MAX_RETRIES", 3)) # Determine which rebase queue to listen to based on container version @@ -443,18 +444,7 @@ async def comment_in_jira(state): f"listening to queues: [{rebase_queue_todo}, {rebase_queue}]" ) - while True: - redis_logger.info( - f"Waiting for tasks from [{rebase_queue_todo}, {rebase_queue}] (timeout: 30s)..." - ) - element = await fix_await(redis.brpop([rebase_queue_todo, rebase_queue], timeout=30)) - if element is None: - redis_logger.info("No tasks received, continuing to wait...") - continue - - _, payload = element - redis_logger.info("Received task from queue.") - + async def process_task(payload): task = Task.model_validate_json(payload) triage_state = task.metadata rebase_data = RebaseData.model_validate(triage_state["triage_result"]["data"]) @@ -585,6 +575,13 @@ async def retry( ).model_dump_json(), ) + await run_task_loop( + redis, + [rebase_queue_todo, rebase_queue], + process_task, + max_concurrent=max_concurrent_tasks, + ) + if __name__ == "__main__": try: diff --git a/ymir/agents/rebuild_agent.py b/ymir/agents/rebuild_agent.py index d05498e5e..d817a3e0f 100644 --- a/ymir/agents/rebuild_agent.py +++ b/ymir/agents/rebuild_agent.py @@ -24,7 +24,7 @@ resolve_chat_model_override, run_subprocess, ) -from ymir.common.base_utils import fix_await, redis_client +from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues from ymir.common.logging_setup import configure_logging from ymir.common.mock_repos import get_mock_local_tool_env @@ -332,6 +332,7 @@ async def comment_in_jira(state): # Queue mode logger.info("Starting rebuild agent in queue mode") + max_concurrent_tasks = int(os.getenv("MAX_CONCURRENT_TASKS", 1)) async with redis_client(os.environ["REDIS_URL"]) as redis: max_retries = int(os.getenv("MAX_RETRIES", 3)) container_version = os.getenv("CONTAINER_VERSION", "c10s") @@ -347,18 +348,7 @@ async def comment_in_jira(state): f"listening to queues: [{rebuild_queue_todo}, {rebuild_queue}]" ) - while True: - redis_logger.info( - f"Waiting for tasks from [{rebuild_queue_todo}, {rebuild_queue}] (timeout: 30s)..." - ) - element = await fix_await(redis.brpop([rebuild_queue_todo, rebuild_queue], timeout=30)) - if element is None: - redis_logger.info("No tasks received, continuing to wait...") - continue - - _, payload = element - redis_logger.info("Received task from queue.") - + async def process_task(payload): try: task = Task.model_validate_json(payload) triage_state = task.metadata @@ -375,7 +365,7 @@ async def comment_in_jira(state): ).model_dump_json(), ) ) - continue + return logger.info( f"Processing rebuild for package: {rebuild_data.package}, " @@ -525,6 +515,13 @@ async def retry( ).model_dump_json(), ) + await run_task_loop( + redis, + [rebuild_queue_todo, rebuild_queue], + process_task, + max_concurrent=max_concurrent_tasks, + ) + if __name__ == "__main__": try: diff --git a/ymir/agents/triage_agent.py b/ymir/agents/triage_agent.py index bb9620184..388ceb13b 100644 --- a/ymir/agents/triage_agent.py +++ b/ymir/agents/triage_agent.py @@ -34,7 +34,7 @@ resolve_chat_model_override, run_tool, ) -from ymir.common.base_utils import fix_await, redis_client +from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.config import load_rhel_config from ymir.common.constants import JiraLabels, RedisQueues from ymir.common.logging_setup import configure_logging @@ -819,30 +819,12 @@ async def main() -> None: return logger.info(f"Starting triage agent in queue mode (AUTO_CHAIN={'enabled' if auto_chain else 'disabled'})") + max_concurrent_tasks = int(os.getenv("MAX_CONCURRENT_TASKS", 1)) async with redis_client(os.environ["REDIS_URL"]) as redis: max_retries = int(os.getenv("MAX_RETRIES", 3)) redis_logger.info(f"Connected to Redis, max retries set to {max_retries}") - while True: - redis_logger.info( - "Waiting for tasks from triage_queue_todo (priority) and triage_queue (timeout: 30s)..." - ) - # Multi-key BRPOP serves the first non-empty list in order, so - # ymir_todo-triggered tasks (TRIAGE_QUEUE_TODO) jump ahead of - # normal-flow tasks (TRIAGE_QUEUE). - element = await fix_await( - redis.brpop( - [RedisQueues.TRIAGE_QUEUE_TODO.value, RedisQueues.TRIAGE_QUEUE.value], - timeout=30, - ) - ) - if element is None: - redis_logger.info("No tasks received, continuing to wait...") - continue - - _, payload = element - redis_logger.info("Received task from queue") - + async def process_task(payload): task = Task.model_validate_json(payload) input = InputSchema.model_validate(task.metadata) user_triggered = task.user_triggered @@ -881,7 +863,7 @@ async def main() -> None: f"Skipping duplicate triage for {input.issue} — " f"already has labels: {terminal_ymir_labels}" ) - continue + return async def retry(task, error, input=input, user_triggered=user_triggered): task.attempts += 1 @@ -968,7 +950,7 @@ async def retry(task, error, input=input, user_triggered=user_triggered): # ~7s, so we're past transient blips. Typical Jira outages last # minutes; cycling faster just spams the API. await asyncio.sleep(60) - continue + return try: logger.info(f"Starting triage processing for {input.issue}") @@ -1053,10 +1035,10 @@ async def retry(task, error, input=input, user_triggered=user_triggered): if auto_chain: if output.resolution == Resolution.OPEN_ENDED_ANALYSIS: queue = RedisQueues.OPEN_ENDED_ANALYSIS_LIST.value - payload = output.data.model_dump_json() + downstream_payload = output.data.model_dump_json() else: task = Task(metadata=state.model_dump(), user_triggered=user_triggered) - payload = task.model_dump_json() + downstream_payload = task.model_dump_json() if output.resolution == Resolution.REBASE: queue = RedisQueues.get_rebase_queue_for_branch( state.target_branch, task.user_triggered @@ -1071,11 +1053,18 @@ async def retry(task, error, input=input, user_triggered=user_triggered): ) else: queue = RedisQueues.CLARIFICATION_NEEDED_QUEUE.value - await fix_await(redis.lpush(queue, payload)) + await fix_await(redis.lpush(queue, downstream_payload)) logger.info(f"Pushed {input.issue} to {queue}") else: logger.info(f"AUTO_CHAIN disabled, skipping downstream queue for {input.issue}") + await run_task_loop( + redis, + [RedisQueues.TRIAGE_QUEUE_TODO.value, RedisQueues.TRIAGE_QUEUE.value], + process_task, + max_concurrent=max_concurrent_tasks, + ) + if __name__ == "__main__": try: diff --git a/ymir/common/base_utils.py b/ymir/common/base_utils.py index 5b7e33096..af8c226f8 100644 --- a/ymir/common/base_utils.py +++ b/ymir/common/base_utils.py @@ -6,7 +6,7 @@ import re import shlex import subprocess -from collections.abc import AsyncGenerator, Awaitable +from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine from contextlib import asynccontextmanager from pathlib import Path from typing import TypeVar @@ -14,6 +14,7 @@ import redis.asyncio as redis logger = logging.getLogger(__name__) +task_loop_logger = logging.getLogger("agent.task_loop") T = TypeVar("T") @@ -61,6 +62,68 @@ async def redis_client(redis_url: str) -> AsyncGenerator[redis.Redis]: logger.debug("Disconnected from Redis") +async def run_task_loop( + redis_conn: redis.Redis, + queues: list[str], + process_fn: Callable[[bytes], Coroutine], + max_concurrent: int = 1, + poll_timeout: int = 30, +) -> None: + """Run a concurrent task loop that pops tasks from Redis queues. + + Acquires a semaphore slot before popping to ensure we never hold more + tasks in memory than we can process, preventing task loss on crash. + """ + if max_concurrent < 1: + raise ValueError("max_concurrent must be at least 1") + + sem = asyncio.Semaphore(max_concurrent) + active: set[asyncio.Task] = set() + + task_loop_logger.info( + "Task loop started: listening on %s, max_concurrent=%d", + queues, + max_concurrent, + ) + + async def _run(payload: bytes) -> None: + try: + await process_fn(payload) + except Exception: + logger.exception("Unhandled exception in task processing") + finally: + sem.release() + + try: + while True: + await sem.acquire() + element = await fix_await(redis_conn.brpop(queues, timeout=poll_timeout)) + if element is None: + sem.release() + continue + + _, payload = element + task_loop_logger.info("Received task from queue.") + + t = asyncio.create_task(_run(payload)) + active.add(t) + t.add_done_callback(active.discard) + except asyncio.CancelledError: + if active: + task_loop_logger.info( + "Task loop cancelled. Waiting for %d active tasks to complete...", + len(active), + ) + await asyncio.shield(asyncio.gather(*active, return_exceptions=True)) + raise + finally: + if active: + task_loop_logger.info("Cancelling %d active tasks...", len(active)) + for t in active: + t.cancel() + await asyncio.gather(*active, return_exceptions=True) + + def get_jira_auth_headers() -> dict[str, str]: """Build Jira API authentication headers. From 4073968f4cbc9a44c9eb4790bd74aa8ec380b0c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Forr=C3=B3?= Date: Thu, 25 Jun 2026 16:37:14 +0200 Subject: [PATCH 2/4] Improve logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nikola Forró Assisted-by: Claude Opus 4.6 via Claude Code --- compose.yaml | 1 + openshift/configmap-agents-env.yml | 1 + ymir/agents/backport_agent.py | 7 +- ymir/agents/build_agent.py | 3 +- ymir/agents/cve_applicability_agent.py | 3 +- ymir/agents/log_agent.py | 3 +- ymir/agents/merge_request_agent.py | 4 +- ymir/agents/observability.py | 17 ++- ymir/agents/preliminary_testing_agent.py | 4 +- ymir/agents/rebase_agent.py | 7 +- ymir/agents/rebuild_agent.py | 5 +- ymir/agents/triage_agent.py | 7 +- ymir/common/base_utils.py | 8 ++ ymir/common/logging_setup.py | 134 ++++++++++++++++++++++- ymir/common/utils.py | 5 +- 15 files changed, 178 insertions(+), 31 deletions(-) diff --git a/compose.yaml b/compose.yaml index 407bc7c40..48a5e3838 100644 --- a/compose.yaml +++ b/compose.yaml @@ -8,6 +8,7 @@ x-beeai-env: &beeai-env GIT_REPO_BASEPATH: /git-repos MAX_RETRIES: 3 MAX_CONCURRENT_TASKS: ${MAX_CONCURRENT_TASKS:-1} + LOG_BUFFER_SIZE: ${LOG_BUFFER_SIZE:-0} DRY_RUN: ${DRY_RUN:-false} JIRA_DRY_RUN: ${JIRA_DRY_RUN:-false} JIRA_ALLOW_STATUS_CHANGES: ${JIRA_ALLOW_STATUS_CHANGES:-false} diff --git a/openshift/configmap-agents-env.yml b/openshift/configmap-agents-env.yml index 66d7274d1..16d140edd 100644 --- a/openshift/configmap-agents-env.yml +++ b/openshift/configmap-agents-env.yml @@ -2,6 +2,7 @@ apiVersion: v1 data: MAX_RETRIES: "3" MAX_CONCURRENT_TASKS: "1" + LOG_BUFFER_SIZE: "0" GIT_REPO_BASEPATH: /git-repos # The maximum number of retries for a single step in the agent execution. BEEAI_MAX_RETRIES_PER_STEP: "5" diff --git a/ymir/agents/backport_agent.py b/ymir/agents/backport_agent.py index 44c0b202f..17fbf5651 100644 --- a/ymir/agents/backport_agent.py +++ b/ymir/agents/backport_agent.py @@ -46,7 +46,7 @@ ) from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, current_jira_issue, get_trajectory_writeable from ymir.common.mock_repos import get_mock_local_tool_env from ymir.common.models import ( BackportData, @@ -186,7 +186,7 @@ async def create_backport_agent( only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=await get_instructions(fix_version), ) @@ -766,7 +766,7 @@ async def comment_in_jira(state): async def main() -> None: init_sentry() - configure_logging(level=logging.INFO) + configure_logging(level=logging.INFO, buffer_size=int(os.getenv("LOG_BUFFER_SIZE", 0))) resolve_chat_model_override("backport") span_processor = setup_observability(os.environ["COLLECTOR_ENDPOINT"]) @@ -823,6 +823,7 @@ async def process_task(payload): task = Task.model_validate_json(payload) triage_state = task.metadata backport_data = BackportData.model_validate(triage_state["triage_result"]["data"]) + current_jira_issue.set(backport_data.jira_issue) dist_git_branch = triage_state["target_branch"] user_triggered = task.user_triggered logger.info( diff --git a/ymir/agents/build_agent.py b/ymir/agents/build_agent.py index 0160c27ca..c0c9922ea 100644 --- a/ymir/agents/build_agent.py +++ b/ymir/agents/build_agent.py @@ -16,6 +16,7 @@ is_reasoning_enabled, render_template, ) +from ymir.common.logging_setup import get_trajectory_writeable from ymir.tools.unprivileged.commands import RunShellCommandTool from ymir.tools.unprivileged.filesystem import GetCWDTool from ymir.tools.unprivileged.text import ( @@ -68,7 +69,7 @@ def create_build_agent(mcp_tools: list[Tool], local_tool_options: dict[str, Any] ConditionalRequirement("download_artifacts", only_after=["build_package"]), ConditionalRequirement("extract_log_snippets", only_after=["download_artifacts"]), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=get_instructions(), ) diff --git a/ymir/agents/cve_applicability_agent.py b/ymir/agents/cve_applicability_agent.py index 3dbb48718..663385b2e 100644 --- a/ymir/agents/cve_applicability_agent.py +++ b/ymir/agents/cve_applicability_agent.py @@ -12,6 +12,7 @@ from ymir.agents.reasoning_agent import ReasoningAgent from ymir.agents.utils import get_chat_model, get_tool_call_checker_config, is_reasoning_enabled +from ymir.common.logging_setup import get_trajectory_writeable from ymir.common.models import Resolution from ymir.tools.unprivileged.commands import RunShellCommandTool from ymir.tools.unprivileged.text import SearchTextTool, ViewTool @@ -45,7 +46,7 @@ def create_applicability_agent( only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat security analyst", ) diff --git a/ymir/agents/log_agent.py b/ymir/agents/log_agent.py index b3cd75bc7..768dba754 100644 --- a/ymir/agents/log_agent.py +++ b/ymir/agents/log_agent.py @@ -16,6 +16,7 @@ is_reasoning_enabled, render_template, ) +from ymir.common.logging_setup import get_trajectory_writeable from ymir.tools.unprivileged.commands import RunShellCommandTool from ymir.tools.unprivileged.filesystem import GetCWDTool from ymir.tools.unprivileged.specfile import AddChangelogEntryTool @@ -66,7 +67,7 @@ def create_log_agent(_: list[Tool], local_tool_options: dict[str, Any]) -> Reaso only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=get_instructions(), ) diff --git a/ymir/agents/merge_request_agent.py b/ymir/agents/merge_request_agent.py index 7c410b92b..36a106ff9 100644 --- a/ymir/agents/merge_request_agent.py +++ b/ymir/agents/merge_request_agent.py @@ -34,7 +34,7 @@ mcp_tools, render_template, ) -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, get_trajectory_writeable from ymir.common.models import ( BuildInputSchema, BuildOutputSchema, @@ -96,7 +96,7 @@ def create_merge_request_agent(mcp_tools: list[Tool], local_tool_options: dict[s only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=get_instructions(), ) diff --git a/ymir/agents/observability.py b/ymir/agents/observability.py index 8ffa40f0e..5bb531c20 100644 --- a/ymir/agents/observability.py +++ b/ymir/agents/observability.py @@ -1,6 +1,5 @@ import atexit import contextlib -from contextvars import ContextVar import sentry_sdk from openinference.instrumentation.beeai import BeeAIInstrumentor @@ -12,21 +11,21 @@ from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor +from ymir.common.logging_setup import current_jira_issue -class AgentSpanProcessor(SpanProcessor): - _jira_issue_var: ContextVar[str | None] = ContextVar("jira_issue", default=None) +class AgentSpanProcessor(SpanProcessor): def set_jira_issue(self, jira_issue: str | None) -> None: - self._jira_issue_var.set(jira_issue) + current_jira_issue.set(jira_issue) @contextlib.contextmanager def jira_issue_context(self, jira_issue: str | None): """Set the jira issue attribute on all spans created within the context.""" - token = self._jira_issue_var.set(jira_issue) + token = current_jira_issue.set(jira_issue) try: yield finally: - self._jira_issue_var.reset(token) + current_jira_issue.reset(token) @contextlib.contextmanager def start_transaction( @@ -40,15 +39,15 @@ def start_transaction( transaction.set_data("workflow", workflow) transaction.set_data("jira_issue", jira_issue) - token = self._jira_issue_var.set(jira_issue) + token = current_jira_issue.set(jira_issue) try: yield finally: - self._jira_issue_var.reset(token) + current_jira_issue.reset(token) def on_start(self, span: Span, parent_context: Context | None = None) -> None: if span.is_recording(): - jira_issue = self._jira_issue_var.get() + jira_issue = current_jira_issue.get() if jira_issue: span.set_attribute("jira.issue", jira_issue) diff --git a/ymir/agents/preliminary_testing_agent.py b/ymir/agents/preliminary_testing_agent.py index f82660e21..6811c404a 100644 --- a/ymir/agents/preliminary_testing_agent.py +++ b/ymir/agents/preliminary_testing_agent.py @@ -30,7 +30,7 @@ render_template, run_tool, ) -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, get_trajectory_writeable from ymir.tools.unprivileged.greenwave import FetchGreenWaveTool, FetchTestingFarmResultsTool logger = logging.getLogger(__file__) @@ -104,7 +104,7 @@ def create_preliminary_testing_agent(gateway_tools: list) -> ReasoningAgent: only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], ) diff --git a/ymir/agents/rebase_agent.py b/ymir/agents/rebase_agent.py index 7d6589281..f04bc916c 100644 --- a/ymir/agents/rebase_agent.py +++ b/ymir/agents/rebase_agent.py @@ -41,7 +41,7 @@ ) from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, current_jira_issue, get_trajectory_writeable from ymir.common.mock_repos import get_mock_local_tool_env from ymir.common.models import ( BuildInputSchema, @@ -110,7 +110,7 @@ def create_rebase_agent(mcp_tools: list[Tool], local_tool_options: dict[str, Any only_success_invocations=False, ), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=get_instructions(), ) @@ -119,7 +119,7 @@ def create_rebase_agent(mcp_tools: list[Tool], local_tool_options: dict[str, Any async def main() -> None: init_sentry() - configure_logging(level=logging.INFO) + configure_logging(level=logging.INFO, buffer_size=int(os.getenv("LOG_BUFFER_SIZE", 0))) resolve_chat_model_override("rebase") span_processor = setup_observability(os.environ["COLLECTOR_ENDPOINT"]) @@ -448,6 +448,7 @@ async def process_task(payload): task = Task.model_validate_json(payload) triage_state = task.metadata rebase_data = RebaseData.model_validate(triage_state["triage_result"]["data"]) + current_jira_issue.set(rebase_data.jira_issue) dist_git_branch = triage_state["target_branch"] user_triggered = task.user_triggered logger.info( diff --git a/ymir/agents/rebuild_agent.py b/ymir/agents/rebuild_agent.py index d817a3e0f..8e6d2d56d 100644 --- a/ymir/agents/rebuild_agent.py +++ b/ymir/agents/rebuild_agent.py @@ -26,7 +26,7 @@ ) from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.constants import JiraLabels, RedisQueues -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, current_jira_issue from ymir.common.mock_repos import get_mock_local_tool_env from ymir.common.models import ( ConsolidatedIssue, @@ -45,7 +45,7 @@ async def main() -> None: init_sentry() - configure_logging(level=logging.INFO) + configure_logging(level=logging.INFO, buffer_size=int(os.getenv("LOG_BUFFER_SIZE", 0))) resolve_chat_model_override("rebuild") span_processor = setup_observability(os.environ["COLLECTOR_ENDPOINT"]) @@ -353,6 +353,7 @@ async def process_task(payload): task = Task.model_validate_json(payload) triage_state = task.metadata rebuild_data = RebuildData.model_validate(triage_state["triage_result"]["data"]) + current_jira_issue.set(rebuild_data.jira_issue) dist_git_branch = triage_state["target_branch"] user_triggered = task.user_triggered except Exception as e: diff --git a/ymir/agents/triage_agent.py b/ymir/agents/triage_agent.py index 388ceb13b..920a611fc 100644 --- a/ymir/agents/triage_agent.py +++ b/ymir/agents/triage_agent.py @@ -37,7 +37,7 @@ from ymir.common.base_utils import fix_await, redis_client, run_task_loop from ymir.common.config import load_rhel_config from ymir.common.constants import JiraLabels, RedisQueues -from ymir.common.logging_setup import configure_logging +from ymir.common.logging_setup import configure_logging, current_jira_issue, get_trajectory_writeable from ymir.common.mock_repos import get_mock_local_tool_env from ymir.common.models import ( ApplicabilityResult, @@ -265,7 +265,7 @@ def create_triage_agent(gateway_tools, local_tool_options=None) -> ReasoningAgen ConditionalRequirement("search_jira_issues", only_after=["get_jira_details"]), ConditionalRequirement("zstream_search", only_after=["get_jira_details"]), ], - middlewares=[GlobalTrajectoryMiddleware(pretty=True)], + middlewares=[GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable())], role="Red Hat Enterprise Linux developer", instructions=[ "Be proactive in your search for fixes and do not give up easily.", @@ -791,7 +791,7 @@ async def comment_in_jira(state): async def main() -> None: init_sentry() - configure_logging(level=logging.INFO) + configure_logging(level=logging.INFO, buffer_size=int(os.getenv("LOG_BUFFER_SIZE", 0))) resolve_chat_model_override("triage") span_processor = setup_observability(os.environ["COLLECTOR_ENDPOINT"]) @@ -827,6 +827,7 @@ async def main() -> None: async def process_task(payload): task = Task.model_validate_json(payload) input = InputSchema.model_validate(task.metadata) + current_jira_issue.set(input.issue) user_triggered = task.user_triggered logger.info( f"Processing triage for JIRA issue: {input.issue}, attempt: {task.attempts + 1}" diff --git a/ymir/common/base_utils.py b/ymir/common/base_utils.py index af8c226f8..184b018a0 100644 --- a/ymir/common/base_utils.py +++ b/ymir/common/base_utils.py @@ -13,6 +13,8 @@ import redis.asyncio as redis +from ymir.common.logging_setup import current_jira_issue, flush_task_logs + logger = logging.getLogger(__name__) task_loop_logger = logging.getLogger("agent.task_loop") @@ -92,6 +94,12 @@ async def _run(payload: bytes) -> None: except Exception: logger.exception("Unhandled exception in task processing") finally: + try: + if issue := current_jira_issue.get(): + current_jira_issue.set(None) + flush_task_logs(issue) + except Exception: + logger.exception("Unhandled exception during log flushing") sem.release() try: diff --git a/ymir/common/logging_setup.py b/ymir/common/logging_setup.py index 2b988458c..23c6f3715 100644 --- a/ymir/common/logging_setup.py +++ b/ymir/common/logging_setup.py @@ -1,23 +1,151 @@ """Shared logging configuration for ymir entry points.""" import logging +import sys +import threading from collections.abc import Iterable +from contextvars import ContextVar -LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" +LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s:%(jira_issue)s %(message)s" LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" +current_jira_issue: ContextVar[str | None] = ContextVar("current_jira_issue", default=None) + +_buffered_handler: "BufferedTaskHandler | None" = None + + +class _JiraFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + issue = current_jira_issue.get() + record.jira_issue = f" [{issue}]" if issue else "" + return super().format(record) + + +class BufferedTaskHandler(logging.Handler): + """Handler that buffers formatted log lines per Jira issue. + + Lines without a Jira issue context pass through immediately. + Lines with context are buffered per-issue and flushed when the buffer + reaches `buffer_size` or when `flush_task` is called explicitly. + + Multi-line messages can be kept together by wrapping the corresponding + log calls between `begin_group()` and `end_group()`. While a group is + open the size threshold is deferred so that a multi-line message is + never split across two flushes. + """ + + def __init__(self, buffer_size: int = 50) -> None: + super().__init__() + self._buffer_size = buffer_size + self._buffers: dict[str, list[str]] = {} + self._lock = threading.Lock() + self._group_depth: ContextVar[int] = ContextVar("group_depth", default=0) + + def begin_group(self) -> None: + self._group_depth.set(self._group_depth.get() + 1) + + def end_group(self) -> None: + depth = self._group_depth.get() - 1 + self._group_depth.set(depth) + if depth == 0 and (issue := current_jira_issue.get()): + with self._lock: + if len(self._buffers.get(issue, [])) >= self._buffer_size: + self._flush_issue(issue) + + def emit(self, record: logging.LogRecord) -> None: + msg = self.format(record) + issue = current_jira_issue.get() + if issue is None: + with self._lock: + sys.stdout.write(msg + "\n") + sys.stdout.flush() + return + with self._lock: + buf = self._buffers.setdefault(issue, []) + buf.append(msg) + if self._group_depth.get() == 0 and len(buf) >= self._buffer_size: + self._flush_issue(issue) + + def flush_task(self, issue: str) -> None: + with self._lock: + self._flush_issue(issue) + + def flush(self) -> None: + with self._lock: + for issue in list(self._buffers): + self._flush_issue(issue) + super().flush() + + def close(self) -> None: + self.flush() + super().close() + + def _flush_issue(self, issue: str) -> None: + buf = self._buffers.pop(issue, None) + if buf: + sys.stdout.write("\n".join(buf) + "\n") + sys.stdout.flush() + + +class _LogWriteable: + """Adapter routing `write()` calls through Python logging. + + Satisfies BeeAI's `Writeable` protocol so it can be used as the + `target` for `GlobalTrajectoryMiddleware`. + """ + + def __init__(self) -> None: + self._logger = logging.getLogger("agent.trajectory") + + def write(self, s: str) -> int: + lines = [line for line in s.splitlines() if line] + if not lines: + return len(s) + if len(lines) > 1 and _buffered_handler is not None: + _buffered_handler.begin_group() + try: + for line in lines: + self._logger.info("%s", line) + finally: + if len(lines) > 1 and _buffered_handler is not None: + _buffered_handler.end_group() + return len(s) + + +def get_trajectory_writeable() -> _LogWriteable: + return _LogWriteable() + + +def flush_task_logs(issue: str) -> None: + if _buffered_handler is not None: + _buffered_handler.flush_task(issue) + def configure_logging( level: int = logging.INFO, extra_handlers: Iterable[logging.Handler] | None = None, + buffer_size: int = 0, ) -> None: """Configure the root logger with timestamps and short logger names. Replaces any handlers already attached to the root logger so repeated calls (e.g. across tests) produce a consistent format. + + When `buffer_size` > 0, log lines emitted inside a task context + (`current_jira_issue` set) are buffered per issue and flushed in + contiguous batches of up to `buffer_size` lines. """ - formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT) - handlers: list[logging.Handler] = [logging.StreamHandler()] + global _buffered_handler + + formatter = _JiraFormatter(fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT) + + if buffer_size > 0: + _buffered_handler = BufferedTaskHandler(buffer_size=buffer_size) + handlers: list[logging.Handler] = [_buffered_handler] + else: + _buffered_handler = None + handlers = [logging.StreamHandler(sys.stdout)] + if extra_handlers: handlers.extend(extra_handlers) for handler in handlers: diff --git a/ymir/common/utils.py b/ymir/common/utils.py index da3939efd..6e750134d 100644 --- a/ymir/common/utils.py +++ b/ymir/common/utils.py @@ -26,6 +26,7 @@ from ymir.common.base_utils import is_cs_branch from ymir.common.constants import BREWHUB_URL, CENTOS_STREAM_KOJIHUB_URL +from ymir.common.logging_setup import get_trajectory_writeable from ymir.common.version_utils import construct_internal_branch_name, parse_rhel_version logger = logging.getLogger(__name__) @@ -82,7 +83,9 @@ async def run_tool( ) -> str | dict | list: if isinstance(tool, str): tool = next(t for t in available_tools or [] if t.name == tool) - output = await tool.run(input=kwargs).middleware(GlobalTrajectoryMiddleware(pretty=True)) + output = await tool.run(input=kwargs).middleware( + GlobalTrajectoryMiddleware(pretty=True, target=get_trajectory_writeable()) + ) match output: case StringToolOutput(): result = output.get_text_content() From 1ba1fcbd9a74dff2793ef1a929e43c47d35b2b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Forr=C3=B3?= Date: Fri, 26 Jun 2026 13:19:07 +0200 Subject: [PATCH 3/4] Add pre-push hook for auto-bumping package version(s) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nikola Forró --- .pre-commit-config.yaml | 8 +++ scripts/autobump-version.py | 103 ++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 scripts/autobump-version.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 708886357..7d091c143 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,3 +38,11 @@ repos: stages: [pre-push] always_run: true pass_filenames: false + - id: autobump-version + name: Auto-bump patch version for changed packages + entry: python scripts/autobump-version.py + language: python + additional_dependencies: [gitpython, tomlkit] + stages: [pre-push] + always_run: true + pass_filenames: false diff --git a/scripts/autobump-version.py b/scripts/autobump-version.py new file mode 100644 index 000000000..01568babf --- /dev/null +++ b/scripts/autobump-version.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +"""Pre-push hook to auto-bump patch version in subpackages with code changes. + +Compares the commits being pushed against the remote base. For each configured +package, if there are code changes but the version hasn't been bumped yet, +bumps the patch version, commits, and aborts the push so the user can push again +with the bump included. +""" + +import sys +from pathlib import Path + +import git +import tomlkit + +PACKAGES = [ + Path("ymir/tools"), + Path("ymir/common"), +] + + +def get_base(repo: git.Repo, local_sha: str, remote_sha: str) -> git.Commit | None: + if remote_sha != "0" * 40: + return repo.commit(remote_sha) + merge_base = repo.merge_base(local_sha, "main") + return merge_base[0] if merge_base else None + + +def get_version_at_ref(commit: git.Commit, pyproject: Path) -> str | None: + try: + blob = commit.tree / str(pyproject) + except KeyError: + return None + data = tomlkit.loads(blob.data_stream.read().decode()) + return data.get("project", {}).get("version") + + +def has_code_changes(repo: git.Repo, directory: Path, local: git.Commit, base: git.Commit) -> bool: + diff = base.diff(local, paths=[str(directory)]) + return any(not (d.a_path or d.b_path).endswith("pyproject.toml") for d in diff) + + +def main() -> int: + push_refs = [] + for line in sys.stdin: + parts = line.strip().split() + if len(parts) >= 4: + push_refs.append((parts[1], parts[3])) + + if not push_refs: + return 0 + + repo = git.Repo(search_parent_directories=True) + bumped = [] + + for local_sha, remote_sha in push_refs: + if local_sha == "0" * 40: + continue + + local = repo.commit(local_sha) + base = get_base(repo, local_sha, remote_sha) + if not base: + continue + + for package in PACKAGES: + pyproject = package / "pyproject.toml" + + if not has_code_changes(repo, package, local, base): + continue + + base_version = get_version_at_ref(base, pyproject) + + data = tomlkit.loads(pyproject.read_text()) + local_version = data["project"]["version"] + + if local_version != base_version: + continue + + major, minor, patch = local_version.split(".") + new_version = f"{major}.{minor}.{int(patch) + 1}" + data["project"]["version"] = new_version + pyproject.write_text(tomlkit.dumps(data)) + + bumped.append((package, f"{local_version} -> {new_version}")) + + if not bumped: + return 0 + + files = [str(pkg / "pyproject.toml") for pkg, _ in bumped] + repo.index.add(files) + + details = ", ".join(f"{pkg.name} ({ver})" for pkg, ver in bumped) + repo.index.commit(f"Bump package version: {details}") + + print( + f"Auto-bumped version: {details}. Please push again.", + file=sys.stderr, + ) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) From 1969bbd2cd98b0ec849c81c1a1be64f57a3717d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Forr=C3=B3?= Date: Fri, 26 Jun 2026 13:30:41 +0200 Subject: [PATCH 4/4] Bump package version: common (0.5.0 -> 0.5.1) --- ymir/common/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ymir/common/pyproject.toml b/ymir/common/pyproject.toml index 3a7e923c2..acfc13438 100644 --- a/ymir/common/pyproject.toml +++ b/ymir/common/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "ymir-common" -version = "0.5.0" +version = "0.5.1" description = "Common utilities shared across Ymir AI workflows" requires-python = ">=3.13" dynamic = ["dependencies"]