From 560983c6a5fe7e71113b2dc8b568e4ef72764ff6 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Mon, 23 Mar 2026 20:52:06 +0530 Subject: [PATCH] Add link-level OTEL metrics with link.name and vcon.uuid attributes (CON-6) - Add `conserver.link.execution_time` histogram to main loop's `_process_link` so every link gets timing automatically without per-link boilerplate - Add `link.name` and `vcon.uuid` attributes to all existing metric calls in analyze, analyze_and_label, analyze_vcon, check_and_tag, detect_engagement, deepgram_link, groq_whisper, hugging_face_whisper, openai_transcribe, hugging_llm_link - Add failure counters and domain events to previously uninstrumented links: webhook (post_failures, no_urls_configured), post_analysis_to_slack (post_failures, fallback_channel_used), datatrails (event_creation_failures), scitt (statement_creation_failures, registration_failures), jq_link (filter_errors, vcon_filtered_out), tag_router (routes_matched, routed_count), sampler (sampled_in, sampled_out), diet (media_post_failures) - Add `get_vcon_not_found` counter to lib/vcon_redis.py - Add InMemoryMetricReader-based conftest.py fixture and 25 unit tests validating metric name + link.name + vcon.uuid attributes for all instrumented links and the main loop chain.name attribute Co-Authored-By: Claude Sonnet 4.6 --- server/lib/vcon_redis.py | 2 + server/links/analyze/__init__.py | 4 +- server/links/analyze_and_label/__init__.py | 10 +- server/links/analyze_vcon/__init__.py | 8 +- server/links/check_and_tag/__init__.py | 6 +- server/links/datatrails/__init__.py | 13 +- server/links/deepgram_link/__init__.py | 11 +- server/links/detect_engagement/__init__.py | 6 +- server/links/diet/__init__.py | 11 +- server/links/groq_whisper/__init__.py | 10 +- server/links/hugging_face_whisper/__init__.py | 7 +- server/links/hugging_llm_link/__init__.py | 5 +- server/links/jq_link/__init__.py | 10 +- server/links/openai_transcribe/__init__.py | 7 +- 
.../links/post_analysis_to_slack/__init__.py | 21 +- server/links/sampler/__init__.py | 16 +- server/links/scitt/__init__.py | 43 +- server/links/tag_router/__init__.py | 12 +- server/links/webhook/__init__.py | 18 +- server/main.py | 14 +- server/tests/conftest.py | 28 + server/tests/test_link_metrics.py | 605 ++++++++++++++++++ 22 files changed, 793 insertions(+), 74 deletions(-) create mode 100644 server/tests/conftest.py create mode 100644 server/tests/test_link_metrics.py diff --git a/server/lib/vcon_redis.py b/server/lib/vcon_redis.py index ae19d4c..4b1068d 100644 --- a/server/lib/vcon_redis.py +++ b/server/lib/vcon_redis.py @@ -1,5 +1,6 @@ from typing import Optional from lib.logging_utils import init_logger +from lib.metrics import increment_counter from redis.commands.json.path import Path from redis_mgr import redis from settings import VCON_REDIS_EXPIRY @@ -49,6 +50,7 @@ def get_vcon(self, vcon_id: str) -> Optional[vcon.Vcon]: f"vcon:{vcon_id}", Path.root_path() ) if not vcon_dict: + increment_counter("conserver.lib.vcon_redis.get_vcon_not_found") return None _vcon = vcon.Vcon(vcon_dict) return _vcon diff --git a/server/links/analyze/__init__.py b/server/links/analyze/__init__.py index 00a9142..086e68b 100644 --- a/server/links/analyze/__init__.py +++ b/server/links/analyze/__init__.py @@ -153,14 +153,14 @@ def run( ) increment_counter( "conserver.link.openai.analysis_failures", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise e record_histogram( "conserver.link.openai.analysis_time", time.time() - start, - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) vendor_schema = {} diff --git a/server/links/analyze_and_label/__init__.py b/server/links/analyze_and_label/__init__.py index edfb6ad..eaee831 100644 --- 
a/server/links/analyze_and_label/__init__.py +++ b/server/links/analyze_and_label/__init__.py @@ -149,14 +149,14 @@ def run( increment_counter( "conserver.link.openai.labels_added", value=len(labels), - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) - + except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON response for vCon {vcon_uuid}: {e}") increment_counter( "conserver.link.openai.json_parse_failures", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) # Add the raw text anyway as the analysis vCon.add_analysis( @@ -182,14 +182,14 @@ def run( ) increment_counter( "conserver.link.openai.analysis_failures", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise e record_histogram( "conserver.link.openai.analysis_time", time.time() - start, - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) vcon_redis.store_vcon(vCon) diff --git a/server/links/analyze_vcon/__init__.py b/server/links/analyze_vcon/__init__.py index 2a8dd5e..76dacd3 100644 --- a/server/links/analyze_vcon/__init__.py +++ b/server/links/analyze_vcon/__init__.py @@ -148,10 +148,10 @@ def run( ) increment_counter( "conserver.link.openai.invalid_json", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise ValueError("Invalid JSON response from OpenAI") - + except Exception as e: logger.error( "Failed to generate analysis for vCon %s after multiple retries: %s", @@ -160,14 +160,14 @@ def run( ) increment_counter( 
"conserver.link.openai.analysis_failures", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise e record_histogram( "conserver.link.openai.analysis_time", time.time() - start, - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) vendor_schema = {} diff --git a/server/links/check_and_tag/__init__.py b/server/links/check_and_tag/__init__.py index d0d1474..5376080 100644 --- a/server/links/check_and_tag/__init__.py +++ b/server/links/check_and_tag/__init__.py @@ -182,7 +182,7 @@ def run( logger.info(f"Applied tag: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})") increment_counter( "conserver.link.openai.tags_applied", - attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']}, + attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) else: logger.info(f"Tag not applied: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})") @@ -194,14 +194,14 @@ def run( ) increment_counter( "conserver.link.openai.evaluation_failures", - attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']}, + attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise e record_histogram( "conserver.link.openai.evaluation_time", time.time() - start, - attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']}, + attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) 
vcon_redis.store_vcon(vCon) diff --git a/server/links/datatrails/__init__.py b/server/links/datatrails/__init__.py index ebaaeae..aaa89b1 100644 --- a/server/links/datatrails/__init__.py +++ b/server/links/datatrails/__init__.py @@ -4,6 +4,7 @@ from fastapi import HTTPException from lib.vcon_redis import VconRedis from lib.logging_utils import init_logger +from lib.metrics import increment_counter from starlette.status import HTTP_404_NOT_FOUND, HTTP_501_NOT_IMPLEMENTED from vcon import Vcon @@ -382,7 +383,13 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str: # } # ) - event = create_asset_event(opts, asset_id, auth, event_attributes) + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} + + try: + event = create_asset_event(opts, asset_id, auth, event_attributes) + except Exception: + increment_counter("conserver.link.datatrails.event_creation_failures", attributes=attrs) + raise event_id = event["identity"] logger.info(f"DataTrails: Event Created: {event_id}") @@ -395,6 +402,10 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str: event_id = event["identity"] logger.info(f"DataTrails: New Event Created: {event_id}") except: + increment_counter( + "conserver.link.datatrails.event_creation_failures", + attributes={**attrs, "event_type": "asset_free"}, + ) logger.info(f"DataTrails: New Event Creation Failure") diff --git a/server/links/deepgram_link/__init__.py b/server/links/deepgram_link/__init__.py index 14680fe..c71bd24 100644 --- a/server/links/deepgram_link/__init__.py +++ b/server/links/deepgram_link/__init__.py @@ -235,6 +235,7 @@ def run( logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid) continue + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} start = time.time() try: if opts.get("LITELLM_PROXY_URL") and opts.get("LITELLM_MASTER_KEY"): @@ -248,26 +249,26 @@ def run( result = transcribe_dg(dg_client, dialog, opts["api"], vcon_uuid=vcon_uuid, run_opts=opts) 
except Exception as e: logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e, exc_info=True) - increment_counter("conserver.link.deepgram.transcription_failures") + increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs) raise e elapsed = time.time() - start - record_histogram("conserver.link.deepgram.transcription_time", elapsed) + record_histogram("conserver.link.deepgram.transcription_time", elapsed, attributes=attrs) logger.info(f"Transcription for dialog {index} took {elapsed:.2f} seconds.") if not result: logger.warning("No transcription generated for vCon %s, dialog %s", vcon_uuid, index) - increment_counter("conserver.link.deepgram.transcription_failures") + increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs) break # Log and track confidence (not available for LiteLLM/OpenAI-format transcription) confidence = result.get("confidence") if confidence is not None: - record_histogram("conserver.link.deepgram.confidence", confidence) + record_histogram("conserver.link.deepgram.confidence", confidence, attributes=attrs) logger.info(f"Transcription confidence for dialog {index}: {confidence}") # If the confidence is too low, don't store the transcript if confidence < opts["minimum_confidence"]: logger.warning("Low confidence result for vCon %s, dialog %s: %s", vcon_uuid, index, confidence) - increment_counter("conserver.link.deepgram.transcription_failures") + increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs) continue else: logger.info(f"Confidence not available for dialog {index} (LiteLLM path), skipping threshold check") diff --git a/server/links/detect_engagement/__init__.py b/server/links/detect_engagement/__init__.py index 9c4f8fc..31e871a 100644 --- a/server/links/detect_engagement/__init__.py +++ b/server/links/detect_engagement/__init__.py @@ -144,7 +144,7 @@ def run( increment_counter( 
"conserver.link.openai.engagement_detected", value=1 if is_engaged else 0, - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) except Exception as e: @@ -158,14 +158,14 @@ def run( ) increment_counter( "conserver.link.openai.engagement_analysis_failures", - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) raise e record_histogram( "conserver.link.openai.engagement_analysis_time", time.time() - start, - attributes={"analysis_type": opts['analysis_type']}, + attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid}, ) vcon_redis.store_vcon(vCon) diff --git a/server/links/diet/__init__.py b/server/links/diet/__init__.py index a064ca8..91683f3 100644 --- a/server/links/diet/__init__.py +++ b/server/links/diet/__init__.py @@ -1,5 +1,6 @@ from redis_mgr import redis from lib.logging_utils import init_logger +from lib.metrics import increment_counter import json import requests from typing import Dict, List, Any, Optional @@ -19,13 +20,15 @@ def run(vcon_uuid, link_name, opts=default_options): logger.info("Starting diet::run") - + # Merge provided options with defaults options = {**default_options, **opts} - + for key, value in options.items(): logger.info(f"diet::{key}: {value}") + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} + # Load vCon from Redis using JSON.GET vcon = redis.json().get(f"vcon:{vcon_uuid}") if not vcon: @@ -57,9 +60,11 @@ def run(vcon_uuid, link_name, opts=default_options): else: dialog["body"] = "" else: + increment_counter("conserver.link.diet.media_post_failures", attributes=attrs) logger.error(f"Failed to post media: {response.status_code}") dialog["body"] = "" except Exception as e: + increment_counter("conserver.link.diet.media_post_failures", attributes=attrs) 
logger.error(f"Exception posting media: {e}") dialog["body"] = "" else: @@ -85,7 +90,7 @@ def run(vcon_uuid, link_name, opts=default_options): # Save the modified vCon back to Redis using JSON.SET redis.json().set(f"vcon:{vcon_uuid}", "$", vcon) logger.info(f"Successfully applied diet to vCon {vcon_uuid}") - + return vcon_uuid def remove_system_prompts_recursive(obj): diff --git a/server/links/groq_whisper/__init__.py b/server/links/groq_whisper/__init__.py index 7d83673..5d2e268 100644 --- a/server/links/groq_whisper/__init__.py +++ b/server/links/groq_whisper/__init__.py @@ -321,6 +321,7 @@ def run( vCon.uuid) continue + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} try: # Attempt transcription with timing metrics start = time.time() @@ -329,25 +330,26 @@ def run( result = transcribe_groq_whisper(dialog, opts) record_histogram( "conserver.link.groq_whisper.transcription_time", - time.time() - start + time.time() - start, + attributes=attrs, ) except RetryError as re: logger.error( "Failed to transcribe vCon %s after multiple retry attempts: %s", vcon_uuid, re) - increment_counter("conserver.link.groq_whisper.transcription_failures") + increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs) break except Exception as e: logger.error( "Unexpected error transcribing vCon %s: %s", vcon_uuid, e) - increment_counter("conserver.link.groq_whisper.transcription_failures") + increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs) break if not result: logger.warning("No transcription generated for vCon %s", vcon_uuid) increment_counter( - "conserver.link.groq_whisper.transcription_failures") + "conserver.link.groq_whisper.transcription_failures", attributes=attrs) break logger.info("Transcribed vCon: %s", vCon.uuid) diff --git a/server/links/hugging_face_whisper/__init__.py b/server/links/hugging_face_whisper/__init__.py index a2b11d1..b8dcea6 100644 --- 
a/server/links/hugging_face_whisper/__init__.py +++ b/server/links/hugging_face_whisper/__init__.py @@ -184,20 +184,21 @@ def run( logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid) continue + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} try: # Attempt transcription with timing metrics start = time.time() logger.debug("Transcribing dialog %s in vCon: %s", index, vCon.uuid) result = transcribe_hugging_face_whisper(dialog, opts) - record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start) + record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start, attributes=attrs) except (RetryError, Exception) as e: logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e) - increment_counter("conserver.link.hugging_face_whisper.transcription_failures") + increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs) break if not result: logger.warning("No transcription generated for vCon %s", vcon_uuid) - increment_counter("conserver.link.hugging_face_whisper.transcription_failures") + increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs) break logger.info("Transcribed vCon: %s", vCon.uuid) diff --git a/server/links/hugging_llm_link/__init__.py b/server/links/hugging_llm_link/__init__.py index a7ad8f5..6873e1c 100644 --- a/server/links/hugging_llm_link/__init__.py +++ b/server/links/hugging_llm_link/__init__.py @@ -268,13 +268,14 @@ def process_vcon(self, vcon_uuid: str, link_name: str) -> str: logger.info("No transcript found in vCon: %s", vcon_uuid) return vcon_uuid + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} try: start = time.time() result = self.llm.analyze(transcript_text) - record_histogram("conserver.link.huggingface.llm_time", time.time() - start) + record_histogram("conserver.link.huggingface.llm_time", time.time() - start, attributes=attrs) 
except (RetryError, Exception) as e: logger.error("Failed to analyze vCon %s: %s", vcon_uuid, str(e)) - increment_counter("conserver.link.huggingface.llm_failures") + increment_counter("conserver.link.huggingface.llm_failures", attributes=attrs) return vcon_uuid self._add_analysis_to_vcon(vcon, result) diff --git a/server/links/jq_link/__init__.py b/server/links/jq_link/__init__.py index 5b7c88e..8842067 100644 --- a/server/links/jq_link/__init__.py +++ b/server/links/jq_link/__init__.py @@ -1,5 +1,6 @@ from lib.logging_utils import init_logger from lib.vcon_redis import VconRedis +from lib.metrics import increment_counter import jq logger = init_logger(__name__) @@ -41,6 +42,7 @@ def run(vcon_uuid, link_name, opts=default_options): # Convert vCon to dict for jq vcon_dict = vcon.to_dict() + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} try: # Apply the jq filter @@ -48,26 +50,28 @@ def run(vcon_uuid, link_name, opts=default_options): logger.debug(f"Applying jq filter '{opts['filter']}' to vCon {vcon_uuid}") program = jq.compile(opts["filter"]) results = list(program.input(vcon_dict)) - + # Handle empty results if not results: logger.debug(f"JQ filter returned no results for vCon {vcon_uuid}") matches = False else: matches = bool(results[0]) - + logger.debug(f"JQ filter results: {results}") except Exception as e: + increment_counter("conserver.link.jq.filter_errors", attributes=attrs) logger.error(f"Error applying jq filter '{opts['filter']}' to vCon {vcon_uuid}: {e}") logger.debug(f"vCon content: {vcon_dict}") return None # Forward based on matches and forward_matches setting should_forward = matches == opts["forward_matches"] - + if should_forward: logger.info(f"vCon {vcon_uuid} {'' if matches else 'did not '}match filter - forwarding") return vcon_uuid else: + increment_counter("conserver.link.jq.vcon_filtered_out", attributes=attrs) logger.info(f"vCon {vcon_uuid} {'' if matches else 'did not '}match filter - filtering out") return None \ No newline at 
end of file diff --git a/server/links/openai_transcribe/__init__.py b/server/links/openai_transcribe/__init__.py index fc76481..c93f3bb 100644 --- a/server/links/openai_transcribe/__init__.py +++ b/server/links/openai_transcribe/__init__.py @@ -535,6 +535,7 @@ def run( continue # Initialize OpenAI client for each dialog (in case key changes) + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} start = time.time() result = None try: @@ -542,15 +543,15 @@ def run( result = transcribe_openai(dialog["url"], opts, vcon_uuid) except Exception as e: logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e, exc_info=True) - increment_counter("conserver.link.openai.transcription_failures") + increment_counter("conserver.link.openai.transcription_failures", attributes=attrs) raise e elapsed = time.time() - start - record_histogram("conserver.link.openai.transcription_time", elapsed) + record_histogram("conserver.link.openai.transcription_time", elapsed, attributes=attrs) logger.info(f"Transcription for dialog {index} took {elapsed:.2f} seconds.") if not result: logger.warning("No transcription generated for vCon %s, dialog %s", vcon_uuid, index) - increment_counter("conserver.link.openai.transcription_failures") + increment_counter("conserver.link.openai.transcription_failures", attributes=attrs) break logger.info("Transcribed vCon: %s, dialog: %s", vCon.uuid, index) diff --git a/server/links/post_analysis_to_slack/__init__.py b/server/links/post_analysis_to_slack/__init__.py index 0cf5905..95b7110 100644 --- a/server/links/post_analysis_to_slack/__init__.py +++ b/server/links/post_analysis_to_slack/__init__.py @@ -1,5 +1,6 @@ from server.lib.vcon_redis import VconRedis from lib.logging_utils import init_logger +from lib.metrics import increment_counter from slack_sdk.web import WebClient logger = init_logger(__name__) @@ -41,7 +42,7 @@ def get_summary(vcon, index): return None -def post_blocks_to_channel(token, channel_name, abstract, url, 
opts): +def post_blocks_to_channel(token, channel_name, abstract, url, opts, attrs=None): blocks = [ { "type": "section", @@ -64,6 +65,8 @@ def post_blocks_to_channel(token, channel_name, abstract, url, opts): try: client.chat_postMessage(channel=channel_name, blocks=blocks, text=abstract) except Exception as e: + if attrs: + increment_counter("conserver.link.slack.fallback_channel_used", attributes=attrs) # Code to run if an exception is raised client.chat_postMessage( channel=opts["default_channel_name"], @@ -84,6 +87,8 @@ def run(vcon_id, link_name, opts=default_options): vcon_redis = VconRedis() vcon = vcon_redis.get_vcon(vcon_id) + attrs = {"link.name": link_name, "vcon.uuid": vcon_id} + for a in vcon.analysis: # we still need to run this check give the following scenario: # 0 customers_frustration None @@ -107,9 +112,17 @@ def run(vcon_id, link_name, opts=default_options): if team_name and team_name != "strolid": channel_name = f"team-{team_name}-alerts" abstract = abstract + f" #{dealer_name}" - post_blocks_to_channel(opts["token"], channel_name, abstract, url, opts) - - post_blocks_to_channel(opts["token"], opts["default_channel_name"], abstract, url, opts) + try: + post_blocks_to_channel(opts["token"], channel_name, abstract, url, opts, attrs=attrs) + except Exception: + increment_counter("conserver.link.slack.post_failures", attributes=attrs) + raise + + try: + post_blocks_to_channel(opts["token"], opts["default_channel_name"], abstract, url, opts, attrs=attrs) + except Exception: + increment_counter("conserver.link.slack.post_failures", attributes=attrs) + raise a["was_posted_to_slack"] = True vcon_redis.store_vcon(vcon) diff --git a/server/links/sampler/__init__.py b/server/links/sampler/__init__.py index c0c06f5..dab2e59 100644 --- a/server/links/sampler/__init__.py +++ b/server/links/sampler/__init__.py @@ -2,6 +2,7 @@ import time import hashlib from lib.logging_utils import init_logger +from lib.metrics import increment_counter logger = 
init_logger(__name__) @@ -42,18 +43,25 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str | N method = options["method"] value = options["value"] + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid, "method": method} if method == "percentage": - return _percentage_sampling(vcon_uuid, value) + result = _percentage_sampling(vcon_uuid, value) elif method == "rate": - return _rate_sampling(vcon_uuid, value) + result = _rate_sampling(vcon_uuid, value) elif method == "modulo": - return _modulo_sampling(vcon_uuid, value) + result = _modulo_sampling(vcon_uuid, value) elif method == "time_based": - return _time_based_sampling(vcon_uuid, value) + result = _time_based_sampling(vcon_uuid, value) else: raise ValueError(f"Unknown sampling method: {method}") + if result: + increment_counter("conserver.link.sampler.sampled_in", attributes=attrs) + else: + increment_counter("conserver.link.sampler.sampled_out", attributes=attrs) + return result + def _percentage_sampling(vcon_uuid: str, percentage: float) -> str | None: """ diff --git a/server/links/scitt/__init__.py b/server/links/scitt/__init__.py index ff010eb..d8248ef 100644 --- a/server/links/scitt/__init__.py +++ b/server/links/scitt/__init__.py @@ -5,6 +5,7 @@ from fastapi import HTTPException from lib.vcon_redis import VconRedis from lib.logging_utils import init_logger +from lib.metrics import increment_counter from starlette.status import HTTP_404_NOT_FOUND, HTTP_501_NOT_IMPLEMENTED import hashlib @@ -91,17 +92,23 @@ def run( signing_key_path = os.path.join(opts["signing_key_path"]) signing_key = create_hashed_signed_statement.open_signing_key(signing_key_path) - signed_statement = create_hashed_signed_statement.create_hashed_signed_statement( - issuer=opts["issuer"], - signing_key=signing_key, - subject=subject, - kid=key_id.encode('utf-8'), - meta_map=meta_map, - payload=payload.encode('utf-8'), - payload_hash_alg=payload_hash_alg, - payload_location=payload_location, - 
pre_image_content_type="application/vcon+json" - ) + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} + + try: + signed_statement = create_hashed_signed_statement.create_hashed_signed_statement( + issuer=opts["issuer"], + signing_key=signing_key, + subject=subject, + kid=key_id.encode('utf-8'), + meta_map=meta_map, + payload=payload.encode('utf-8'), + payload_hash_alg=payload_hash_alg, + payload_location=payload_location, + pre_image_content_type="application/vcon+json" + ) + except Exception: + increment_counter("conserver.link.scitt.statement_creation_failures", attributes=attrs) + raise logger.info(f"signed_statement: {signed_statement}") ############################### @@ -118,11 +125,15 @@ def run( detail=f"OIDC_flow not found or unsupported. OIDC_flow: {oidc_flow}" ) - operation_id = register_signed_statement.register_statement( - opts=opts, - auth=auth, - signed_statement=signed_statement - ) + try: + operation_id = register_signed_statement.register_statement( + opts=opts, + auth=auth, + signed_statement=signed_statement + ) + except Exception: + increment_counter("conserver.link.scitt.registration_failures", attributes=attrs) + raise logger.info(f"operation_id: {operation_id}") return vcon_uuid diff --git a/server/links/tag_router/__init__.py b/server/links/tag_router/__init__.py index 7987e2f..8a38ecf 100644 --- a/server/links/tag_router/__init__.py +++ b/server/links/tag_router/__init__.py @@ -1,5 +1,6 @@ from lib.logging_utils import init_logger from lib.vcon_redis import VconRedis +from lib.metrics import increment_counter from redis_mgr import redis logger = init_logger(__name__) @@ -67,6 +68,8 @@ def run(vcon_uuid, link_name, opts=default_options): logger.debug(f"No tags found in vCon {vcon_uuid}") return vcon_uuid if opts.get("forward_original") else None + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} + # Route the vCon to the appropriate Redis lists based on tags routed = False for tag in tags: @@ -75,14 +78,19 @@ def 
run(vcon_uuid, link_name, opts=default_options): logger.info(f"Routing vCon {vcon_uuid} to list '{target_list}' based on tag '{tag}'") # Push the vCon UUID to the target Redis list redis.rpush(target_list, str(vcon_uuid)) + increment_counter( + "conserver.link.tag_router.routes_matched", + attributes={**attrs, "route": target_list}, + ) routed = True else: logger.debug(f"No route configured for tag '{tag}'") - + if routed: + increment_counter("conserver.link.tag_router.routed_count", attributes=attrs) logger.info(f"Successfully routed vCon {vcon_uuid} based on tags") else: logger.info(f"No applicable routes found for vCon {vcon_uuid}") - + # Return based on forward_original setting return vcon_uuid if opts.get("forward_original") else None diff --git a/server/links/webhook/__init__.py b/server/links/webhook/__init__.py index 16b433c..0a11061 100644 --- a/server/links/webhook/__init__.py +++ b/server/links/webhook/__init__.py @@ -1,6 +1,6 @@ from server.lib.vcon_redis import VconRedis from lib.logging_utils import init_logger - +from lib.metrics import increment_counter import requests logger = init_logger(__name__) @@ -38,10 +38,12 @@ def run( # Validate webhook URLs are configured webhook_urls = opts.get("webhook-urls", []) + attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid} if not webhook_urls: logger.warning( f"webhook plugin: no webhook-urls configured for vcon {vcon_uuid}, skipping" ) + increment_counter("conserver.link.webhook.no_urls_configured", attributes=attrs) return vcon_uuid # Post this to each webhook url @@ -49,10 +51,16 @@ def run( logger.info( f"webhook plugin: posting vcon {vcon_uuid} to webhook url: {url}" ) - resp = requests.post(url, json=json_dict, headers=headers) - logger.info( - f"webhook plugin response for {vcon_uuid}: {resp.status_code} {resp.text}" - ) + try: + resp = requests.post(url, json=json_dict, headers=headers) + logger.info( + f"webhook plugin response for {vcon_uuid}: {resp.status_code} {resp.text}" + ) + except 
Exception as e: + increment_counter("conserver.link.webhook.post_failures", attributes=attrs) + logger.error(f"webhook plugin: failed to post vcon {vcon_uuid} to {url}: {e}") + raise + # Return the vcon_uuid down the chain. # If you want the vCon processing to stop (if you are filtering them, for instance) # send None diff --git a/server/main.py b/server/main.py index 0ec4815..5125c7f 100644 --- a/server/main.py +++ b/server/main.py @@ -242,8 +242,9 @@ def process(self) -> None: "chain_name": self.chain_details["name"] } ) - record_histogram("conserver.main_loop.vcon_processing_time", vcon_processing_time) - increment_counter("conserver.main_loop.count_vcons_processed") + chain_attrs = {"chain.name": self.chain_details["name"]} + record_histogram("conserver.main_loop.vcon_processing_time", vcon_processing_time, attributes=chain_attrs) + increment_counter("conserver.main_loop.count_vcons_processed", attributes=chain_attrs) # End span if created - exit the context manager if self._span_context_manager: @@ -514,6 +515,15 @@ def _process_link(self, links: list[str], link_index: int) -> bool: started = time.time() should_continue_chain = module.run(self.vcon_id, link_name, options) link_processing_time = round(time.time() - started, 3) + record_histogram( + "conserver.link.execution_time", + link_processing_time, + attributes={ + "link.name": link_name, + "vcon.uuid": self.vcon_id, + "chain.name": self.chain_details["name"], + }, + ) logger.info( "Completed link %s (module: %s) for vCon: %s in %s seconds", link_name, diff --git a/server/tests/conftest.py b/server/tests/conftest.py new file mode 100644 index 0000000..ea21e99 --- /dev/null +++ b/server/tests/conftest.py @@ -0,0 +1,28 @@ +import pytest +from unittest.mock import patch +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader + + +@pytest.fixture +def metric_reader(): + """Injects an in-memory OTEL meter into lib.metrics for validation tests. 
+ + Yields the InMemoryMetricReader so tests can assert on emitted metrics. + Each test gets a fresh meter with no prior state. + """ + reader = InMemoryMetricReader() + provider = MeterProvider(metric_readers=[reader]) + test_meter = provider.get_meter("vcon-server-test") + + with patch.multiple( + "lib.metrics", + meter=test_meter, + _otel_initialized=True, + OTEL_EXPORTER_OTLP_ENDPOINT="http://test-collector:4317", + counter_metrics={}, + histogram_metrics={}, + ): + yield reader + + provider.shutdown() diff --git a/server/tests/test_link_metrics.py b/server/tests/test_link_metrics.py new file mode 100644 index 0000000..497b6d7 --- /dev/null +++ b/server/tests/test_link_metrics.py @@ -0,0 +1,605 @@ +"""End-to-end validation tests for link-level OTEL metrics (CON-6). + +Each test exercises a link's run() function with mocked external calls and +asserts that the expected metric names and attributes are emitted to the +in-memory OTEL reader. No external services required. + +Run with: + pytest server/tests/test_link_metrics.py -v +""" +import json +import pytest +from unittest.mock import patch, MagicMock +from tenacity import RetryError + +from server.vcon import Vcon + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def extract_metrics(reader): + """Return {metric_name: [{"attributes": {...}, ...}, ...]} from reader.""" + result = {} + data = reader.get_metrics_data() + for rm in data.resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + points = result.setdefault(metric.name, []) + for dp in metric.data.data_points: + point = {"attributes": dict(dp.attributes)} + # Histogram data points have sum/count; Sum data points have value + if hasattr(dp, "sum") and hasattr(dp, "count"): + point["sum"] = dp.sum + point["count"] = dp.count + else: + point["value"] = dp.value + points.append(point) + return result + + +def 
def assert_metric_has_attrs(metrics, metric_name, expected_attrs):
    """Assert metric_name exists and at least one data point matches expected_attrs.

    Args:
        metrics: Mapping of metric name to a list of point dicts, each with an
            ``"attributes"`` dict (the shape produced by ``extract_metrics``).
        metric_name: Name of the metric that must be present.
        expected_attrs: Attribute key/value pairs that a single data point must
            all carry. Extra attributes on the point are ignored.

    Raises:
        AssertionError: If the metric is missing, or no data point carries
            every expected attribute with the expected value.
    """
    # Raised explicitly rather than via ``assert`` so the check still runs
    # (and still reports AssertionError) when Python is started with -O.
    if metric_name not in metrics:
        raise AssertionError(f"Metric '{metric_name}' not found. Got: {list(metrics)}")

    points = metrics[metric_name]
    # Sentinel default: a key that is absent from the point must never compare
    # equal to an expected value, not even to an expected value of None.
    absent = object()
    for point in points:
        attrs = point["attributes"]
        if all(attrs.get(key, absent) == value for key, value in expected_attrs.items()):
            return
    raise AssertionError(
        f"No data point for '{metric_name}' has attributes {expected_attrs}. "
        f"Data points: {points}"
    )
class TestGroqWhisperMetrics:
    """Metric checks for the ``groq_whisper`` link.

    Each test runs the link against a recording vCon with Redis and the
    transcription call mocked, then looks for the expected metric carrying
    the ``link.name`` and ``vcon.uuid`` attributes.
    """

    LINK_NAME = "groq_whisper"
    UUID = "groq-test-uuid"
    OPTS = {"API_KEY": "fake-key", "minimum_duration": 30}

    def _run(self, vcon, mock_result=None, side_effect=None):
        """Run the link once with VconRedis and the transcriber patched.

        ``side_effect`` (when truthy) is raised by the mocked transcriber;
        otherwise it returns an object whose ``text`` is ``mock_result`` or a
        default string.
        """
        from server.links.groq_whisper import run

        redis_patch = patch("server.links.groq_whisper.VconRedis")
        transcribe_patch = patch("server.links.groq_whisper.transcribe_groq_whisper")
        with redis_patch as redis_cls, transcribe_patch as transcribe:
            redis_cls.return_value = MagicMock(**{"get_vcon.return_value": vcon})
            if side_effect:
                transcribe.side_effect = side_effect
            else:
                transcribe.return_value = MagicMock(text=mock_result or "transcription text")
            try:
                run(self.UUID, self.LINK_NAME, self.OPTS)
            except Exception:
                # Only the emitted metrics are inspected; an error escaping
                # run() is intentionally ignored here.
                pass

    def _assert_emitted(self, metric_reader, metric_name):
        """Check that ``metric_name`` was recorded with this link's attributes."""
        assert_metric_has_attrs(
            extract_metrics(metric_reader),
            metric_name,
            {"link.name": self.LINK_NAME, "vcon.uuid": self.UUID},
        )

    def test_success_records_transcription_time(self, metric_reader):
        self._run(make_recording_vcon())
        self._assert_emitted(metric_reader, "conserver.link.groq_whisper.transcription_time")

    def test_failure_increments_counter(self, metric_reader):
        self._run(make_recording_vcon(), side_effect=RetryError(last_attempt=MagicMock()))
        self._assert_emitted(metric_reader, "conserver.link.groq_whisper.transcription_failures")
class TestOpenAITranscribeMetrics:
    """Metric checks for the ``openai_transcribe`` link.

    Each test runs the link against a recording vCon with Redis and the
    transcription call mocked, then looks for the expected metric carrying
    the ``link.name`` and ``vcon.uuid`` attributes.
    """

    LINK_NAME = "openai_transcribe"
    UUID = "oai-transcribe-uuid"
    OPTS = {"OPENAI_API_KEY": "fake-key", "model": "gpt-4o-transcribe", "minimum_duration": 3}

    def _run(self, vcon, mock_result=None, side_effect=None):
        """Run the link once with VconRedis and ``transcribe_openai`` patched.

        ``side_effect`` (when truthy) is raised by the mocked transcriber;
        otherwise it returns ``mock_result`` or a default transcript dict.
        """
        from server.links.openai_transcribe import run

        redis_patch = patch("server.links.openai_transcribe.VconRedis")
        transcribe_patch = patch("server.links.openai_transcribe.transcribe_openai")
        with redis_patch as redis_cls, transcribe_patch as transcribe:
            redis_cls.return_value = MagicMock(**{"get_vcon.return_value": vcon})
            if side_effect:
                transcribe.side_effect = side_effect
            else:
                transcribe.return_value = mock_result or {"text": "transcribed text"}
            try:
                run(self.UUID, self.LINK_NAME, self.OPTS)
            except Exception:
                # Only the emitted metrics are inspected; an error escaping
                # run() is intentionally ignored here.
                pass

    def _assert_emitted(self, metric_reader, metric_name):
        """Check that ``metric_name`` was recorded with this link's attributes."""
        assert_metric_has_attrs(
            extract_metrics(metric_reader),
            metric_name,
            {"link.name": self.LINK_NAME, "vcon.uuid": self.UUID},
        )

    def test_success_records_transcription_time(self, metric_reader):
        self._run(make_recording_vcon())
        self._assert_emitted(metric_reader, "conserver.link.openai.transcription_time")

    def test_failure_increments_counter(self, metric_reader):
        self._run(make_recording_vcon(), side_effect=Exception("OpenAI error"))
        self._assert_emitted(metric_reader, "conserver.link.openai.transcription_failures")
class TestAnalyzeAndLabelMetrics:
    """Metric checks for the ``analyze_and_label`` link.

    Each test runs the link against a vCon that already has a transcript,
    with Redis, the OpenAI client, the label generator and the inclusion /
    sampling gates mocked, then looks for the expected metric carrying
    ``link.name``, ``vcon.uuid`` and ``analysis_type``.
    """

    LINK_NAME = "analyze_and_label"
    UUID = "aal-test-uuid"
    OPTS = {
        "analysis_type": "labeled_analysis",
        "model": "gpt-4-turbo",
        "prompt": "Label this",
        "temperature": 0.2,
        "source": {"analysis_type": "transcript", "text_location": "body.paragraphs.transcript"},
        "response_format": {"type": "json_object"},
    }

    def _run(self, vcon, mock_result=None, side_effect=None):
        """Run the link once with every external collaborator patched.

        ``side_effect`` (when truthy) is raised by the mocked label generator;
        otherwise it returns ``mock_result`` or a default JSON label payload.
        """
        from server.links.analyze_and_label import run

        redis_patch = patch("server.links.analyze_and_label.VconRedis")
        client_patch = patch("server.links.analyze_and_label.get_openai_client")
        generate_patch = patch("server.links.analyze_and_label.generate_analysis_with_labels")
        included_patch = patch("server.links.analyze_and_label.is_included", return_value=True)
        sampling_patch = patch(
            "server.links.analyze_and_label.randomly_execute_with_sampling", return_value=True
        )
        with redis_patch as redis_cls, client_patch, generate_patch as generate, \
                included_patch, sampling_patch:
            redis_cls.return_value = MagicMock(**{"get_vcon.return_value": vcon})
            if side_effect:
                generate.side_effect = side_effect
            else:
                generate.return_value = mock_result or json.dumps({"labels": ["label1", "label2"]})
            try:
                run(self.UUID, self.LINK_NAME, self.OPTS)
            except Exception:
                # Only the emitted metrics are inspected; an error escaping
                # run() is intentionally ignored here.
                pass

    def _assert_emitted(self, metric_reader, metric_name):
        """Check that ``metric_name`` was recorded with this link's attributes."""
        assert_metric_has_attrs(
            extract_metrics(metric_reader),
            metric_name,
            {
                "link.name": self.LINK_NAME,
                "vcon.uuid": self.UUID,
                "analysis_type": self.OPTS["analysis_type"],
            },
        )

    def test_success_records_labels_added(self, metric_reader):
        self._run(make_transcript_vcon())
        self._assert_emitted(metric_reader, "conserver.link.openai.labels_added")

    def test_success_records_analysis_time(self, metric_reader):
        self._run(make_transcript_vcon())
        self._assert_emitted(metric_reader, "conserver.link.openai.analysis_time")

    def test_failure_increments_counter(self, metric_reader):
        self._run(make_transcript_vcon(), side_effect=Exception("OpenAI error"))
        self._assert_emitted(metric_reader, "conserver.link.openai.analysis_failures")
class TestCheckAndTagMetrics:
    """Metric checks for the ``check_and_tag`` link.

    Each test runs the link against a vCon whose transcript body is a plain
    string, with Redis, the OpenAI client, the tag evaluator and the
    inclusion / sampling gates mocked, then looks for the expected metric.
    """

    LINK_NAME = "check_and_tag"
    UUID = "cat-test-uuid"
    OPTS = {
        "analysis_type": "tag_evaluation",
        "model": "gpt-4",
        "tag_name": "urgent",
        "tag_value": "true",
        "evaluation_question": "Is this urgent?",
        "source": {"analysis_type": "transcript", "text_location": "body"},
        "response_format": {"type": "json_object"},
    }

    def _run(self, vcon, applies=True, side_effect=None):
        """Run the link once with every external collaborator patched.

        ``side_effect`` (when truthy) is raised by the mocked evaluator;
        otherwise it returns a JSON document ``{"applies": <applies>}``.
        """
        from server.links.check_and_tag import run

        redis_patch = patch("server.links.check_and_tag.VconRedis")
        client_patch = patch("server.links.check_and_tag.get_openai_client")
        evaluate_patch = patch("server.links.check_and_tag.generate_tag_evaluation")
        included_patch = patch("server.links.check_and_tag.is_included", return_value=True)
        sampling_patch = patch(
            "server.links.check_and_tag.randomly_execute_with_sampling", return_value=True
        )
        with redis_patch as redis_cls, client_patch, evaluate_patch as evaluate, \
                included_patch, sampling_patch:
            redis_cls.return_value = MagicMock(**{"get_vcon.return_value": vcon})
            if side_effect:
                evaluate.side_effect = side_effect
            else:
                evaluate.return_value = json.dumps({"applies": applies})
            try:
                run(self.UUID, self.LINK_NAME, self.OPTS)
            except Exception:
                # Only the emitted metrics are inspected; an error escaping
                # run() is intentionally ignored here.
                pass

    @staticmethod
    def _vcon_with_string_body():
        """Build a transcript vCon whose analysis body is the bare text."""
        vcon = make_transcript_vcon()
        # transcript body needs to be accessible at path "body" for check_and_tag
        vcon.analysis[0]["body"] = "Hello world"
        return vcon

    def _assert_emitted(self, metric_reader, metric_name, **extra_attrs):
        """Check ``metric_name`` was recorded with this link's attributes."""
        expected = {
            "link.name": self.LINK_NAME,
            "vcon.uuid": self.UUID,
            "analysis_type": self.OPTS["analysis_type"],
        }
        expected.update(extra_attrs)
        assert_metric_has_attrs(extract_metrics(metric_reader), metric_name, expected)

    def test_tag_applied_increments_counter(self, metric_reader):
        self._run(self._vcon_with_string_body(), applies=True)
        self._assert_emitted(
            metric_reader,
            "conserver.link.openai.tags_applied",
            tag_name=self.OPTS["tag_name"],
        )

    def test_success_records_evaluation_time(self, metric_reader):
        self._run(self._vcon_with_string_body(), applies=True)
        self._assert_emitted(metric_reader, "conserver.link.openai.evaluation_time")

    def test_failure_increments_counter(self, metric_reader):
        self._run(self._vcon_with_string_body(), side_effect=Exception("OpenAI error"))
        self._assert_emitted(metric_reader, "conserver.link.openai.evaluation_failures")
make_transcript_vcon() + self._run(vcon, engaged=True) + metrics = extract_metrics(metric_reader) + assert_metric_has_attrs(metrics, "conserver.link.openai.engagement_analysis_time", + {"link.name": self.LINK_NAME, "vcon.uuid": self.UUID, + "analysis_type": self.OPTS["analysis_type"]}) + + def test_failure_increments_counter(self, metric_reader): + vcon = make_transcript_vcon() + self._run(vcon, side_effect=Exception("OpenAI error")) + metrics = extract_metrics(metric_reader) + assert_metric_has_attrs(metrics, "conserver.link.openai.engagement_analysis_failures", + {"link.name": self.LINK_NAME, "vcon.uuid": self.UUID, + "analysis_type": self.OPTS["analysis_type"]}) + + +# --------------------------------------------------------------------------- +# hugging_llm_link +# --------------------------------------------------------------------------- + +class TestHuggingLLMMetrics: + LINK_NAME = "hugging_llm" + UUID = "hllm-test-uuid" + OPTS = {"HUGGINGFACE_API_KEY": "fake-key", "use_local_model": False} + + def _run(self, vcon, mock_result=None, side_effect=None): + from server.links.hugging_llm_link import run + with patch("server.links.hugging_llm_link.VconRedis") as mock_redis_cls, \ + patch("server.links.hugging_llm_link.HuggingFaceLLM") as mock_llm_cls: + mock_redis = MagicMock() + mock_redis_cls.return_value = mock_redis + mock_redis.get_vcon.return_value = vcon + + mock_llm = MagicMock() + mock_llm_cls.return_value = mock_llm + if side_effect: + mock_llm.analyze.side_effect = side_effect + else: + mock_llm.analyze.return_value = mock_result or { + "analysis": "some analysis", "model": "llama-2", "parameters": {} + } + try: + run(self.UUID, self.LINK_NAME, self.OPTS) + except Exception: + pass + + def test_success_records_llm_time(self, metric_reader): + vcon = make_transcript_vcon() + # hugging_llm reads body.transcript (dict format) + vcon.analysis[0]["body"] = {"transcript": "Hello world"} + self._run(vcon) + metrics = extract_metrics(metric_reader) + 
class TestMainLoopMetrics:
    """Metric checks for the main processing loop (``VconChainRequest``).

    Verifies that the chain-level metrics recorded at the end of
    ``process()`` carry the ``chain.name`` attribute.
    """

    CHAIN_NAME = "test_chain"
    UUID = "main-loop-uuid"

    def test_vcon_processing_records_time_and_count(self, metric_reader):
        """Process a one-link chain and check both chain-level metrics."""
        from server.main import VconChainRequest
        import server.main as main_module

        chain_details = {
            "name": self.CHAIN_NAME,
            "links": ["mock_link"],
            "storages": [],
            "egress_lists": [],
        }

        # Provide a minimal config so _process_link can find the link
        test_config = {
            "links": {
                "mock_link": {
                    "module": "mock_module",
                    "options": {},
                }
            }
        }

        mock_module = MagicMock()
        mock_module.run.return_value = self.UUID

        # patch.object restores (or removes) server.main.config when the
        # block exits, so this test no longer leaks its fake configuration
        # into whichever test runs next. create=True keeps the old behaviour
        # of working even if the attribute has not been set yet.
        with patch.object(main_module, "config", test_config, create=True), \
                patch.dict("server.main.imported_modules", {"mock_module": mock_module}):
            req = VconChainRequest(chain_details, self.UUID, context=None)
            req.process()

        metrics = extract_metrics(metric_reader)
        assert_metric_has_attrs(metrics, "conserver.main_loop.vcon_processing_time",
                                {"chain.name": self.CHAIN_NAME})
        assert_metric_has_attrs(metrics, "conserver.main_loop.count_vcons_processed",
                                {"chain.name": self.CHAIN_NAME})