Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/examples-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:

# Haystack examples
- { path: 'examples/haystack/haystack_example.py', name: 'Haystack OpenAI' }
- { path: 'examples/haystack/azure_haystack_example.py', name: 'Haystack Azure Chat' }
# Add more examples as needed


Expand Down Expand Up @@ -192,4 +193,4 @@ jobs:
echo "βœ… All examples passed!" >> $GITHUB_STEP_SUMMARY
else
echo "❌ Some examples failed. Check the logs above." >> $GITHUB_STEP_SUMMARY
fi
fi
6 changes: 6 additions & 0 deletions agentops/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ class InstrumentorConfig(TypedDict):
"min_version": "1.0.0",
"package_name": "xpander-sdk",
},
"haystack": {
"module_name": "agentops.instrumentation.agentic.haystack",
"class_name": "HaystackInstrumentor",
"min_version": "2.0.0",
"package_name": "haystack-ai",
},
}

# Combine all target packages for monitoring
Expand Down
1 change: 1 addition & 0 deletions agentops/instrumentation/agentic/haystack/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .instrumentor import HaystackInstrumentor # noqa: F401
176 changes: 176 additions & 0 deletions agentops/instrumentation/agentic/haystack/instrumentor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
from typing import Any, Dict
from opentelemetry.trace import SpanKind
from opentelemetry.instrumentation.utils import unwrap
from wrapt import wrap_function_wrapper

from agentops.instrumentation.common import (
CommonInstrumentor,
InstrumentorConfig,
StandardMetrics,
create_wrapper_factory,
create_span,
SpanAttributeManager,
)
from agentops.semconv import SpanAttributes


_instruments = ("haystack-ai >= 2.0.0",)


class HaystackInstrumentor(CommonInstrumentor):
    """AgentOps instrumentor for Haystack 2.x generator components.

    Wraps the ``run`` (and, when available, ``stream``) methods of the OpenAI
    and Azure OpenAI chat generators so each LLM call is recorded as a span.
    """

    # Methods that are expected to exist on supported haystack versions.
    _RUN_TARGETS = (
        ("haystack.components.generators.openai", "OpenAIGenerator.run"),
        ("haystack.components.generators.chat", "AzureOpenAIChatGenerator.run"),
    )
    # Optional streaming entry points; absent on some haystack releases,
    # hence the try/except tolerance below.
    _STREAM_TARGETS = (
        ("haystack.components.generators.openai", "OpenAIGenerator.stream"),
        ("haystack.components.generators.chat", "AzureOpenAIChatGenerator.stream"),
    )

    def __init__(self):
        config = InstrumentorConfig(
            library_name="haystack",
            library_version="2",
            wrapped_methods=[],
            metrics_enabled=False,
            dependencies=_instruments,
        )
        super().__init__(config)
        # Built lazily in _initialize once application/environment are known.
        self._attribute_manager = None

    def _initialize(self, **kwargs):
        """Create the span attribute manager from app/environment settings."""
        application_name = kwargs.get("application_name", "default_application")
        environment = kwargs.get("environment", "default_environment")
        self._attribute_manager = SpanAttributeManager(
            service_name=application_name, deployment_environment=environment
        )

    def _create_metrics(self, meter) -> Dict[str, Any]:
        """Return the standard AgentOps metric set (metrics_enabled is False in config)."""
        return StandardMetrics.create_standard_metrics(meter)

    def _make_wrapper(self, impl):
        """Bind *impl* to this instrumentor's metrics, attributes and tracer."""
        return create_wrapper_factory(impl, self._metrics, self._attribute_manager)(self._tracer)

    def _custom_wrap(self, **kwargs):
        # run() must be wrappable; any failure here should propagate.
        for module, method in self._RUN_TARGETS:
            wrap_function_wrapper(module, method, self._make_wrapper(_wrap_haystack_run_impl))
        # stream() may not exist on every haystack version; skip quietly.
        for module, method in self._STREAM_TARGETS:
            try:
                wrap_function_wrapper(module, method, self._make_wrapper(_wrap_haystack_stream_impl))
            except Exception:
                pass

    def _custom_unwrap(self, **kwargs):
        for module, method in self._RUN_TARGETS:
            unwrap(module, method)
        for module, method in self._STREAM_TARGETS:
            try:
                unwrap(module, method)
            except Exception:
                # Mirror the wrap-side tolerance for missing stream methods.
                pass


def _first_non_empty_text(value):
if isinstance(value, list) and value:
return _first_non_empty_text(value[0])
if isinstance(value, dict):
if "content" in value:
return str(value["content"])
if "text" in value:
return str(value["text"])
if "replies" in value and value["replies"]:
return str(value["replies"][0])
if value is None:
return None
return str(value)


def _extract_prompt(args, kwargs):
if "prompt" in kwargs:
return kwargs.get("prompt")
if "messages" in kwargs:
return kwargs.get("messages")
if args:
return args[0]
return None


def _get_model_name(instance):
for attr in ("model", "model_name", "deployment_name", "deployment"):
if hasattr(instance, attr):
val = getattr(instance, attr)
if val:
return str(val)
return None


def _wrap_haystack_run_impl(tracer, metrics, attr_manager, wrapped, instance, args, kwargs):
    """Trace a synchronous generator ``run`` call.

    Records the (truncated) prompt before the call and a best-effort reply
    snippet after it, then returns the wrapped method's result unchanged.
    """
    span_attributes = {
        SpanAttributes.LLM_SYSTEM: "haystack",
        "gen_ai.model": _get_model_name(instance),
        SpanAttributes.LLM_REQUEST_STREAMING: False,
    }
    with create_span(
        tracer,
        "haystack.generator.run",
        kind=SpanKind.CLIENT,
        attributes=span_attributes,
        attribute_manager=attr_manager,
    ) as span:
        prompt_text = _first_non_empty_text(_extract_prompt(args, kwargs))
        if prompt_text:
            # Truncate to keep span attributes bounded.
            span.set_attribute("gen_ai.prompt.0.content", prompt_text[:500])

        result = wrapped(*args, **kwargs)

        # Prefer the "replies" field of a dict result, falling back to the
        # whole result when that yields nothing.
        if isinstance(result, dict):
            reply_text = _first_non_empty_text(result.get("replies")) or _first_non_empty_text(result)
        else:
            reply_text = _first_non_empty_text(result)

        if reply_text:
            span.set_attribute("gen_ai.response.0.content", str(reply_text)[:500])

        return result


def _wrap_haystack_stream_impl(tracer, metrics, attr_manager, wrapped, instance, args, kwargs):
    """Trace a streaming generator call, yielding chunks as they arrive.

    The wrapped method is invoked eagerly so call-time errors surface
    immediately (the previous implementation was itself a generator, so the
    wrapped method did not run until the caller started iterating). The
    returned iterator is re-yielded inside a span that records the truncated
    prompt, the most recent chunk's text, and the total chunk count.

    If the wrapped method returns something non-iterable, that value is
    returned to the caller unchanged. (Previously a ``return out`` inside
    the generator discarded the value, handing the caller an empty
    generator; the broad ``except TypeError`` around the loop also swallowed
    TypeErrors raised mid-stream, silently truncating output.)
    """
    result = wrapped(*args, **kwargs)
    try:
        # Probe iterability explicitly instead of catching TypeError around
        # the whole consumption loop.
        chunk_iter = iter(result)
    except TypeError:
        # Not actually a stream; hand the value back untouched.
        return result

    model = _get_model_name(instance)

    def _traced():
        # Span opens when the caller starts consuming and stays open for the
        # lifetime of the stream.
        with create_span(
            tracer,
            "haystack.generator.stream",
            kind=SpanKind.CLIENT,
            attributes={
                SpanAttributes.LLM_SYSTEM: "haystack",
                "gen_ai.model": model,
                SpanAttributes.LLM_REQUEST_STREAMING: True,
            },
            attribute_manager=attr_manager,
        ) as span:
            prompt_text = _first_non_empty_text(_extract_prompt(args, kwargs))
            if prompt_text:
                span.set_attribute("gen_ai.prompt.0.content", prompt_text[:500])

            chunk_count = 0
            for chunk in chunk_iter:
                chunk_count += 1
                last_text = _first_non_empty_text(chunk)
                if last_text:
                    # Keep only the most recent chunk's text, truncated.
                    span.set_attribute("gen_ai.response.0.content", str(last_text)[:500])
                yield chunk
            span.set_attribute("gen_ai.response.chunk_count", chunk_count)

    return _traced()
9 changes: 7 additions & 2 deletions docs/v1/integrations/haystack.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,17 @@ AgentOps makes monitoring your Haystack agents seamless. Haystack, much like Aut
</Step>
</Steps>

## Supported generators

- OpenAI: `haystack.components.generators.openai.OpenAIGenerator`
- Azure OpenAI Chat: `haystack.components.generators.chat.AzureOpenAIChatGenerator`

## Full Examples

You can refer to the following example -
You can refer to the following examples:

- [Simple Haystack example (OpenAI)](https://github.com/AgentOps-AI/agentops/blob/main/examples/haystack/haystack_example.py)

- [Haystack Azure OpenAI Chat example](https://github.com/AgentOps-AI/agentops/blob/main/examples/haystack/azure_haystack_example.py)

<script type="module" src="/scripts/github_stars.js"></script>
<script type="module" src="/scripts/scroll-img-fadein-animation.js"></script>
Expand Down
47 changes: 47 additions & 0 deletions examples/haystack/azure_haystack_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os

import agentops
from haystack.components.generators.chat import AzureOpenAIChatGenerator


def main():
    """Run one Azure OpenAI chat completion through Haystack with AgentOps tracing.

    Skips gracefully (prints and returns) when the required Azure environment
    variables are absent, so the script is safe to run in CI.
    """
    agentops.init(os.getenv("AGENTOPS_API_KEY"))

    # CI-safe guard: bail out before creating any trace if Azure creds are missing.
    if not os.getenv("AZURE_OPENAI_API_KEY") or not os.getenv("AZURE_OPENAI_ENDPOINT"):
        print("Skipping Azure example: missing AZURE_OPENAI_API_KEY or AZURE_OPENAI_ENDPOINT (CI-safe skip)")
        return

    tracer = agentops.start_trace(
        trace_name="Haystack Azure Chat Example",
        tags=["haystack", "azure", "chat", "agentops-example"],
    )

    # Both defaults are overridable via environment for other deployments/API versions.
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-06-01")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")

    # NOTE(review): Haystack 2.x generators typically take secrets wrapped in
    # haystack.utils.Secret; passing a raw string for api_key may be rejected
    # by some versions — confirm against the installed haystack-ai release.
    generator = AzureOpenAIChatGenerator(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=api_version,
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        deployment_name=deployment,
    )

    # NOTE(review): Haystack 2.x chat generators usually expect ChatMessage
    # objects rather than plain role/content dicts — verify this call shape.
    messages = [{"role": "user", "content": "In one sentence, what is AgentOps?"}]
    result = generator.run(messages=messages)
    replies = result.get("replies") or []
    print("Haystack Azure reply:", replies[0] if replies else "<no reply>")

    print("\n" + "=" * 50)
    print("Now let's verify that our LLM calls were tracked properly...")
    try:
        validation_result = agentops.validate_trace_spans(trace_context=tracer)
        agentops.print_validation_summary(validation_result)
    except agentops.ValidationError as e:
        print(f"\n❌ Error validating spans: {e}")
        # Re-raise so CI fails loudly; note the trace is never ended on this path.
        raise

    agentops.end_trace(tracer, end_state="Success")


if __name__ == "__main__":
    main()
6 changes: 5 additions & 1 deletion examples/haystack/haystack_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
def main():
agentops.init(os.getenv("AGENTOPS_API_KEY"))

if not os.getenv("OPENAI_API_KEY"):
print("Skipping OpenAI example: missing OPENAI_API_KEY")
return

tracer = agentops.start_trace(
trace_name="Haystack OpenAI Example",
tags=["haystack", "openai", "agentops-example"],
Expand All @@ -19,7 +23,7 @@ def main():
print("Haystack reply:", replies[0] if replies else "<no reply>")

print("\n" + "=" * 50)
print("Now let's verify that our LLM calls were tracked properly...")
print("Now let's verify that our LLM calls were tracked properly with AgentOps...")
try:
validation_result = agentops.validate_trace_spans(trace_context=tracer)
agentops.print_validation_summary(validation_result)
Expand Down
2 changes: 1 addition & 1 deletion examples/haystack/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
haystack-ai>=2.0.0
openai>=1.0.0
openai>=1.102.0
Loading