diff --git a/.github/workflows/examples-integration-test.yml b/.github/workflows/examples-integration-test.yml index e16e878f5..f6e057a90 100644 --- a/.github/workflows/examples-integration-test.yml +++ b/.github/workflows/examples-integration-test.yml @@ -108,6 +108,7 @@ jobs: # Haystack examples - { path: 'examples/haystack/haystack_example.py', name: 'Haystack OpenAI' } + - { path: 'examples/haystack/azure_haystack_example.py', name: 'Haystack Azure Chat' } # Add more examples as needed @@ -192,4 +193,4 @@ jobs: echo "āœ… All examples passed!" >> $GITHUB_STEP_SUMMARY else echo "āŒ Some examples failed. Check the logs above." >> $GITHUB_STEP_SUMMARY - fi \ No newline at end of file + fi \ No newline at end of file diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index e47b6e7fb..9f1b6a3c8 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -118,6 +118,12 @@ class InstrumentorConfig(TypedDict): "min_version": "1.0.0", "package_name": "xpander-sdk", }, + "haystack": { + "module_name": "agentops.instrumentation.agentic.haystack", + "class_name": "HaystackInstrumentor", + "min_version": "2.0.0", + "package_name": "haystack-ai", + }, } # Combine all target packages for monitoring diff --git a/agentops/instrumentation/agentic/haystack/__init__.py b/agentops/instrumentation/agentic/haystack/__init__.py new file mode 100644 index 000000000..43be3b98c --- /dev/null +++ b/agentops/instrumentation/agentic/haystack/__init__.py @@ -0,0 +1 @@ +from .instrumentor import HaystackInstrumentor # noqa: F401 diff --git a/agentops/instrumentation/agentic/haystack/instrumentor.py b/agentops/instrumentation/agentic/haystack/instrumentor.py new file mode 100644 index 000000000..4e371c09d --- /dev/null +++ b/agentops/instrumentation/agentic/haystack/instrumentor.py @@ -0,0 +1,176 @@ +from typing import Any, Dict +from opentelemetry.trace import SpanKind +from opentelemetry.instrumentation.utils import unwrap +from wrapt import wrap_function_wrapper + +from agentops.instrumentation.common import ( + CommonInstrumentor, + InstrumentorConfig, + StandardMetrics, + create_wrapper_factory, + create_span, + SpanAttributeManager, +) +from agentops.semconv import SpanAttributes + + +_instruments = ("haystack-ai >= 2.0.0",) + + +class HaystackInstrumentor(CommonInstrumentor): + def __init__(self): + config = InstrumentorConfig( + library_name="haystack", + library_version="2", + wrapped_methods=[], + metrics_enabled=False, + dependencies=_instruments, + ) + super().__init__(config) + self._attribute_manager = None + + def _initialize(self, **kwargs): + application_name = kwargs.get("application_name", "default_application") + environment = kwargs.get("environment", "default_environment") + self._attribute_manager = SpanAttributeManager(service_name=application_name, deployment_environment=environment) + + def _create_metrics(self, meter) -> Dict[str, Any]: + return StandardMetrics.create_standard_metrics(meter) + + def _custom_wrap(self, **kwargs): + attr_manager = self._attribute_manager + + wrap_function_wrapper( + "haystack.components.generators.openai", + "OpenAIGenerator.run", + create_wrapper_factory(_wrap_haystack_run_impl, self._metrics, attr_manager)(self._tracer), + ) + + wrap_function_wrapper( + "haystack.components.generators.chat", + "AzureOpenAIChatGenerator.run", + create_wrapper_factory(_wrap_haystack_run_impl, self._metrics, attr_manager)(self._tracer), + ) + + try: + wrap_function_wrapper( + "haystack.components.generators.openai", + "OpenAIGenerator.stream", + create_wrapper_factory(_wrap_haystack_stream_impl, self._metrics, attr_manager)(self._tracer), + ) + except Exception: + pass + + try: + wrap_function_wrapper( + "haystack.components.generators.chat", + "AzureOpenAIChatGenerator.stream", + create_wrapper_factory(_wrap_haystack_stream_impl, self._metrics, attr_manager)(self._tracer), + ) + except Exception: + pass + + def _custom_unwrap(self, **kwargs): + unwrap("haystack.components.generators.openai", "OpenAIGenerator.run") + unwrap("haystack.components.generators.chat", "AzureOpenAIChatGenerator.run") + try: + unwrap("haystack.components.generators.openai", "OpenAIGenerator.stream") + except Exception: + pass + try: + unwrap("haystack.components.generators.chat", "AzureOpenAIChatGenerator.stream") + except Exception: + pass + + +def _first_non_empty_text(value): + if isinstance(value, list) and value: + return _first_non_empty_text(value[0]) + if isinstance(value, dict): + if "content" in value: + return str(value["content"]) + if "text" in value: + return str(value["text"]) + if "replies" in value and value["replies"]: + return str(value["replies"][0]) + if value is None: + return None + return str(value) + + +def _extract_prompt(args, kwargs): + if "prompt" in kwargs: + return kwargs.get("prompt") + if "messages" in kwargs: + return kwargs.get("messages") + if args: + return args[0] + return None + + +def _get_model_name(instance): + for attr in ("model", "model_name", "deployment_name", "deployment"): + if hasattr(instance, attr): + val = getattr(instance, attr) + if val: + return str(val) + return None + + +def _wrap_haystack_run_impl(tracer, metrics, attr_manager, wrapped, instance, args, kwargs): + model = _get_model_name(instance) + with create_span( + tracer, + "haystack.generator.run", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_SYSTEM: "haystack", "gen_ai.model": model, SpanAttributes.LLM_REQUEST_STREAMING: False}, + attribute_manager=attr_manager, + ) as span: + prompt = _extract_prompt(args, kwargs) + prompt_text = _first_non_empty_text(prompt) + if prompt_text: + span.set_attribute("gen_ai.prompt.0.content", prompt_text[:500]) + + result = wrapped(*args, **kwargs) + + reply_text = None + if isinstance(result, dict): + reply_text = _first_non_empty_text(result.get("replies")) + if not reply_text: + reply_text = _first_non_empty_text(result) + else: + reply_text = _first_non_empty_text(result) + + if reply_text: + span.set_attribute("gen_ai.response.0.content", str(reply_text)[:500]) + + return result + + +def _wrap_haystack_stream_impl(tracer, metrics, attr_manager, wrapped, instance, args, kwargs): + model = _get_model_name(instance) + with create_span( + tracer, + "haystack.generator.stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_SYSTEM: "haystack", "gen_ai.model": model, SpanAttributes.LLM_REQUEST_STREAMING: True}, + attribute_manager=attr_manager, + ) as span: + prompt = _extract_prompt(args, kwargs) + prompt_text = _first_non_empty_text(prompt) + if prompt_text: + span.set_attribute("gen_ai.prompt.0.content", prompt_text[:500]) + + out = wrapped(*args, **kwargs) + + try: + chunk_count = 0 + for chunk in out: + chunk_count += 1 + last_text = _first_non_empty_text(chunk) + if last_text: + span.set_attribute("gen_ai.response.0.content", str(last_text)[:500]) + yield chunk + span.set_attribute("gen_ai.response.chunk_count", chunk_count) + except TypeError: + return out diff --git a/docs/v1/integrations/haystack.mdx b/docs/v1/integrations/haystack.mdx index 9b570541a..1b5d3c29d 100644 --- a/docs/v1/integrations/haystack.mdx +++ b/docs/v1/integrations/haystack.mdx @@ -67,12 +67,17 @@ AgentOps makes monitoring your Haystack agents seamless. Haystack, much like Aut +## Supported generators + +- OpenAI: `haystack.components.generators.openai.OpenAIGenerator` +- Azure OpenAI Chat: `haystack.components.generators.chat.AzureOpenAIChatGenerator` + ## Full Examples -You can refer to the following example - +You can refer to the following examples - - [Simple Haystack example (OpenAI)](https://github.com/AgentOps-AI/agentops/blob/main/examples/haystack/haystack_example.py) - +- [Haystack Azure OpenAI Chat example](https://github.com/AgentOps-AI/agentops/blob/main/examples/haystack/azure_haystack_example.py) diff --git a/examples/haystack/azure_haystack_example.py b/examples/haystack/azure_haystack_example.py new file mode 100644 index 000000000..a5e3b17d1 --- /dev/null +++ b/examples/haystack/azure_haystack_example.py @@ -0,0 +1,47 @@ +import os + +import agentops +from haystack.components.generators.chat import AzureOpenAIChatGenerator + + +def main(): + agentops.init(os.getenv("AGENTOPS_API_KEY")) + + if not os.getenv("AZURE_OPENAI_API_KEY") or not os.getenv("AZURE_OPENAI_ENDPOINT"): + print("Skipping Azure example: missing AZURE_OPENAI_API_KEY or AZURE_OPENAI_ENDPOINT (CI-safe skip)") + return + + tracer = agentops.start_trace( + trace_name="Haystack Azure Chat Example", + tags=["haystack", "azure", "chat", "agentops-example"], + ) + + api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-06-01") + deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini") + + generator = AzureOpenAIChatGenerator( + api_key=os.getenv("AZURE_OPENAI_API_KEY"), + api_version=api_version, + azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), + deployment_name=deployment, + ) + + messages = [{"role": "user", "content": "In one sentence, what is AgentOps?"}] + result = generator.run(messages=messages) + replies = result.get("replies") or [] + print("Haystack Azure reply:", replies[0] if replies else "") + + print("\n" + "=" * 50) + print("Now let's verify that our LLM calls were tracked properly...") + try: + validation_result = agentops.validate_trace_spans(trace_context=tracer) + agentops.print_validation_summary(validation_result) + except agentops.ValidationError as e: + print(f"\nāŒ Error validating spans: {e}") + raise + + agentops.end_trace(tracer, end_state="Success") + + +if __name__ == "__main__": + main() diff --git a/examples/haystack/haystack_example.py b/examples/haystack/haystack_example.py index 541ab2dd8..6dbd39d7a 100644 --- a/examples/haystack/haystack_example.py +++ b/examples/haystack/haystack_example.py @@ -7,6 +7,10 @@ def main(): agentops.init(os.getenv("AGENTOPS_API_KEY")) + if not os.getenv("OPENAI_API_KEY"): + print("Skipping OpenAI example: missing OPENAI_API_KEY") + return + tracer = agentops.start_trace( trace_name="Haystack OpenAI Example", tags=["haystack", "openai", "agentops-example"], @@ -19,7 +23,7 @@ def main(): print("Haystack reply:", replies[0] if replies else "") print("\n" + "=" * 50) - print("Now let's verify that our LLM calls were tracked properly...") + print("Now let's verify that our LLM calls were tracked properly with AgentOps...") try: validation_result = agentops.validate_trace_spans(trace_context=tracer) agentops.print_validation_summary(validation_result) diff --git a/examples/haystack/requirements.txt b/examples/haystack/requirements.txt index dc4b30512..8a6be0ce6 100644 --- a/examples/haystack/requirements.txt +++ b/examples/haystack/requirements.txt @@ -1,2 +1,2 @@ haystack-ai>=2.0.0 -openai>=1.0.0 +openai>=1.102.0