Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 103 additions & 1 deletion db.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async def _generic_norm_upsert(
INSERT INTO "{table}" (
{", ".join(ins_col)}
) VALUES (
{", ".join(["$"+str(i+1) for i in range(len(ins_col))])}
{", ".join(["$" + str(i + 1) for i in range(len(ins_col))])}
) RETURNING {", ".join(sel_cols)}
""",
*ins_args,
Expand Down Expand Up @@ -495,6 +495,108 @@ async def get_pending_refreshes(self, limit: int = 50) -> list[dict[str, Any]]:
)
return [dict(row) for row in rows]

async def refresh_mr_payload_from_api(
self, merge_request_ref_id: int, api_data: dict[str, Any]
) -> MergeRequestInfos:
"""Update stored MR payload with fresh state from GitLab API.

Syncs state, title, draft, merge status, branches, and pipeline ID.
Assignees/reviewers are not updated (API response lacks email field required by GLUser).
"""
connection: asyncpg.Connection
async with await database.acquire() as connection:
async with connection.transaction():
row = await connection.fetchrow(
"""SELECT merge_request_ref_id, merge_request_payload,
merge_request_extra_state, head_pipeline_id
FROM merge_request_ref
WHERE merge_request_ref_id = $1
FOR UPDATE""",
merge_request_ref_id,
)
assert row is not None

payload = row["merge_request_payload"]
oa = payload.get("object_attributes", {})

for field in (
"state",
"title",
"draft",
"detailed_merge_status",
"source_branch",
"target_branch",
):
if field in api_data:
oa[field] = api_data[field]

# GitLab REST API returns updated_at as ISO 8601 ("...Z");
# webhook payloads use "YYYY-MM-DD HH:MM:SS UTC". Normalize
# to webhook format so downstream fromisoformat parsing
# (with the " UTC" -> "+00:00" replace) keeps working.
# Defensive: if GitLab ever returns a naive datetime (no
# offset), assume UTC rather than letting astimezone() apply
# the host's local TZ. If parsing fails outright, log and
# keep the stored value — never raise here, otherwise the
# whole pending_mr_refresh row gets stuck retrying forever.
if "updated_at" in api_data and api_data["updated_at"]:
raw = api_data["updated_at"]
try:
parsed = datetime.datetime.fromisoformat(raw.replace("Z", "+00:00"))
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=datetime.UTC)
oa["updated_at"] = parsed.astimezone(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
except (ValueError, TypeError) as exc:
log.warning(
"could not parse api updated_at, keeping stored value",
merge_request_ref_id=merge_request_ref_id,
raw=raw,
error=str(exc),
)

if "draft" in api_data:
oa["work_in_progress"] = api_data["draft"]

# Synthesize `action` from state so cards/render.py picks the
# right icon (CodeTextOff for close, Merge for merge). The
# renderer keys off action, not state, so we MUST set it.
# Only mutate on terminal/reopen transitions; otherwise keep
# the webhook-recorded action to avoid masking real events.
api_state = api_data.get("state")
if api_state == "merged" and oa.get("action") != "merge":
oa["action"] = "merge"
elif api_state == "closed" and oa.get("action") != "close":
oa["action"] = "close"
elif api_state == "opened" and oa.get("action") in ("close", "merge"):
oa["action"] = "reopen"

head_pipeline_id = row["head_pipeline_id"]
api_pipeline = api_data.get("head_pipeline")
if api_pipeline and api_pipeline.get("id"):
oa["head_pipeline_id"] = api_pipeline["id"]
head_pipeline_id = api_pipeline["id"]

payload["object_attributes"] = oa

await connection.execute(
"""UPDATE merge_request_ref
SET merge_request_payload = $1, head_pipeline_id = $2
WHERE merge_request_ref_id = $3""",
payload,
head_pipeline_id,
merge_request_ref_id,
)

# extra_state is read from the pre-update `row` snapshot. Safe today
# because this function does not mutate extra_state; if that ever
# changes, re-read it after the UPDATE or RETURNING it.
return MergeRequestInfos(
merge_request_ref_id=merge_request_ref_id,
merge_request_payload=payload,
merge_request_extra_state=row["merge_request_extra_state"],
head_pipeline_id=head_pipeline_id,
)

async def delete_pending_refresh(self, merge_request_ref_id: int) -> None:
"""Delete a pending refresh after processing."""
connection: asyncpg.Connection
Expand Down
79 changes: 79 additions & 0 deletions gitlab_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

if TYPE_CHECKING:
from db import MergeRequestExtraState
from db import MergeRequestInfos

logger = fastapi_structured_logging.get_logger()

Expand Down Expand Up @@ -178,3 +179,81 @@ async def _fetch_mr_discussion_stats(
error=str(e),
)
return None


async def fetch_and_refresh_mr_status(
merge_request_ref_id: int,
project_url: str,
project_id: int,
mr_iid: int,
) -> MergeRequestInfos | None:
"""
Fetch current MR status from GitLab API and update stored payload.

Syncs state, title, draft, merge status, branches, and pipeline ID.
Returns updated MergeRequestInfos or None if API unavailable
(token missing, HTTP failure, or transient error).
Callers should treat None as "could not verify" — distinct from
"API confirmed MR still open".
"""
api_data = await _fetch_mr_status(project_url, project_id, mr_iid)
if api_data is None:
return None

from db import dbh

return await dbh.refresh_mr_payload_from_api(merge_request_ref_id, api_data)


async def _fetch_mr_status(
project_url: str,
project_id: int,
mr_iid: int,
) -> dict[str, Any] | None:
"""Fetch single MR from GitLab API. Returns None if token not configured or error."""
api_token = config.get_gitlab_api_token(project_url)
if api_token is None:
logger.debug("no gitlab api token configured for project", project_url=project_url)
return None

encoded_project_id = quote(str(project_id), safe="")
url = f"{api_token.url.rstrip('/')}/api/v4/projects/{encoded_project_id}/merge_requests/{mr_iid}"

try:
timeout = httpx.Timeout(5.0, connect=2.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
url,
headers={"PRIVATE-TOKEN": api_token.token},
)
response.raise_for_status()
data: dict[str, Any] = response.json()
logger.info(
"fetched mr status from api",
project_id=project_id,
mr_iid=mr_iid,
state=data.get("state"),
draft=data.get("draft"),
detailed_merge_status=data.get("detailed_merge_status"),
)
return data
except httpx.HTTPStatusError as e:
logger.warning(
"gitlab api http error fetching mr status",
token_name=api_token.name,
api_url=url,
project_id=project_id,
mr_iid=mr_iid,
status_code=e.response.status_code,
)
return None
except Exception as e:
logger.warning(
"gitlab api error fetching mr status",
token_name=api_token.name,
api_url=url,
project_id=project_id,
mr_iid=mr_iid,
error=str(e),
)
return None
60 changes: 60 additions & 0 deletions periodic_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

from cards.render import render
from config import DefaultConfig
from config import config
from db import DatabaseLifecycleHandler
from db import MergeRequestInfos
from db import compute_mri_fingerprint
from db import dbh
from db import has_unresolved_threads
from db import make_mr_summary
from gitlab_api import fetch_and_persist_discussion_stats
from gitlab_api import fetch_and_refresh_mr_status
from webhook.messaging import update_all_messages_transactional


Expand Down Expand Up @@ -46,6 +48,41 @@ async def _process_pending_refreshes() -> int:
head_pipeline_id=row["head_pipeline_id"],
)

# Emoji events trigger a full MR status refresh via API
# to sync state that may have been lost due to missed webhooks.
# We do NOT short-circuit when stored state is already terminal:
# the merge_request handler updates payload state and schedules
# deletion in separate transactions, so a partial rollback can
# leave state="merged" with refs still present. Letting the API
# refresh + api-refresh-close branch run is the recovery for that.
api_refreshed = False
if row["payload_type"] == "emoji":
refreshed_mri = await fetch_and_refresh_mr_status(
merge_request_ref_id=mri.merge_request_ref_id,
project_url=mri.merge_request_payload.project.web_url,
project_id=mri.merge_request_payload.object_attributes.target_project_id,
mr_iid=mri.merge_request_payload.object_attributes.iid,
)
if refreshed_mri is not None:
mri = refreshed_mri
api_refreshed = True
if mri.merge_request_payload.object_attributes.state in ("closed", "merged"):
logger.info(
"api refresh detected terminal state",
merge_request_ref_id=mri.merge_request_ref_id,
state=mri.merge_request_payload.object_attributes.state,
)
else:
# None = couldn't verify (no token / HTTP error). Distinct
# from "API confirmed still open"; recurring emoji events
# on a closed MR will keep landing here until API succeeds.
logger.warning(
"api refresh unavailable for emoji refresh",
merge_request_ref_id=mri.merge_request_ref_id,
project_id=mri.merge_request_payload.object_attributes.target_project_id,
mr_iid=mri.merge_request_payload.object_attributes.iid,
)

had_unresolved_threads = has_unresolved_threads(mri.merge_request_extra_state)

updated_extra_state = await fetch_and_persist_discussion_stats(
Expand Down Expand Up @@ -90,6 +127,29 @@ async def _process_pending_refreshes() -> int:
)
continue

# API refresh detected closed/merged MR — schedule message deletion to clean up
if api_refreshed and is_closing_state:
datasource_fingerprint = compute_mri_fingerprint(mri)
card = render(mri, collapsed=True, show_collapsible=True)
await update_all_messages_transactional(
mri,
card,
make_mr_summary(mri),
datasource_fingerprint,
payload_updated_at,
"api-refresh-close",
schedule_deletion=True,
deletion_delay=datetime.timedelta(seconds=config.MESSAGE_DELETE_DELAY_SECONDS),
)
await dbh.delete_pending_refresh(mri.merge_request_ref_id)
processed += 1
logger.info(
"api refresh detected closed/merged MR, scheduled message deletion",
merge_request_ref_id=mri.merge_request_ref_id,
state=mri.merge_request_payload.object_attributes.state,
)
continue

should_be_collapsed: bool = (
mri.merge_request_payload.object_attributes.draft
or mri.merge_request_payload.object_attributes.work_in_progress
Expand Down
Loading
Loading