Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/lib/vcon_redis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from redis.commands.json.path import Path
from redis_mgr import redis
from settings import VCON_REDIS_EXPIRY
Expand Down Expand Up @@ -49,6 +50,7 @@ def get_vcon(self, vcon_id: str) -> Optional[vcon.Vcon]:
f"vcon:{vcon_id}", Path.root_path()
)
if not vcon_dict:
increment_counter("conserver.lib.vcon_redis.get_vcon_not_found")
return None
_vcon = vcon.Vcon(vcon_dict)
return _vcon
Expand Down
4 changes: 2 additions & 2 deletions server/links/analyze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vendor_schema = {}
Expand Down
10 changes: 5 additions & 5 deletions server/links/analyze_and_label/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ def run(
increment_counter(
"conserver.link.openai.labels_added",
value=len(labels),
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response for vCon {vcon_uuid}: {e}")
increment_counter(
"conserver.link.openai.json_parse_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
# Add the raw text anyway as the analysis
vCon.add_analysis(
Expand All @@ -182,14 +182,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
8 changes: 4 additions & 4 deletions server/links/analyze_vcon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ def run(
)
increment_counter(
"conserver.link.openai.invalid_json",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise ValueError("Invalid JSON response from OpenAI")

except Exception as e:
logger.error(
"Failed to generate analysis for vCon %s after multiple retries: %s",
Expand All @@ -160,14 +160,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vendor_schema = {}
Expand Down
6 changes: 3 additions & 3 deletions server/links/check_and_tag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def run(
logger.info(f"Applied tag: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})")
increment_counter(
"conserver.link.openai.tags_applied",
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
else:
logger.info(f"Tag not applied: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})")
Expand All @@ -194,14 +194,14 @@ def run(
)
increment_counter(
"conserver.link.openai.evaluation_failures",
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.evaluation_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
13 changes: 12 additions & 1 deletion server/links/datatrails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fastapi import HTTPException
from lib.vcon_redis import VconRedis
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from starlette.status import HTTP_404_NOT_FOUND, HTTP_501_NOT_IMPLEMENTED
from vcon import Vcon

Expand Down Expand Up @@ -382,7 +383,13 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str:
# }
# )

event = create_asset_event(opts, asset_id, auth, event_attributes)
attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}

try:
event = create_asset_event(opts, asset_id, auth, event_attributes)
except Exception:
increment_counter("conserver.link.datatrails.event_creation_failures", attributes=attrs)
raise
event_id = event["identity"]
logger.info(f"DataTrails: Event Created: {event_id}")

Expand All @@ -395,6 +402,10 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str:
event_id = event["identity"]
logger.info(f"DataTrails: New Event Created: {event_id}")
except:
increment_counter(
"conserver.link.datatrails.event_creation_failures",
attributes={**attrs, "event_type": "asset_free"},
)
logger.info(f"DataTrails: New Event Creation Failure")


Expand Down
11 changes: 6 additions & 5 deletions server/links/deepgram_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def run(
logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
start = time.time()
try:
if opts.get("LITELLM_PROXY_URL") and opts.get("LITELLM_MASTER_KEY"):
Expand All @@ -248,26 +249,26 @@ def run(
result = transcribe_dg(dg_client, dialog, opts["api"], vcon_uuid=vcon_uuid, run_opts=opts)
except Exception as e:
logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e, exc_info=True)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
raise e
elapsed = time.time() - start
record_histogram("conserver.link.deepgram.transcription_time", elapsed)
record_histogram("conserver.link.deepgram.transcription_time", elapsed, attributes=attrs)
logger.info(f"Transcription for dialog {index} took {elapsed:.2f} seconds.")

if not result:
logger.warning("No transcription generated for vCon %s, dialog %s", vcon_uuid, index)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
break

# Log and track confidence (not available for LiteLLM/OpenAI-format transcription)
confidence = result.get("confidence")
if confidence is not None:
record_histogram("conserver.link.deepgram.confidence", confidence)
record_histogram("conserver.link.deepgram.confidence", confidence, attributes=attrs)
logger.info(f"Transcription confidence for dialog {index}: {confidence}")
# If the confidence is too low, don't store the transcript
if confidence < opts["minimum_confidence"]:
logger.warning("Low confidence result for vCon %s, dialog %s: %s", vcon_uuid, index, confidence)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
continue
else:
logger.info(f"Confidence not available for dialog {index} (LiteLLM path), skipping threshold check")
Expand Down
6 changes: 3 additions & 3 deletions server/links/detect_engagement/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def run(
increment_counter(
"conserver.link.openai.engagement_detected",
value=1 if is_engaged else 0,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

except Exception as e:
Expand All @@ -158,14 +158,14 @@ def run(
)
increment_counter(
"conserver.link.openai.engagement_analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.engagement_analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
11 changes: 8 additions & 3 deletions server/links/diet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from redis_mgr import redis
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
import json
import requests
from typing import Dict, List, Any, Optional
Expand All @@ -19,13 +20,15 @@

def run(vcon_uuid, link_name, opts=default_options):
logger.info("Starting diet::run")

# Merge provided options with defaults
options = {**default_options, **opts}

for key, value in options.items():
logger.info(f"diet::{key}: {value}")

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}

# Load vCon from Redis using JSON.GET
vcon = redis.json().get(f"vcon:{vcon_uuid}")
if not vcon:
Expand Down Expand Up @@ -57,9 +60,11 @@ def run(vcon_uuid, link_name, opts=default_options):
else:
dialog["body"] = ""
else:
increment_counter("conserver.link.diet.media_post_failures", attributes=attrs)
logger.error(f"Failed to post media: {response.status_code}")
dialog["body"] = ""
except Exception as e:
increment_counter("conserver.link.diet.media_post_failures", attributes=attrs)
logger.error(f"Exception posting media: {e}")
dialog["body"] = ""
else:
Expand All @@ -85,7 +90,7 @@ def run(vcon_uuid, link_name, opts=default_options):
# Save the modified vCon back to Redis using JSON.SET
redis.json().set(f"vcon:{vcon_uuid}", "$", vcon)
logger.info(f"Successfully applied diet to vCon {vcon_uuid}")

return vcon_uuid

def remove_system_prompts_recursive(obj):
Expand Down
10 changes: 6 additions & 4 deletions server/links/groq_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def run(
vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
# Attempt transcription with timing metrics
start = time.time()
Expand All @@ -329,25 +330,26 @@ def run(
result = transcribe_groq_whisper(dialog, opts)
record_histogram(
"conserver.link.groq_whisper.transcription_time",
time.time() - start
time.time() - start,
attributes=attrs,
)
except RetryError as re:
logger.error(
"Failed to transcribe vCon %s after multiple retry attempts: %s",
vcon_uuid, re)
increment_counter("conserver.link.groq_whisper.transcription_failures")
increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break
except Exception as e:
logger.error(
"Unexpected error transcribing vCon %s: %s",
vcon_uuid, e)
increment_counter("conserver.link.groq_whisper.transcription_failures")
increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break

if not result:
logger.warning("No transcription generated for vCon %s", vcon_uuid)
increment_counter(
"conserver.link.groq_whisper.transcription_failures")
"conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break

logger.info("Transcribed vCon: %s", vCon.uuid)
Expand Down
7 changes: 4 additions & 3 deletions server/links/hugging_face_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,21 @@ def run(
logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
# Attempt transcription with timing metrics
start = time.time()
logger.debug("Transcribing dialog %s in vCon: %s", index, vCon.uuid)
result = transcribe_hugging_face_whisper(dialog, opts)
record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start)
record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start, attributes=attrs)
except (RetryError, Exception) as e:
logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e)
increment_counter("conserver.link.hugging_face_whisper.transcription_failures")
increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs)
break

if not result:
logger.warning("No transcription generated for vCon %s", vcon_uuid)
increment_counter("conserver.link.hugging_face_whisper.transcription_failures")
increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs)
break

logger.info("Transcribed vCon: %s", vCon.uuid)
Expand Down
5 changes: 3 additions & 2 deletions server/links/hugging_llm_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,14 @@ def process_vcon(self, vcon_uuid: str, link_name: str) -> str:
logger.info("No transcript found in vCon: %s", vcon_uuid)
return vcon_uuid

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
start = time.time()
result = self.llm.analyze(transcript_text)
record_histogram("conserver.link.huggingface.llm_time", time.time() - start)
record_histogram("conserver.link.huggingface.llm_time", time.time() - start, attributes=attrs)
except (RetryError, Exception) as e:
logger.error("Failed to analyze vCon %s: %s", vcon_uuid, str(e))
increment_counter("conserver.link.huggingface.llm_failures")
increment_counter("conserver.link.huggingface.llm_failures", attributes=attrs)
return vcon_uuid

self._add_analysis_to_vcon(vcon, result)
Expand Down
10 changes: 7 additions & 3 deletions server/links/jq_link/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from lib.logging_utils import init_logger
from lib.vcon_redis import VconRedis
from lib.metrics import increment_counter
import jq

logger = init_logger(__name__)
Expand Down Expand Up @@ -41,33 +42,36 @@ def run(vcon_uuid, link_name, opts=default_options):

# Convert vCon to dict for jq
vcon_dict = vcon.to_dict()
attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}

try:
# Apply the jq filter
# Compile and run the jq program
logger.debug(f"Applying jq filter '{opts['filter']}' to vCon {vcon_uuid}")
program = jq.compile(opts["filter"])
results = list(program.input(vcon_dict))

# Handle empty results
if not results:
logger.debug(f"JQ filter returned no results for vCon {vcon_uuid}")
matches = False
else:
matches = bool(results[0])

logger.debug(f"JQ filter results: {results}")
except Exception as e:
increment_counter("conserver.link.jq.filter_errors", attributes=attrs)
logger.error(f"Error applying jq filter '{opts['filter']}' to vCon {vcon_uuid}: {e}")
logger.debug(f"vCon content: {vcon_dict}")
return None

# Forward based on matches and forward_matches setting
should_forward = matches == opts["forward_matches"]

if should_forward:
logger.info(f"vCon {vcon_uuid} {'' if matches else 'did not '}match filter - forwarding")
return vcon_uuid
else:
increment_counter("conserver.link.jq.vcon_filtered_out", attributes=attrs)
logger.info(f"vCon {vcon_uuid} {'' if matches else 'did not '}match filter - filtering out")
return None
Loading
Loading