From 82dc16b7c0ced3fd3f9530625f18c45729abec24 Mon Sep 17 00:00:00 2001 From: Damien Degois Date: Tue, 5 May 2026 23:49:27 +0200 Subject: [PATCH] feat: recover stalled MRs via emoji-triggered API refresh Missed merge/close webhooks (delivery failures, or partial rollbacks where state="merged" commits but msg_to_delete/refs cleanup doesn't) leave Teams cards frozen on the open-state view. Emoji events on the MR now trigger a recovery probe: cleanup tick fetches current state from the GitLab API; on confirmed terminal state, the leftover refs are PATCHed to the merged card and queued for deletion. Idempotent -- no-op if refs already cleaned. - Synthesize action from state on terminal/reopen transitions (cards/render.py keys icon off action, not state). - Normalize API ISO 8601 updated_at to webhook format so the existing fromisoformat parser keeps a single canonical shape. - Assume UTC for naive datetimes (host-TZ skew protection); tolerate unparseable timestamps without sticking the pending refresh. - Distinguish API-unavailable (warning) from API-confirmed-open in logs for triage. Rate-limit relies on the existing pending_mr_refresh debounce. --- db.py | 104 ++++++- gitlab_api.py | 79 +++++ periodic_cleanup.py | 60 ++++ tests/test_refresh_mr_payload.py | 507 +++++++++++++++++++++++++++++++ 4 files changed, 749 insertions(+), 1 deletion(-) create mode 100644 tests/test_refresh_mr_payload.py diff --git a/db.py b/db.py index 45f1a55..5462b7c 100644 --- a/db.py +++ b/db.py @@ -410,7 +410,7 @@ async def _generic_norm_upsert( INSERT INTO "{table}" ( {", ".join(ins_col)} ) VALUES ( - {", ".join(["$"+str(i+1) for i in range(len(ins_col))])} + {", ".join(["$" + str(i + 1) for i in range(len(ins_col))])} ) RETURNING {", ".join(sel_cols)} """, *ins_args, @@ -495,6 +495,108 @@ async def get_pending_refreshes(self, limit: int = 50) -> list[dict[str, Any]]: ) return [dict(row) for row in rows] + async def refresh_mr_payload_from_api( + self, merge_request_ref_id: int, api_data: dict[str, Any] + ) -> MergeRequestInfos: + """Update stored MR payload with fresh state from GitLab API. + + Syncs state, title, draft, merge status, branches, and pipeline ID. + Assignees/reviewers are not updated (API response lacks email field required by GLUser). + """ + connection: asyncpg.Connection + async with await database.acquire() as connection: + async with connection.transaction(): + row = await connection.fetchrow( + """SELECT merge_request_ref_id, merge_request_payload, + merge_request_extra_state, head_pipeline_id + FROM merge_request_ref + WHERE merge_request_ref_id = $1 + FOR UPDATE""", + merge_request_ref_id, + ) + assert row is not None + + payload = row["merge_request_payload"] + oa = payload.get("object_attributes", {}) + + for field in ( + "state", + "title", + "draft", + "detailed_merge_status", + "source_branch", + "target_branch", + ): + if field in api_data: + oa[field] = api_data[field] + + # GitLab REST API returns updated_at as ISO 8601 ("...Z"); + # webhook payloads use "YYYY-MM-DD HH:MM:SS UTC". Normalize + # to webhook format so downstream fromisoformat parsing + # (with the " UTC" -> "+00:00" replace) keeps working. + # Defensive: if GitLab ever returns a naive datetime (no + # offset), assume UTC rather than letting astimezone() apply + # the host's local TZ. If parsing fails outright, log and + # keep the stored value — never raise here, otherwise the + # whole pending_mr_refresh row gets stuck retrying forever. + if "updated_at" in api_data and api_data["updated_at"]: + raw = api_data["updated_at"] + try: + parsed = datetime.datetime.fromisoformat(raw.replace("Z", "+00:00")) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=datetime.UTC) + oa["updated_at"] = parsed.astimezone(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S UTC") + except (ValueError, TypeError) as exc: + log.warning( + "could not parse api updated_at, keeping stored value", + merge_request_ref_id=merge_request_ref_id, + raw=raw, + error=str(exc), + ) + + if "draft" in api_data: + oa["work_in_progress"] = api_data["draft"] + + # Synthesize `action` from state so cards/render.py picks the + # right icon (CodeTextOff for close, Merge for merge). The + # renderer keys off action, not state, so we MUST set it. + # Only mutate on terminal/reopen transitions; otherwise keep + # the webhook-recorded action to avoid masking real events. + api_state = api_data.get("state") + if api_state == "merged" and oa.get("action") != "merge": + oa["action"] = "merge" + elif api_state == "closed" and oa.get("action") != "close": + oa["action"] = "close" + elif api_state == "opened" and oa.get("action") in ("close", "merge"): + oa["action"] = "reopen" + + head_pipeline_id = row["head_pipeline_id"] + api_pipeline = api_data.get("head_pipeline") + if api_pipeline and api_pipeline.get("id"): + oa["head_pipeline_id"] = api_pipeline["id"] + head_pipeline_id = api_pipeline["id"] + + payload["object_attributes"] = oa + + await connection.execute( + """UPDATE merge_request_ref + SET merge_request_payload = $1, head_pipeline_id = $2 + WHERE merge_request_ref_id = $3""", + payload, + head_pipeline_id, + merge_request_ref_id, + ) + + # extra_state is read from the pre-update `row` snapshot. Safe today + # because this function does not mutate extra_state; if that ever + # changes, re-read it after the UPDATE or RETURNING it. + return MergeRequestInfos( + merge_request_ref_id=merge_request_ref_id, + merge_request_payload=payload, + merge_request_extra_state=row["merge_request_extra_state"], + head_pipeline_id=head_pipeline_id, + ) + async def delete_pending_refresh(self, merge_request_ref_id: int) -> None: """Delete a pending refresh after processing.""" connection: asyncpg.Connection diff --git a/gitlab_api.py b/gitlab_api.py index b4b3356..38d5780 100644 --- a/gitlab_api.py +++ b/gitlab_api.py @@ -15,6 +15,7 @@ if TYPE_CHECKING: from db import MergeRequestExtraState + from db import MergeRequestInfos logger = fastapi_structured_logging.get_logger() @@ -178,3 +179,81 @@ async def _fetch_mr_discussion_stats( error=str(e), ) return None + + +async def fetch_and_refresh_mr_status( + merge_request_ref_id: int, + project_url: str, + project_id: int, + mr_iid: int, +) -> MergeRequestInfos | None: + """ + Fetch current MR status from GitLab API and update stored payload. + + Syncs state, title, draft, merge status, branches, and pipeline ID. + Returns updated MergeRequestInfos or None if API unavailable + (token missing, HTTP failure, or transient error). + Callers should treat None as "could not verify" — distinct from + "API confirmed MR still open". + """ + api_data = await _fetch_mr_status(project_url, project_id, mr_iid) + if api_data is None: + return None + + from db import dbh + + return await dbh.refresh_mr_payload_from_api(merge_request_ref_id, api_data) + + +async def _fetch_mr_status( + project_url: str, + project_id: int, + mr_iid: int, +) -> dict[str, Any] | None: + """Fetch single MR from GitLab API. Returns None if token not configured or error.""" + api_token = config.get_gitlab_api_token(project_url) + if api_token is None: + logger.debug("no gitlab api token configured for project", project_url=project_url) + return None + + encoded_project_id = quote(str(project_id), safe="") + url = f"{api_token.url.rstrip('/')}/api/v4/projects/{encoded_project_id}/merge_requests/{mr_iid}" + + try: + timeout = httpx.Timeout(5.0, connect=2.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get( + url, + headers={"PRIVATE-TOKEN": api_token.token}, + ) + response.raise_for_status() + data: dict[str, Any] = response.json() + logger.info( + "fetched mr status from api", + project_id=project_id, + mr_iid=mr_iid, + state=data.get("state"), + draft=data.get("draft"), + detailed_merge_status=data.get("detailed_merge_status"), + ) + return data + except httpx.HTTPStatusError as e: + logger.warning( + "gitlab api http error fetching mr status", + token_name=api_token.name, + api_url=url, + project_id=project_id, + mr_iid=mr_iid, + status_code=e.response.status_code, + ) + return None + except Exception as e: + logger.warning( + "gitlab api error fetching mr status", + token_name=api_token.name, + api_url=url, + project_id=project_id, + mr_iid=mr_iid, + error=str(e), + ) + return None diff --git a/periodic_cleanup.py b/periodic_cleanup.py index 183e126..490bb9e 100644 --- a/periodic_cleanup.py +++ b/periodic_cleanup.py @@ -8,6 +8,7 @@ from cards.render import render from config import DefaultConfig +from config import config from db import DatabaseLifecycleHandler from db import MergeRequestInfos from db import compute_mri_fingerprint @@ -15,6 +16,7 @@ from db import has_unresolved_threads from db import make_mr_summary from gitlab_api import fetch_and_persist_discussion_stats +from gitlab_api import fetch_and_refresh_mr_status from webhook.messaging import update_all_messages_transactional @@ -46,6 +48,41 @@ async def _process_pending_refreshes() -> int: head_pipeline_id=row["head_pipeline_id"], ) + # Emoji events trigger a full MR status refresh via API + # to sync state that may have been lost due to missed webhooks. + # We do NOT short-circuit when stored state is already terminal: + # the merge_request handler updates payload state and schedules + # deletion in separate transactions, so a partial rollback can + # leave state="merged" with refs still present. Letting the API + # refresh + api-refresh-close branch run is the recovery for that. + api_refreshed = False + if row["payload_type"] == "emoji": + refreshed_mri = await fetch_and_refresh_mr_status( + merge_request_ref_id=mri.merge_request_ref_id, + project_url=mri.merge_request_payload.project.web_url, + project_id=mri.merge_request_payload.object_attributes.target_project_id, + mr_iid=mri.merge_request_payload.object_attributes.iid, + ) + if refreshed_mri is not None: + mri = refreshed_mri + api_refreshed = True + if mri.merge_request_payload.object_attributes.state in ("closed", "merged"): + logger.info( + "api refresh detected terminal state", + merge_request_ref_id=mri.merge_request_ref_id, + state=mri.merge_request_payload.object_attributes.state, + ) + else: + # None = couldn't verify (no token / HTTP error). Distinct + # from "API confirmed still open"; recurring emoji events + # on a closed MR will keep landing here until API succeeds. + logger.warning( + "api refresh unavailable for emoji refresh", + merge_request_ref_id=mri.merge_request_ref_id, + project_id=mri.merge_request_payload.object_attributes.target_project_id, + mr_iid=mri.merge_request_payload.object_attributes.iid, + ) + had_unresolved_threads = has_unresolved_threads(mri.merge_request_extra_state) updated_extra_state = await fetch_and_persist_discussion_stats( @@ -90,6 +127,29 @@ async def _process_pending_refreshes() -> int: ) continue + # API refresh detected closed/merged MR — schedule message deletion to clean up + if api_refreshed and is_closing_state: + datasource_fingerprint = compute_mri_fingerprint(mri) + card = render(mri, collapsed=True, show_collapsible=True) + await update_all_messages_transactional( + mri, + card, + make_mr_summary(mri), + datasource_fingerprint, + payload_updated_at, + "api-refresh-close", + schedule_deletion=True, + deletion_delay=datetime.timedelta(seconds=config.MESSAGE_DELETE_DELAY_SECONDS), + ) + await dbh.delete_pending_refresh(mri.merge_request_ref_id) + processed += 1 + logger.info( + "api refresh detected closed/merged MR, scheduled message deletion", + merge_request_ref_id=mri.merge_request_ref_id, + state=mri.merge_request_payload.object_attributes.state, + ) + continue + should_be_collapsed: bool = ( mri.merge_request_payload.object_attributes.draft or mri.merge_request_payload.object_attributes.work_in_progress diff --git a/tests/test_refresh_mr_payload.py b/tests/test_refresh_mr_payload.py new file mode 100644 index 0000000..3d81d0c --- /dev/null +++ b/tests/test_refresh_mr_payload.py @@ -0,0 +1,507 @@ +#!/usr/bin/env python3 +""" +Tests for the GitLab API state-refresh recovery path: + +- DBHelper.refresh_mr_payload_from_api (db.py) +- fetch_and_refresh_mr_status (gitlab_api.py) +- _process_pending_refreshes api-refresh-close branch (periodic_cleanup.py) +""" + +from typing import Any +from unittest.mock import AsyncMock +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + + +def _stored_payload(**overrides: Any) -> dict[str, Any]: + """Webhook-format stored payload, valid for MergeRequestPayload schema.""" + oa: dict[str, Any] = { + "id": 123, + "iid": 1, + "title": "Test MR", + "created_at": "2026-01-01 00:00:00 UTC", + "draft": False, + "state": "opened", + "url": "https://gitlab.example.com/test/project/-/merge_requests/1", + "action": "open", + "updated_at": "2026-01-01 00:00:00 UTC", + "detailed_merge_status": "mergeable", + "head_pipeline_id": None, + "work_in_progress": False, + "source_project_id": 100, + "source_branch": "feature", + "target_project_id": 100, + "target_branch": "main", + } + oa.update(overrides) + return { + "object_kind": "merge_request", + "event_type": "merge_request", + "repository": {"homepage": "https://gitlab.example.com/test/project", "name": "project"}, + "user": {"id": 1, "username": "u", "name": "U", "email": "u@example.com"}, + "project": { + "id": 1, + "path_with_namespace": "test/project", + "web_url": "https://gitlab.example.com/test/project", + }, + "object_attributes": oa, + "changes": {}, + "assignees": [], + "reviewers": [], + } + + +def _stored_extra_state() -> dict[str, Any]: + return { + "version": 1, + "opener": {"id": 1, "username": "u", "name": "U"}, + "approvers": {}, + "pipeline_statuses": {}, + "emojis": {}, + } + + +def _mock_db_with_row(stored_row: dict[str, Any]): + """Build a mock_database whose fetchrow returns stored_row.""" + db = MagicMock() + conn = MagicMock() + conn.execute = AsyncMock() + conn.fetchrow = AsyncMock(return_value=stored_row) + + class TxnCtx: + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return None + + conn.transaction = MagicMock(return_value=TxnCtx()) + + class AcquireCtx: + async def __aenter__(self): + return conn + + async def __aexit__(self, *args): + return None + + db.acquire = AsyncMock(return_value=AcquireCtx()) + return db, conn + + +@pytest.mark.asyncio +async def test_refresh_state_opened_to_merged_sets_action_merge(): + """API state 'merged' on a webhook-recorded 'open' must synthesize action='merge'.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="opened", action="open"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, conn = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = { + "state": "merged", + "title": "Test MR", + "draft": False, + "detailed_merge_status": "mergeable", + "source_branch": "feature", + "target_branch": "main", + "updated_at": "2026-05-05T14:36:14.118Z", + } + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + oa = result.merge_request_payload.object_attributes + assert oa.state == "merged" + assert oa.action == "merge" + + +@pytest.mark.asyncio +async def test_refresh_state_opened_to_closed_sets_action_close(): + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="opened", action="open"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "closed", "updated_at": "2026-05-05T14:36:14Z"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.merge_request_payload.object_attributes.action == "close" + + +@pytest.mark.asyncio +async def test_refresh_state_still_opened_does_not_mutate_action(): + """If API confirms still open and stored action is benign, action must stay.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="opened", action="approved"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "opened", "updated_at": "2026-05-05T14:36:14Z"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + # No transition: action stays as it was on the webhook. + assert result.merge_request_payload.object_attributes.action == "approved" + + +@pytest.mark.asyncio +async def test_refresh_reopen_path_when_state_returned_to_opened(): + """Stored action close/merge but API says opened → action=reopen.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="merged", action="merge"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "opened", "updated_at": "2026-05-05T14:36:14Z"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.merge_request_payload.object_attributes.action == "reopen" + assert result.merge_request_payload.object_attributes.state == "opened" + + +@pytest.mark.asyncio +async def test_refresh_normalizes_iso_updated_at_to_webhook_format(): + """API ISO-8601 updated_at must be stored in webhook 'YYYY-MM-DD HH:MM:SS UTC' format.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "merged", "updated_at": "2026-05-05T14:36:14.118Z"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.merge_request_payload.object_attributes.updated_at == "2026-05-05 14:36:14 UTC" + + +@pytest.mark.asyncio +async def test_refresh_naive_iso_updated_at_assumed_utc_not_local_tz(): + """Defensive: API returning naive ISO (no offset) must NOT be shifted by host TZ. + + Without the explicit UTC tag, astimezone() treats naive datetimes as the + host's local timezone — wrong if the host runs in any non-UTC TZ. + """ + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "merged", "updated_at": "2026-05-05T14:36:14"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + # Time must be preserved as-is (assumed UTC), not shifted by host TZ. + assert result.merge_request_payload.object_attributes.updated_at == "2026-05-05 14:36:14 UTC" + + +@pytest.mark.asyncio +async def test_refresh_unparseable_updated_at_keeps_stored_value(): + """Defensive: bad updated_at format must not raise — would stick the pending refresh.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(updated_at="2026-01-01 00:00:00 UTC"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "merged", "updated_at": "not a date"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + # Stored value preserved, no exception raised, state still updated. + assert result.merge_request_payload.object_attributes.updated_at == "2026-01-01 00:00:00 UTC" + assert result.merge_request_payload.object_attributes.state == "merged" + + +@pytest.mark.asyncio +async def test_refresh_normalizes_iso_updated_at_with_offset(): + """Non-Z ISO with explicit offset must also normalize to UTC webhook format.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "merged", "updated_at": "2026-05-05T16:36:14+02:00"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.merge_request_payload.object_attributes.updated_at == "2026-05-05 14:36:14 UTC" + + +@pytest.mark.asyncio +async def test_refresh_head_pipeline_present_updates_column_and_oa(): + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(head_pipeline_id=10), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": 10, + } + db, conn = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = { + "state": "merged", + "head_pipeline": {"id": 99, "status": "success"}, + "updated_at": "2026-05-05T14:36:14Z", + } + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.head_pipeline_id == 99 + assert result.merge_request_payload.object_attributes.head_pipeline_id == 99 + + update_calls = [c for c in conn.execute.call_args_list if "UPDATE merge_request_ref" in c.args[0]] + assert update_calls, "expected UPDATE of merge_request_ref" + assert update_calls[-1].args[2] == 99 + + +@pytest.mark.asyncio +async def test_refresh_head_pipeline_null_keeps_stored_value(): + """API returning head_pipeline=null must not overwrite stored pipeline id.""" + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(head_pipeline_id=10), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": 10, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = { + "state": "merged", + "head_pipeline": None, + "updated_at": "2026-05-05T14:36:14Z", + } + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.head_pipeline_id == 10 + assert result.merge_request_payload.object_attributes.head_pipeline_id == 10 + + +@pytest.mark.asyncio +async def test_refresh_draft_syncs_work_in_progress(): + from db import DBHelper + + stored = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(draft=False, work_in_progress=False), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + } + db, _ = _mock_db_with_row(stored) + dbh = DBHelper(db) + + api_data: dict[str, Any] = {"state": "opened", "draft": True, "updated_at": "2026-05-05T14:36:14Z"} + + with patch("db.database", db): + result = await dbh.refresh_mr_payload_from_api(1, api_data) + + assert result.merge_request_payload.object_attributes.draft is True + assert result.merge_request_payload.object_attributes.work_in_progress is True + + +@pytest.mark.asyncio +async def test_emoji_refresh_calls_api_even_when_state_already_terminal(): + """Even when stored state is already terminal, emoji refresh must call the API. + + The merge_request handler updates payload state and schedules deletion + in separate transactions, so a partial rollback can leave state='merged' + with refs still present. The api-refresh-close branch (gated on + api_refreshed) is the recovery — short-circuiting the API call here + would skip that recovery. + """ + import periodic_cleanup as pc + + pending_row = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="merged", action="merge"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + "payload_type": "emoji", + "first_event_at": None, + "last_event_at": None, + } + + mock_fetch = AsyncMock(return_value=None) + mock_dbh = MagicMock() + mock_dbh.get_pending_refreshes = AsyncMock(return_value=[pending_row]) + mock_dbh.delete_pending_refresh = AsyncMock() + mock_update = AsyncMock(return_value=0) + + with ( + patch.object(pc, "fetch_and_refresh_mr_status", mock_fetch), + patch.object(pc, "fetch_and_persist_discussion_stats", AsyncMock(return_value=None)), + patch.object(pc, "update_all_messages_transactional", mock_update), + patch.object(pc, "dbh", mock_dbh), + ): + await pc._process_pending_refreshes() + + mock_fetch.assert_called_once() + + +@pytest.mark.asyncio +async def test_emoji_refresh_calls_api_when_state_still_open(): + """Stored state is non-terminal → emoji refresh must call the API.""" + import periodic_cleanup as pc + + pending_row = { + "merge_request_ref_id": 1, + "merge_request_payload": _stored_payload(state="opened", action="open"), + "merge_request_extra_state": _stored_extra_state(), + "head_pipeline_id": None, + "payload_type": "emoji", + "first_event_at": None, + "last_event_at": None, + } + + mock_fetch = AsyncMock(return_value=None) + mock_dbh = MagicMock() + mock_dbh.get_pending_refreshes = AsyncMock(return_value=[pending_row]) + mock_dbh.delete_pending_refresh = AsyncMock() + mock_update = AsyncMock(return_value=0) + + with ( + patch.object(pc, "fetch_and_refresh_mr_status", mock_fetch), + patch.object(pc, "fetch_and_persist_discussion_stats", AsyncMock(return_value=None)), + patch.object(pc, "update_all_messages_transactional", mock_update), + patch.object(pc, "dbh", mock_dbh), + ): + await pc._process_pending_refreshes() + + mock_fetch.assert_called_once() + + +@pytest.mark.asyncio +async def test_fetch_and_refresh_returns_none_when_token_missing(): + """No GitLab token configured for the project URL → return None, no DB write.""" + from gitlab_api import fetch_and_refresh_mr_status + + with ( + patch("gitlab_api.config") as mock_cfg, + patch("db.dbh") as mock_dbh, + ): + mock_cfg.get_gitlab_api_token.return_value = None + mock_dbh.refresh_mr_payload_from_api = AsyncMock() + + result = await fetch_and_refresh_mr_status( + merge_request_ref_id=1, + project_url="https://gitlab.example.com/test/project", + project_id=100, + mr_iid=1, + ) + + assert result is None + mock_dbh.refresh_mr_payload_from_api.assert_not_called() + + +@pytest.mark.asyncio +async def test_fetch_and_refresh_returns_none_on_http_error(): + """HTTP 404/4xx on the GitLab API call → return None, no DB write.""" + import httpx + + from gitlab_api import fetch_and_refresh_mr_status + + api_token = MagicMock() + api_token.url = "https://gitlab.example.com" + api_token.token = "tok" # noqa: S105 + api_token.name = "test" + + class MockResponse: + status_code = 404 + + def raise_for_status(self): + raise httpx.HTTPStatusError("not found", request=MagicMock(), response=self) + + class MockClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return None + + async def get(self, *args, **kwargs): + return MockResponse() + + with ( + patch("gitlab_api.config") as mock_cfg, + patch("gitlab_api.httpx.AsyncClient", MockClient), + patch("db.dbh") as mock_dbh, + ): + mock_cfg.get_gitlab_api_token.return_value = api_token + mock_dbh.refresh_mr_payload_from_api = AsyncMock() + + result = await fetch_and_refresh_mr_status( + merge_request_ref_id=1, + project_url="https://gitlab.example.com/test/project", + project_id=100, + mr_iid=1, + ) + + assert result is None + mock_dbh.refresh_mr_payload_from_api.assert_not_called()