From 355d5f08c115074d06b45adcd2042d60e31826fe Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Tue, 7 Apr 2026 00:16:28 -0400 Subject: [PATCH 01/15] add obs handler --- CLAUDE.md | 72 +++++++++++++++++++++ playground/obshandler.py | 135 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 CLAUDE.md create mode 100644 playground/obshandler.py diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..5df04fa5 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,72 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Effectful is an algebraic effect system library for Python, providing algebraic effect handlers for metaprogramming and DSL implementation. It integrates with PyTorch, Pyro, JAX, and NumPyro for tensor operations and probabilistic programming. The core library has zero dependencies; all integrations are optional extras. + +## Common Commands + +### Development Setup +```bash +uv sync --all-extras --dev # Install all dependencies +``` + +### Testing +```bash +make test # Lint + all tests +pytest effectful/ tests/ -n auto # All tests (parallel) +pytest tests/test_ops_syntax.py -v # Single test file +pytest -k test_name -v # Single test by name +``` + +CI runs tests in groups: `core`, `indexed`, `torch`, `pyro`, `jax`, `numpyro`, `llm`, `examples`. To run a specific group locally: +```bash +pytest tests/test_handlers_torch*.py -n auto +pytest effectful/ tests/test_ops_*.py tests/test_internals_*.py -n auto # core group +``` + +Note: `--doctest-modules` is enabled by default via pyproject.toml. + +### Linting & Formatting +```bash +make lint # mypy + ruff check + ruff format --diff + nbqa on docs +make format # Auto-fix: ruff --fix + ruff format +``` + +Ruff rules: `F`, `I`, `PERF`, `UP` (target Python 3.12). 
+ +### LLM Test Fixtures +```bash +make rebuild-fixtures # Rebuilds LLM test fixtures (needs API keys) +``` + +## Architecture + +### Core (`effectful/ops/`) + +- **`types.py`** — Foundational types: `Operation[**Q, V]` (abstract effect), `Term[T]` (unevaluated operation application), `Interpretation` (handler dict mapping operations to functions), `NotHandled` exception. +- **`syntax.py`** — DSL for defining operations: `defop()`, `defdata()`, `deffn()`, `defstream()`. Includes `Scoped` annotation for variable binding semantics and `syntactic_eq()` for structural equality. +- **`semantics.py`** — Handler runtime: `handler()` context manager installs interpretations, `evaluate()` fully reduces expressions, `fwd()` delegates to parent handler, `coproduct()`/`product()` compose interpretations, `fvsof()` finds free variables. + +### Handlers (`effectful/handlers/`) + +Each handler implements interpretations for a specific library's operations: + +- **`indexed.py`** — Named dimension indexing +- **`torch.py`** — PyTorch tensor operations with named dimensions +- **`pyro.py`** — Pyro probabilistic programming +- **`numpyro.py`** — NumPyro distributions (depends on JAX) +- **`jax/`** — JAX operations with numpy/scipy wrappers +- **`llm/`** — LLM integration (experimental): encoding Python→prompts, evaluating LLM outputs→Python, completions, templates + +### Internals (`effectful/internals/`) + +- **`unification.py`** — Pattern matching and unification algorithm (used by handler dispatch) +- **`runtime.py`** — Handler execution context (interpretation stack) +- **`product_n.py`** — N-ary product types + +### Key Pattern + +Operations are defined with `defop()`, producing callable objects. When no handler is installed, calling an operation creates a `Term` (lazy/symbolic). Handlers are installed via `handler()` context manager, providing concrete implementations. Multiple handlers compose via `coproduct()` (union) or `product()` (safe merge). 
The `fwd()` function inside a handler delegates to the next handler in the stack. diff --git a/playground/obshandler.py b/playground/obshandler.py new file mode 100644 index 00000000..98852339 --- /dev/null +++ b/playground/obshandler.py @@ -0,0 +1,135 @@ +"""Custom effect handlers for the code agent. + +These compose with effectful's handler system to add logging, token budgets, +tool confirmation, and other cross-cutting concerns without modifying agent code. + +Memory is modeled as algebraic effects: operations define the interface, +FileMemoryHandler provides file-backed implementation. Swap handlers for +testing, DB-backed storage, or remote memory. +""" + +import random +from typing import Callable, Any +from effectful.handlers.llm import Tool, Template +from effectful.ops.types import NotHandled +from effectful.handlers.llm.completions import completion +from effectful.ops.semantics import fwd, coproduct, handler +from effectful.ops.syntax import ObjectInterpretation, implements +from effectful.handlers.llm.completions import ( + LiteLLMProvider, +) + + +class ObservabilityHandler(ObjectInterpretation): + """Tracks the call stack of :class:`Tool` and invokes a callback + function on the callstack paired with raw completion responses""" + + def __init__[**P,T](self, resp_fn : Callable[[Any, Tool[P,T]],None]): + self.callstack = [] + self.resp_fn = resp_fn + + @implements(completion) + def _observe_completion(self, *args, **kwargs) -> Any: + print("calling completion") + model = kwargs.get("model", args[0] if args else "unknown") + old_stack = self.callstack.copy() + response = fwd(*args, **kwargs) + self.resp_fn(old_stack,response) + return response + + @implements(Tool.__apply__) + def _call_tool[**P,T]( + self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs + ) -> T: + print("calling _call_tool") + try: + # print(f"adding {tool} to {self.callstack}") + self.callstack.append(tool) + response = fwd(tool,*args,**kwargs) + finally: + # print(f"popping tool 
from {self.callstack}") + self.callstack.pop() + # print(f"and now we have {self.callstack}") + + + return response + + @implements(Template.__apply__) + def _call_template[**P,T]( + self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs + ) -> T: + print("calling _call_template") + try: + print(f"adding {tool} to {self.callstack}") + self.callstack.append(tool) + response = fwd(tool,*args,**kwargs) + finally: + # print(f"popping tool from {self.callstack}") + self.callstack.pop() + # print(f"and now we have {self.callstack}") + + + return response + + +# provider = LiteLLMProvider( +# model='gpt-5.4' +# ) +provider = LiteLLMProvider( + model='openai/gemma', + api_key='', + api_base='http://127.0.0.1:8080/', + temperature=1.0, + top_p=0.95, + top_k=64 +) + + +obsprovider = ObservabilityHandler( + resp_fn = lambda st,resp: print(f"{st.}>{resp}") +) + +themes = ['zombies', 'the universe', 'exorcism'] + +# @Tool.define +# def random_theme() -> str: +# """Don't take any argument. Return a randomly picked theme.""" +# return random.choice(list(get_themes())) + +# @Tool.define +# def get_themes() -> list[str]: +# """Don't take any argument. 
Return a list of 3 strings, each of which represents a movie genre.""" +# raise NotHandled + + +# @Template.define +# def twosentencehorror() -> str: +# """Write a two-sentence horror story on a theme picked by the tool `random_theme`.""" +# raise NotHandled + +# @Template.define +# def pick_fruit() -> str: +# """Return the name of a fruit.""" +# raise NotHandled + +@Template.define +def find_treasure() -> str: + """Call the tool `bob` to find where the treasure is.""" + raise NotHandled + +@Template.define +def bob() -> str: + """Call the tool `alice` to find where the treasure is.""" + raise NotHandled + + +@Tool.define +def alice() -> str: + """Returns where the treasure is.""" + return "hell" + +combined_provider = coproduct(provider, obsprovider) + +def test_handler(provider): + with handler(provider): + print(find_treasure()) From d7fa4d83d8b17486103c62c0e9de27ec46ed973b Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Tue, 7 Apr 2026 16:35:11 -0400 Subject: [PATCH 02/15] Add demo completion logger --- playground/obshandler.py | 176 +++++++++++++++++++++++++-------------- 1 file changed, 114 insertions(+), 62 deletions(-) diff --git a/playground/obshandler.py b/playground/obshandler.py index 98852339..268410f8 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -1,15 +1,15 @@ -"""Custom effect handlers for the code agent. +"""Observability handler. -These compose with effectful's handler system to add logging, token budgets, -tool confirmation, and other cross-cutting concerns without modifying agent code. - -Memory is modeled as algebraic effects: operations define the interface, -FileMemoryHandler provides file-backed implementation. Swap handlers for -testing, DB-backed storage, or remote memory. +Composes with effectful's handler system to enable access to call +traces and raw completion outputs. The exact logging behavior is +specified by implementing a listener interface. 
""" +from abc import ABCMeta, abstractmethod import random -from typing import Callable, Any +from dataclasses import dataclass +from typing import Callable, Any, cast, override + from effectful.handlers.llm import Tool, Template from effectful.ops.types import NotHandled from effectful.handlers.llm.completions import completion @@ -19,57 +19,132 @@ LiteLLMProvider, ) +from time import time + + +# How does mro work? Should I call super? +class ObservabilityListener(metaclass=ABCMeta): + def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + pass + + def exit_tool_call[**P,Q](self, tool: Tool[P,Q], result: Q | None) -> None: + pass + + def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + pass + + def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + pass + + def enter_completion(self) -> None: + pass + + def exit_completion(self, resp: Any) -> None: + pass + + +class EmptyCallStackException(Exception): + pass + +class CallStackListener(ObservabilityListener): + def __init__(self): + self.callstack: list[Any] = [] + + @override + def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + self.callstack.append(tool) + + @override + def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: + self.callstack.pop() + + @override + def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + self.callstack.append(template) + + @override + def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + assert(len(self.callstack)>0) + self.callstack.pop() + + def current_function(self) -> Any: + try: + return self.callstack[-1] + except IndexError: + raise EmptyCallStackException() + + def current_template(self) -> Any: + try: + return next(func for func in reversed(self.callstack) if + isinstance(func, Template)) + except IndexError: + raise EmptyCallStackException() + +class CompletionLogger(CallStackListener): + def __init__(self): + super().__init__() 
+ self.timestack = [] + + @override + def enter_completion(self): + super().enter_completion() + print(f"{self.current_template()} is calling completion") + self.timestack.append(time()) + + @override + def exit_completion(self, resp: Any) -> None: + time_elapsed = time() - self.timestack[-1] + self.timestack.pop() + print(f"Completion for {self.current_template()} finished in {time_elapsed:.2f} seconds") + super().exit_completion(resp) + + class ObservabilityHandler(ObjectInterpretation): """Tracks the call stack of :class:`Tool` and invokes a callback function on the callstack paired with raw completion responses""" - def __init__[**P,T](self, resp_fn : Callable[[Any, Tool[P,T]],None]): - self.callstack = [] - self.resp_fn = resp_fn + def __init__[**P,T](self, listener: ObservabilityListener): + self.listener = listener @implements(completion) def _observe_completion(self, *args, **kwargs) -> Any: - print("calling completion") model = kwargs.get("model", args[0] if args else "unknown") - old_stack = self.callstack.copy() - response = fwd(*args, **kwargs) - self.resp_fn(old_stack,response) - return response + self.listener.enter_completion() + response: Any = None + try: + response = fwd(*args, **kwargs) + return response + finally: + self.listener.exit_completion(response) @implements(Tool.__apply__) def _call_tool[**P,T]( self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: - print("calling _call_tool") + result_opt: T | None = None try: - # print(f"adding {tool} to {self.callstack}") - self.callstack.append(tool) - response = fwd(tool,*args,**kwargs) + self.listener.enter_tool_call(tool) + result = cast(T, fwd(tool,*args,**kwargs)) + result_opt = result + return result finally: - # print(f"popping tool from {self.callstack}") - self.callstack.pop() - # print(f"and now we have {self.callstack}") + self.listener.exit_tool_call(tool, result_opt) - return response - @implements(Template.__apply__) def _call_template[**P,T]( - self, tool: Tool[P, T], 
*args: P.args, **kwargs: P.kwargs + self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: - print("calling _call_template") + result_opt: T | None = None try: - print(f"adding {tool} to {self.callstack}") - self.callstack.append(tool) - response = fwd(tool,*args,**kwargs) + self.listener.enter_template_call(template) + result = cast(T, fwd(template,*args,**kwargs)) + result_opt = result + return result finally: - # print(f"popping tool from {self.callstack}") - self.callstack.pop() - # print(f"and now we have {self.callstack}") - + self.listener.exit_template_call(template, result_opt) - return response # provider = LiteLLMProvider( @@ -85,48 +160,25 @@ def _call_template[**P,T]( ) -obsprovider = ObservabilityHandler( - resp_fn = lambda st,resp: print(f"{st.}>{resp}") -) +obsprovider = ObservabilityHandler(listener=CompletionLogger()) themes = ['zombies', 'the universe', 'exorcism'] -# @Tool.define -# def random_theme() -> str: -# """Don't take any argument. Return a randomly picked theme.""" -# return random.choice(list(get_themes())) - -# @Tool.define -# def get_themes() -> list[str]: -# """Don't take any argument. 
Return a list of 3 strings, each of which represents a movie genre.""" -# raise NotHandled - - -# @Template.define -# def twosentencehorror() -> str: -# """Write a two-sentence horror story on a theme picked by the tool `random_theme`.""" -# raise NotHandled - -# @Template.define -# def pick_fruit() -> str: -# """Return the name of a fruit.""" -# raise NotHandled @Template.define def find_treasure() -> str: - """Call the tool `bob` to find where the treasure is.""" + """Ask Bob to find where the treasure is.""" raise NotHandled @Template.define def bob() -> str: - """Call the tool `alice` to find where the treasure is.""" + """Ask Alice to find where the treasure is.""" raise NotHandled - @Tool.define def alice() -> str: """Returns where the treasure is.""" - return "hell" + return "school" combined_provider = coproduct(provider, obsprovider) From e919ce05101c01ed6b30b2c2291f27ef283b322c Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Tue, 7 Apr 2026 23:52:05 -0400 Subject: [PATCH 03/15] fix bugs in the obs handler implementation --- playground/obshandler.py | 42 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/playground/obshandler.py b/playground/obshandler.py index 268410f8..ac38ff7c 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -23,7 +23,7 @@ # How does mro work? Should I call super? 
-class ObservabilityListener(metaclass=ABCMeta): +class ObservabilityListener: def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: pass @@ -52,20 +52,25 @@ def __init__(self): @override def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + super().enter_tool_call(tool) self.callstack.append(tool) @override def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: + assert len(self.callstack) > 0 and tool is self.callstack[-1] self.callstack.pop() + super().exit_tool_call(tool, result) @override def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + super().enter_template_call(template) self.callstack.append(template) @override def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: - assert(len(self.callstack)>0) + assert len(self.callstack) > 0 and template is self.callstack[-1] self.callstack.pop() + super().exit_template_call(template, result) def current_function(self) -> Any: try: @@ -77,9 +82,38 @@ def current_template(self) -> Any: try: return next(func for func in reversed(self.callstack) if isinstance(func, Template)) - except IndexError: + except StopIteration: raise EmptyCallStackException() +@dataclass +class ThinkingRecord: + template: Any + reasoning_content: str | None + thinking_blocks: list[Any] | None + + +class ThinkingListener(CallStackListener): + def __init__(self): + super().__init__() + self.thinking_records: list[ThinkingRecord] = [] + + @override + def exit_completion(self, resp: Any) -> None: + if resp is not None: + message = resp.choices[0].message + reasoning_content = message.get("reasoning_content") + thinking_blocks = message.get("thinking_blocks") + if reasoning_content or thinking_blocks: + self.thinking_records.append( + ThinkingRecord( + template=self.current_template(), + reasoning_content=reasoning_content, + thinking_blocks=thinking_blocks, + ) + ) + super().exit_completion(resp) + + class CompletionLogger(CallStackListener): def 
__init__(self): super().__init__() @@ -104,7 +138,7 @@ class ObservabilityHandler(ObjectInterpretation): """Tracks the call stack of :class:`Tool` and invokes a callback function on the callstack paired with raw completion responses""" - def __init__[**P,T](self, listener: ObservabilityListener): + def __init__(self, listener: ObservabilityListener): self.listener = listener @implements(completion) From 9c9ab24104a4c96cceb0d328c17b2eb5d09a2734 Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 00:17:30 -0400 Subject: [PATCH 04/15] Add some test cases --- playground/obshandler.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/playground/obshandler.py b/playground/obshandler.py index ac38ff7c..6f2e001a 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -5,10 +5,8 @@ specified by implementing a listener interface. """ -from abc import ABCMeta, abstractmethod -import random from dataclasses import dataclass -from typing import Callable, Any, cast, override +from typing import Any, cast, override from effectful.handlers.llm import Tool, Template from effectful.ops.types import NotHandled @@ -194,7 +192,8 @@ def _call_template[**P,T]( ) -obsprovider = ObservabilityHandler(listener=CompletionLogger()) +listener= ThinkingListener() +obsprovider = ObservabilityHandler(listener) themes = ['zombies', 'the universe', 'exorcism'] @@ -214,8 +213,15 @@ def alice() -> str: """Returns where the treasure is.""" return "school" +@Template.define +def pick_fruit() -> str: + """Return the name of a fruit.""" + raise NotHandled + + combined_provider = coproduct(provider, obsprovider) def test_handler(provider): with handler(provider): + print(pick_fruit()) print(find_treasure()) From c14f1b86e9f80e3ed0b983613fb0371d2cc446cb Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 01:17:57 -0400 Subject: [PATCH 05/15] add terrible demo that uses multiple inheritance --- playground/obshandler.py | 75 
+++++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/playground/obshandler.py b/playground/obshandler.py index 6f2e001a..132b31c2 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -5,8 +5,9 @@ specified by implementing a listener interface. """ +from collections import defaultdict from dataclasses import dataclass -from typing import Any, cast, override +from typing import Any, Hashable, cast, override from effectful.handlers.llm import Tool, Template from effectful.ops.types import NotHandled @@ -16,11 +17,11 @@ from effectful.handlers.llm.completions import ( LiteLLMProvider, ) +from functools import reduce from time import time -# How does mro work? Should I call super? class ObservabilityListener: def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: pass @@ -111,30 +112,34 @@ def exit_completion(self, resp: Any) -> None: ) super().exit_completion(resp) +class ElapsedListener(CallStackListener): + """Tracks the elapsed time of each :class:`Tool` or + :class:`Template` call.""" -class CompletionLogger(CallStackListener): def __init__(self): super().__init__() self.timestack = [] + self.elapsed:defaultdict[Hashable,float] = defaultdict(float) @override def enter_completion(self): super().enter_completion() - print(f"{self.current_template()} is calling completion") self.timestack.append(time()) @override def exit_completion(self, resp: Any) -> None: time_elapsed = time() - self.timestack[-1] + self.elapsed[self.current_template()] += time_elapsed self.timestack.pop() - print(f"Completion for {self.current_template()} finished in {time_elapsed:.2f} seconds") super().exit_completion(resp) class ObservabilityHandler(ObjectInterpretation): - """Tracks the call stack of :class:`Tool` and invokes a callback - function on the callstack paired with raw completion responses""" + """Tracks the call stack of :class:`Tool` and :class:`Template` + and invokes a callback functions contained in an + 
:class:`ObservabilityListener` + """ def __init__(self, listener: ObservabilityListener): self.listener = listener @@ -152,7 +157,7 @@ def _observe_completion(self, *args, **kwargs) -> Any: @implements(Tool.__apply__) def _call_tool[**P,T]( - self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs + self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: result_opt: T | None = None try: @@ -166,7 +171,7 @@ def _call_tool[**P,T]( @implements(Template.__apply__) def _call_template[**P,T]( - self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs + self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: result_opt: T | None = None try: @@ -179,25 +184,6 @@ def _call_template[**P,T]( -# provider = LiteLLMProvider( -# model='gpt-5.4' -# ) -provider = LiteLLMProvider( - model='openai/gemma', - api_key='', - api_base='http://127.0.0.1:8080/', - temperature=1.0, - top_p=0.95, - top_k=64 -) - - -listener= ThinkingListener() -obsprovider = ObservabilityHandler(listener) - -themes = ['zombies', 'the universe', 'exorcism'] - - @Template.define def find_treasure() -> str: """Ask Bob to find where the treasure is.""" @@ -219,9 +205,36 @@ def pick_fruit() -> str: raise NotHandled -combined_provider = coproduct(provider, obsprovider) +class ThinkingElapsedListener(ThinkingListener, ElapsedListener): + def __init__(self): + super().__init__() -def test_handler(provider): - with handler(provider): + +def test_handler(): +# provider = LiteLLMProvider( +# model='anthropic/claude-sonnet-4-20250514', +# thinking={"type": "enabled", "budget_tokens": 1024} +# ) + provider = LiteLLMProvider( + model='openai/gemma', + api_key='', + api_base='http://127.0.0.1:8080/', + temperature=1.0, + top_p=0.95, + top_k=64 + ) + + listener = ThinkingElapsedListener() + obsprovider = ObservabilityHandler(listener) + + with handler(reduce(coproduct, [provider, obsprovider])): print(pick_fruit()) print(find_treasure()) + + 
print('----------------------------------------') + for thinking in listener.thinking_records: + print(thinking) + + print('----------------------------------------') + for func, time in listener.elapsed.items(): + print(f"{func}:{time:.2f}s") From 304bff44c54ed4ca78a2305dad4939ed8cb5897b Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 01:21:07 -0400 Subject: [PATCH 06/15] delete unused variable --- playground/obshandler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/playground/obshandler.py b/playground/obshandler.py index 132b31c2..fdbcc718 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -146,7 +146,6 @@ def __init__(self, listener: ObservabilityListener): @implements(completion) def _observe_completion(self, *args, **kwargs) -> Any: - model = kwargs.get("model", args[0] if args else "unknown") self.listener.enter_completion() response: Any = None try: From 59f746f0ca454186e99fae18a61781668169db63 Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 12:21:57 -0400 Subject: [PATCH 07/15] add everything to completions.py --- effectful/handlers/llm/completions.py | 205 ++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index fc6ca47a..3649dbdc 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -6,6 +6,7 @@ import inspect import string import textwrap +import time import traceback import typing import uuid @@ -490,3 +491,207 @@ def _call[**P, T]( history.clear() history.update(history_copy) return typing.cast(T, result) + + +class ObservabilityListener(typing.Protocol): + """Interface for observing :class:`Tool`, :class:`Template`, and + completion call events. + + All methods are no-ops by default, allowing subclasses to + subscribe to only the events they care about. 
+ """ + + def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + # can't just `pass` because that would mark the method as abstract + return None + + def exit_tool_call[**P,Q](self, tool: Tool[P,Q], result: Q | None) -> None: + return None + + def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + return None + + def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + return None + + def enter_completion(self) -> None: + return None + + def exit_completion(self, resp: typing.Any) -> None: + return None + + +class ObservabilityHandler(ObjectInterpretation): + """Effect handler that wraps :class:`Tool`, :class:`Template`, and completion calls + and invokes callback functions registered in :attr:`listener`. + + Compose with a provider via :func:`coproduct` or nested :func:`handler` + context managers to add observability without modifying the provider:: + + listener = ThinkingElapsedListener() + obs = ObservabilityHandler(listener) + with handler(provider), handler(obs): + result = my_template() + """ + + def __init__(self, listener: ObservabilityListener): + self.listener = listener + + @implements(completion) + def _observe_completion(self, *args, **kwargs) -> typing.Any: + self.listener.enter_completion() + response: typing.Any = None + try: + response = fwd(*args, **kwargs) + return response + finally: + self.listener.exit_completion(response) + + @implements(Tool.__apply__) + def _call_tool[**P,T]( + self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs + ) -> T: + result_opt: T | None = None + try: + self.listener.enter_tool_call(tool) + result = typing.cast(T, fwd(tool,*args,**kwargs)) + result_opt = result + return result + finally: + self.listener.exit_tool_call(tool, result_opt) + + + @implements(Template.__apply__) + def _call_template[**P,T]( + self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs + ) -> T: + result_opt: T | None = None + try: + 
self.listener.enter_template_call(template) + result = typing.cast(T, fwd(template,*args,**kwargs)) + result_opt = result + return result + finally: + self.listener.exit_template_call(template, result_opt) + + +class EmptyCallStackException(Exception): + """Raised when accessing the call stack is empty.""" + pass + +class NoTemplateException(Exception): + """Raised when accessing the call stack does not have a :class:`Template`.""" + pass + +class CallStackListener(ObservabilityListener): + """Listener that maintains a call stack of active Tool and Template calls. + + The call stack can be accessed directly through :attr:`callstack`. + The methods :meth:`current_function` and :meth:`current_template` + are provided for convenience to access the function (including + both templates and tools) or template that is currently executing + (i.e. on top of the call stack). + """ + + def __init__(self) -> None: + self.callstack: list[Tool[...,typing.Any]] = [] + + @typing.override + def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + super().enter_tool_call(tool) + self.callstack.append(tool) + + @typing.override + def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: + assert len(self.callstack) > 0 and tool is self.callstack[-1] + self.callstack.pop() + super().exit_tool_call(tool, result) + + @typing.override + def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + super().enter_template_call(template) + self.callstack.append(template) + + @typing.override + def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + assert len(self.callstack) > 0 and template is self.callstack[-1] + self.callstack.pop() + super().exit_template_call(template, result) + + def current_function(self) -> typing.Any: + """Return the innermost active :class:`Tool` or :class:`Template`. + + :raises EmptyCallStackException: if the call stack is empty. 
+ """ + try: + return self.callstack[-1] + except IndexError: + raise EmptyCallStackException() + + def current_template(self) -> typing.Any: + """Return the innermost active :class:`Template`, skipping any nested :class:`Tool`s. + + :raises NoTemplateException: if no :class:`Template` is on the stack. + """ + try: + return next(func for func in reversed(self.callstack) if + isinstance(func, Template)) + except StopIteration: + raise NoTemplateException() + +@dataclasses.dataclass +class ThinkingRecord: + """A single thinking/reasoning extraction paired with its source template.""" + template: Template[...,typing.Any] + reasoning_content: str | None + thinking_blocks: list[typing.Any] | None + + +class ThinkingListener(CallStackListener): + """Extracts thinking and reasoning content from litellm completion responses.""" + + def __init__(self) -> None: + super().__init__() + self.thinking_records: list[ThinkingRecord] = [] + + @typing.override + def exit_completion(self, resp: typing.Any) -> None: + if resp is not None: + message = resp.choices[0].message + reasoning_content = message.get("reasoning_content") + thinking_blocks = message.get("thinking_blocks") + if reasoning_content or thinking_blocks: + self.thinking_records.append( + ThinkingRecord( + template=self.current_template(), + reasoning_content=reasoning_content, + thinking_blocks=thinking_blocks, + ) + ) + super().exit_completion(resp) + +class ElapsedListener(CallStackListener): + """Tracks the elapsed time of each :class:`Template` call.""" + + def __init__(self) -> None: + super().__init__() + self.timestack: list[float] = [] + self.elapsed:collections.defaultdict[typing.Hashable,float] = collections.defaultdict(float) + + @typing.override + def enter_completion(self): + super().enter_completion() + self.timestack.append(time.time()) + + @typing.override + def exit_completion(self, resp: typing.Any) -> None: + time_elapsed = time.time() - self.timestack[-1] + self.elapsed[self.current_template()] += 
time_elapsed + self.timestack.pop() + super().exit_completion(resp) + + +class ThinkingElapsedListener(ThinkingListener, ElapsedListener): + """Combines thinking extraction and elapsed time tracking.""" + def __init__(self): + super().__init__() From 9b9fc61c16251cefc87c2ab4c76bf68f0fcab9e9 Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 15:57:42 -0400 Subject: [PATCH 08/15] extend the call stack listener to include an extra dict field --- effectful/handlers/llm/completions.py | 37 +++++++++++++++------------ 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index 3649dbdc..25e85000 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -538,7 +538,7 @@ def __init__(self, listener: ObservabilityListener): self.listener = listener @implements(completion) - def _observe_completion(self, *args, **kwargs) -> typing.Any: + def _completion(self, *args, **kwargs) -> typing.Any: self.listener.enter_completion() response: typing.Any = None try: @@ -583,6 +583,11 @@ class NoTemplateException(Exception): """Raised when accessing the call stack does not have a :class:`Template`.""" pass +@dataclasses.dataclass(frozen=True) +class CallInfo[F : Tool[...,typing.Any]]: + func: F + info: dict[typing.Any,typing.Any] + class CallStackListener(ObservabilityListener): """Listener that maintains a call stack of active Tool and Template calls. 
@@ -594,12 +599,12 @@ class CallStackListener(ObservabilityListener): """ def __init__(self) -> None: - self.callstack: list[Tool[...,typing.Any]] = [] + self.callstack: list[CallInfo[Tool[...,typing.Any]]] = [] @typing.override def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: super().enter_tool_call(tool) - self.callstack.append(tool) + self.callstack.append(CallInfo(tool,{})) @typing.override def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: @@ -610,15 +615,15 @@ def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: @typing.override def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: super().enter_template_call(template) - self.callstack.append(template) + self.callstack.append(CallInfo(template,{})) @typing.override def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: - assert len(self.callstack) > 0 and template is self.callstack[-1] + assert len(self.callstack) > 0 and template is self.callstack[-1].func self.callstack.pop() super().exit_template_call(template, result) - def current_function(self) -> typing.Any: + def current_func_info(self) -> CallInfo[Tool[...,typing.Any]]: """Return the innermost active :class:`Tool` or :class:`Template`. :raises EmptyCallStackException: if the call stack is empty. @@ -628,14 +633,16 @@ def current_function(self) -> typing.Any: except IndexError: raise EmptyCallStackException() - def current_template(self) -> typing.Any: + def current_template_info(self) -> CallInfo[Template[...,typing.Any]]: """Return the innermost active :class:`Template`, skipping any nested :class:`Tool`s. :raises NoTemplateException: if no :class:`Template` is on the stack. 
""" try: - return next(func for func in reversed(self.callstack) if - isinstance(func, Template)) + # need to repack CallInfo to make the typechecker happy + return (next(CallInfo(ci.func,ci.info) + for ci in reversed(self.callstack) + if isinstance(ci.func, Template))) except StopIteration: raise NoTemplateException() @@ -663,7 +670,7 @@ def exit_completion(self, resp: typing.Any) -> None: if reasoning_content or thinking_blocks: self.thinking_records.append( ThinkingRecord( - template=self.current_template(), + template=self.current_template_info().func, reasoning_content=reasoning_content, thinking_blocks=thinking_blocks, ) @@ -671,23 +678,21 @@ def exit_completion(self, resp: typing.Any) -> None: super().exit_completion(resp) class ElapsedListener(CallStackListener): - """Tracks the elapsed time of each :class:`Template` call.""" + """Tracks the elapsed time of each :class:`Tool` or :class:`Template` call.""" def __init__(self) -> None: super().__init__() - self.timestack: list[float] = [] self.elapsed:collections.defaultdict[typing.Hashable,float] = collections.defaultdict(float) @typing.override def enter_completion(self): super().enter_completion() - self.timestack.append(time.time()) + self.current_func_info().info['time'] = time.time() @typing.override def exit_completion(self, resp: typing.Any) -> None: - time_elapsed = time.time() - self.timestack[-1] - self.elapsed[self.current_template()] += time_elapsed - self.timestack.pop() + time_elapsed = time.time() - self.current_func_info().info['time'] + self.elapsed[self.current_func_info().func] += time_elapsed super().exit_completion(resp) From b20ef70119dd14649d74e6363fe4706a4ef95089 Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 16:07:20 -0400 Subject: [PATCH 09/15] pulling out the listeners into the test file --- effectful/handlers/llm/completions.py | 57 +-------- playground/obshandler.py | 176 ++++---------------------- 2 files changed, 27 insertions(+), 206 deletions(-) diff --git 
a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index 25e85000..b851677f 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -608,7 +608,7 @@ def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: @typing.override def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: - assert len(self.callstack) > 0 and tool is self.callstack[-1] + assert len(self.callstack) > 0 and tool is self.callstack[-1].func self.callstack.pop() super().exit_tool_call(tool, result) @@ -645,58 +645,3 @@ def current_template_info(self) -> CallInfo[Template[...,typing.Any]]: if isinstance(ci.func, Template))) except StopIteration: raise NoTemplateException() - -@dataclasses.dataclass -class ThinkingRecord: - """A single thinking/reasoning extraction paired with its source template.""" - template: Template[...,typing.Any] - reasoning_content: str | None - thinking_blocks: list[typing.Any] | None - - -class ThinkingListener(CallStackListener): - """Extracts thinking and reasoning content from litellm completion responses.""" - - def __init__(self) -> None: - super().__init__() - self.thinking_records: list[ThinkingRecord] = [] - - @typing.override - def exit_completion(self, resp: typing.Any) -> None: - if resp is not None: - message = resp.choices[0].message - reasoning_content = message.get("reasoning_content") - thinking_blocks = message.get("thinking_blocks") - if reasoning_content or thinking_blocks: - self.thinking_records.append( - ThinkingRecord( - template=self.current_template_info().func, - reasoning_content=reasoning_content, - thinking_blocks=thinking_blocks, - ) - ) - super().exit_completion(resp) - -class ElapsedListener(CallStackListener): - """Tracks the elapsed time of each :class:`Tool` or :class:`Template` call.""" - - def __init__(self) -> None: - super().__init__() - self.elapsed:collections.defaultdict[typing.Hashable,float] = collections.defaultdict(float) - - 
@typing.override - def enter_completion(self): - super().enter_completion() - self.current_func_info().info['time'] = time.time() - - @typing.override - def exit_completion(self, resp: typing.Any) -> None: - time_elapsed = time.time() - self.current_func_info().info['time'] - self.elapsed[self.current_func_info().func] += time_elapsed - super().exit_completion(resp) - - -class ThinkingElapsedListener(ThinkingListener, ElapsedListener): - """Combines thinking extraction and elapsed time tracking.""" - def __init__(self): - super().__init__() diff --git a/playground/obshandler.py b/playground/obshandler.py index fdbcc718..aeff8780 100644 --- a/playground/obshandler.py +++ b/playground/obshandler.py @@ -1,98 +1,30 @@ -"""Observability handler. - -Composes with effectful's handler system to enable access to call -traces and raw completion outputs. The exact logging behavior is -specified by implementing a listener interface. -""" +import dataclasses from collections import defaultdict -from dataclasses import dataclass -from typing import Any, Hashable, cast, override +from typing import Any, Hashable, override from effectful.handlers.llm import Tool, Template from effectful.ops.types import NotHandled from effectful.handlers.llm.completions import completion from effectful.ops.semantics import fwd, coproduct, handler -from effectful.ops.syntax import ObjectInterpretation, implements from effectful.handlers.llm.completions import ( - LiteLLMProvider, + LiteLLMProvider, ObservabilityHandler, CallStackListener ) -from functools import reduce - from time import time -class ObservabilityListener: - def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: - pass - - def exit_tool_call[**P,Q](self, tool: Tool[P,Q], result: Q | None) -> None: - pass - - def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: - pass - - def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: - pass - - def enter_completion(self) -> None: - pass 
- - def exit_completion(self, resp: Any) -> None: - pass - - -class EmptyCallStackException(Exception): - pass - -class CallStackListener(ObservabilityListener): - def __init__(self): - self.callstack: list[Any] = [] - - @override - def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: - super().enter_tool_call(tool) - self.callstack.append(tool) - - @override - def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: - assert len(self.callstack) > 0 and tool is self.callstack[-1] - self.callstack.pop() - super().exit_tool_call(tool, result) - - @override - def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: - super().enter_template_call(template) - self.callstack.append(template) - - @override - def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: - assert len(self.callstack) > 0 and template is self.callstack[-1] - self.callstack.pop() - super().exit_template_call(template, result) - - def current_function(self) -> Any: - try: - return self.callstack[-1] - except IndexError: - raise EmptyCallStackException() - - def current_template(self) -> Any: - try: - return next(func for func in reversed(self.callstack) if - isinstance(func, Template)) - except StopIteration: - raise EmptyCallStackException() - -@dataclass +@dataclasses.dataclass class ThinkingRecord: - template: Any + """A single thinking/reasoning extraction paired with its source template.""" + template: Template[...,Any] reasoning_content: str | None thinking_blocks: list[Any] | None class ThinkingListener(CallStackListener): - def __init__(self): + """Extracts thinking and reasoning content from litellm completion responses.""" + + def __init__(self) -> None: super().__init__() self.thinking_records: list[ThinkingRecord] = [] @@ -105,7 +37,7 @@ def exit_completion(self, resp: Any) -> None: if reasoning_content or thinking_blocks: self.thinking_records.append( ThinkingRecord( - template=self.current_template(), + 
template=self.current_template_info().func, reasoning_content=reasoning_content, thinking_blocks=thinking_blocks, ) @@ -113,74 +45,28 @@ def exit_completion(self, resp: Any) -> None: super().exit_completion(resp) class ElapsedListener(CallStackListener): - """Tracks the elapsed time of each :class:`Tool` or - :class:`Template` call.""" + """Tracks the elapsed time of each :class:`Tool` or :class:`Template` call.""" - def __init__(self): + def __init__(self) -> None: super().__init__() - self.timestack = [] self.elapsed:defaultdict[Hashable,float] = defaultdict(float) @override def enter_completion(self): super().enter_completion() - self.timestack.append(time()) + self.current_func_info().info['time'] = time() @override def exit_completion(self, resp: Any) -> None: - time_elapsed = time() - self.timestack[-1] - self.elapsed[self.current_template()] += time_elapsed - self.timestack.pop() + time_elapsed = time() - self.current_func_info().info['time'] + self.elapsed[self.current_func_info().func] += time_elapsed super().exit_completion(resp) - -class ObservabilityHandler(ObjectInterpretation): - """Tracks the call stack of :class:`Tool` and :class:`Template` - and invokes a callback functions contained in an - :class:`ObservabilityListener` - """ - - def __init__(self, listener: ObservabilityListener): - self.listener = listener - - @implements(completion) - def _observe_completion(self, *args, **kwargs) -> Any: - self.listener.enter_completion() - response: Any = None - try: - response = fwd(*args, **kwargs) - return response - finally: - self.listener.exit_completion(response) - - @implements(Tool.__apply__) - def _call_tool[**P,T]( - self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs - ) -> T: - result_opt: T | None = None - try: - self.listener.enter_tool_call(tool) - result = cast(T, fwd(tool,*args,**kwargs)) - result_opt = result - return result - finally: - self.listener.exit_tool_call(tool, result_opt) - - - @implements(Template.__apply__) - def 
_call_template[**P,T]( - self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs - ) -> T: - result_opt: T | None = None - try: - self.listener.enter_template_call(template) - result = cast(T, fwd(template,*args,**kwargs)) - result_opt = result - return result - finally: - self.listener.exit_template_call(template, result_opt) - +class ThinkingElapsedListener(ThinkingListener, ElapsedListener): + """Combines thinking extraction and elapsed time tracking.""" + def __init__(self): + super().__init__() @Template.define @@ -203,30 +89,17 @@ def pick_fruit() -> str: """Return the name of a fruit.""" raise NotHandled - -class ThinkingElapsedListener(ThinkingListener, ElapsedListener): - def __init__(self): - super().__init__() - - def test_handler(): -# provider = LiteLLMProvider( -# model='anthropic/claude-sonnet-4-20250514', -# thinking={"type": "enabled", "budget_tokens": 1024} -# ) provider = LiteLLMProvider( - model='openai/gemma', - api_key='', - api_base='http://127.0.0.1:8080/', - temperature=1.0, - top_p=0.95, - top_k=64 + model='anthropic/claude-sonnet-4-20250514', + thinking={"type": "enabled", "budget_tokens": 1024} ) listener = ThinkingElapsedListener() obsprovider = ObservabilityHandler(listener) - with handler(reduce(coproduct, [provider, obsprovider])): + + with handler(provider), handler(obsprovider): print(pick_fruit()) print(find_treasure()) @@ -237,3 +110,6 @@ def test_handler(): print('----------------------------------------') for func, time in listener.elapsed.items(): print(f"{func}:{time:.2f}s") + +if __name__ == '__main__': + test_handler() From a6d8604a3177c73f474651785bbb3db00f0f4c9b Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 16:26:43 -0400 Subject: [PATCH 10/15] remove claude.md from version control --- CLAUDE.md | 72 ------------------------------------------------------- 1 file changed, 72 deletions(-) delete mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index 
5df04fa5..00000000 --- a/CLAUDE.md +++ /dev/null @@ -1,72 +0,0 @@ -# CLAUDE.md - -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. - -## Project Overview - -Effectful is an algebraic effect system library for Python, providing algebraic effect handlers for metaprogramming and DSL implementation. It integrates with PyTorch, Pyro, JAX, and NumPyro for tensor operations and probabilistic programming. The core library has zero dependencies; all integrations are optional extras. - -## Common Commands - -### Development Setup -```bash -uv sync --all-extras --dev # Install all dependencies -``` - -### Testing -```bash -make test # Lint + all tests -pytest effectful/ tests/ -n auto # All tests (parallel) -pytest tests/test_ops_syntax.py -v # Single test file -pytest -k test_name -v # Single test by name -``` - -CI runs tests in groups: `core`, `indexed`, `torch`, `pyro`, `jax`, `numpyro`, `llm`, `examples`. To run a specific group locally: -```bash -pytest tests/test_handlers_torch*.py -n auto -pytest effectful/ tests/test_ops_*.py tests/test_internals_*.py -n auto # core group -``` - -Note: `--doctest-modules` is enabled by default via pyproject.toml. - -### Linting & Formatting -```bash -make lint # mypy + ruff check + ruff format --diff + nbqa on docs -make format # Auto-fix: ruff --fix + ruff format -``` - -Ruff rules: `F`, `I`, `PERF`, `UP` (target Python 3.12). - -### LLM Test Fixtures -```bash -make rebuild-fixtures # Rebuilds LLM test fixtures (needs API keys) -``` - -## Architecture - -### Core (`effectful/ops/`) - -- **`types.py`** — Foundational types: `Operation[**Q, V]` (abstract effect), `Term[T]` (unevaluated operation application), `Interpretation` (handler dict mapping operations to functions), `NotHandled` exception. -- **`syntax.py`** — DSL for defining operations: `defop()`, `defdata()`, `deffn()`, `defstream()`. 
Includes `Scoped` annotation for variable binding semantics and `syntactic_eq()` for structural equality. -- **`semantics.py`** — Handler runtime: `handler()` context manager installs interpretations, `evaluate()` fully reduces expressions, `fwd()` delegates to parent handler, `coproduct()`/`product()` compose interpretations, `fvsof()` finds free variables. - -### Handlers (`effectful/handlers/`) - -Each handler implements interpretations for a specific library's operations: - -- **`indexed.py`** — Named dimension indexing -- **`torch.py`** — PyTorch tensor operations with named dimensions -- **`pyro.py`** — Pyro probabilistic programming -- **`numpyro.py`** — NumPyro distributions (depends on JAX) -- **`jax/`** — JAX operations with numpy/scipy wrappers -- **`llm/`** — LLM integration (experimental): encoding Python→prompts, evaluating LLM outputs→Python, completions, templates - -### Internals (`effectful/internals/`) - -- **`unification.py`** — Pattern matching and unification algorithm (used by handler dispatch) -- **`runtime.py`** — Handler execution context (interpretation stack) -- **`product_n.py`** — N-ary product types - -### Key Pattern - -Operations are defined with `defop()`, producing callable objects. When no handler is installed, calling an operation creates a `Term` (lazy/symbolic). Handlers are installed via `handler()` context manager, providing concrete implementations. Multiple handlers compose via `coproduct()` (union) or `product()` (safe merge). The `fwd()` function inside a handler delegates to the next handler in the stack. 
From bcf1b03bbbd51edff12da2f5a45c979981bd046c Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 16:44:07 -0400 Subject: [PATCH 11/15] create a temporary folder with examples --- obs-examples/thinking_elapsed.py | 125 ++++++++++++++++++ .../thinking_elapsed_multi.py | 0 2 files changed, 125 insertions(+) create mode 100644 obs-examples/thinking_elapsed.py rename playground/obshandler.py => obs-examples/thinking_elapsed_multi.py (100%) diff --git a/obs-examples/thinking_elapsed.py b/obs-examples/thinking_elapsed.py new file mode 100644 index 00000000..c8bd009d --- /dev/null +++ b/obs-examples/thinking_elapsed.py @@ -0,0 +1,125 @@ +import dataclasses + +from collections import defaultdict +from functools import reduce +from collections.abc import Callable +from typing import Any, Hashable, override + +from effectful.handlers.llm import Tool, Template +from effectful.ops.types import NotHandled +from effectful.handlers.llm.completions import ( + LiteLLMProvider, ObservabilityHandler, ObservabilityListener, + CallStackListener, CallInfo, +) +from effectful.ops.semantics import coproduct, handler +from time import time + + +@dataclasses.dataclass +class ThinkingRecord: + """A single thinking/reasoning extraction paired with its source template.""" + template: Template[..., Any] + reasoning_content: str | None + thinking_blocks: list[Any] | None + + +class ThinkingListener(ObservabilityListener): + """Extracts thinking and reasoning content from litellm completion responses.""" + + def __init__( + self, + get_template_info: Callable[[], CallInfo[Template[..., Any]]], + ) -> None: + self.thinking_records: list[ThinkingRecord] = [] + self._get_template_info = get_template_info + + @override + def exit_completion(self, resp: Any) -> None: + if resp is not None: + message = resp.choices[0].message + reasoning_content = message.get("reasoning_content") + thinking_blocks = message.get("thinking_blocks") + if reasoning_content or thinking_blocks: + 
self.thinking_records.append( + ThinkingRecord( + template=self._get_template_info().func, + reasoning_content=reasoning_content, + thinking_blocks=thinking_blocks, + ) + ) + + +class ElapsedListener(ObservabilityListener): + """Tracks the elapsed time of each :class:`Template` call.""" + + def __init__( + self, + get_func_info: Callable[[], CallInfo[Tool[..., Any]]], + ) -> None: + self.elapsed: defaultdict[Hashable, float] = defaultdict(float) + self._get_func_info = get_func_info + + @override + def enter_completion(self) -> None: + self._get_func_info().info['time'] = time() + + @override + def exit_completion(self, resp: Any) -> None: + func_info = self._get_func_info() + time_elapsed = time() - func_info.info['time'] + self.elapsed[func_info.func] += time_elapsed + + +@Template.define +def find_treasure() -> str: + """Ask Bob to find where the treasure is.""" + raise NotHandled + +@Template.define +def bob() -> str: + """Ask Alice to find where the treasure is.""" + raise NotHandled + +@Tool.define +def alice() -> str: + """Returns where the treasure is.""" + return "school" + +@Template.define +def pick_fruit() -> str: + """Return the name of a fruit.""" + raise NotHandled + + +def test_handler(): + provider = LiteLLMProvider( + model='anthropic/claude-sonnet-4-20250514', + thinking={"type": "enabled", "budget_tokens": 1024} + ) + + callstack = CallStackListener() + thinking = ThinkingListener(callstack.current_template_info) + elapsed = ElapsedListener(callstack.current_func_info) + + combined = reduce(coproduct, [ + provider, + ObservabilityHandler(thinking), + ObservabilityHandler(elapsed), + ObservabilityHandler(callstack), + ]) + + with handler(combined): + print(pick_fruit()) + print(find_treasure()) + + print('----------------------------------------') + for record in thinking.thinking_records: + print(record) + + print('----------------------------------------') + for func, elapsed_time in elapsed.elapsed.items(): + 
print(f"{func}:{elapsed_time:.2f}s") + + +if __name__ == '__main__': + test_handler() diff --git a/playground/obshandler.py b/obs-examples/thinking_elapsed_multi.py similarity index 100% rename from playground/obshandler.py rename to obs-examples/thinking_elapsed_multi.py From 944cc48a03879ac452f94536bcf332487b8c3dcd Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 16:46:18 -0400 Subject: [PATCH 12/15] remove redundant import --- effectful/handlers/llm/completions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index b851677f..15e9067f 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -6,7 +6,6 @@ import inspect import string import textwrap -import time import traceback import typing import uuid From d59854d4844597542cc1cdac11c09d4b10e0ab7e Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Wed, 8 Apr 2026 16:51:15 -0400 Subject: [PATCH 13/15] linter pass --- effectful/handlers/llm/completions.py | 60 ++++++++++++++++----------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index 15e9067f..c8126af1 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -500,17 +500,19 @@ class ObservabilityListener(typing.Protocol): subscribe to only the events they care about. 
""" - def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + def enter_tool_call[**P, Q](self, tool: Tool[P, Q]) -> None: # can't just `pass` because that would mark the method as abstract return None - def exit_tool_call[**P,Q](self, tool: Tool[P,Q], result: Q | None) -> None: + def exit_tool_call[**P, Q](self, tool: Tool[P, Q], result: Q | None) -> None: return None - def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + def enter_template_call[**P, Q](self, template: Template[P, Q]) -> None: return None - def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + def exit_template_call[**P, Q]( + self, template: Template[P, Q], result: Q | None + ) -> None: return None def enter_completion(self) -> None: @@ -547,27 +549,26 @@ def _completion(self, *args, **kwargs) -> typing.Any: self.listener.exit_completion(response) @implements(Tool.__apply__) - def _call_tool[**P,T]( - self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs + def _call_tool[**P, T]( + self, tool: Tool[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: result_opt: T | None = None try: self.listener.enter_tool_call(tool) - result = typing.cast(T, fwd(tool,*args,**kwargs)) + result = typing.cast(T, fwd(tool, *args, **kwargs)) result_opt = result return result finally: self.listener.exit_tool_call(tool, result_opt) - @implements(Template.__apply__) - def _call_template[**P,T]( - self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs + def _call_template[**P, T]( + self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs ) -> T: result_opt: T | None = None try: self.listener.enter_template_call(template) - result = typing.cast(T, fwd(template,*args,**kwargs)) + result = typing.cast(T, fwd(template, *args, **kwargs)) result_opt = result return result finally: @@ -576,16 +577,21 @@ def _call_template[**P,T]( class EmptyCallStackException(Exception): """Raised when accessing the call stack is empty.""" + pass + class 
NoTemplateException(Exception): """Raised when accessing the call stack does not have a :class:`Template`.""" + pass + @dataclasses.dataclass(frozen=True) -class CallInfo[F : Tool[...,typing.Any]]: +class CallInfo[F: Tool[..., typing.Any]]: func: F - info: dict[typing.Any,typing.Any] + info: dict[typing.Any, typing.Any] + class CallStackListener(ObservabilityListener): """Listener that maintains a call stack of active Tool and Template calls. @@ -598,31 +604,33 @@ class CallStackListener(ObservabilityListener): """ def __init__(self) -> None: - self.callstack: list[CallInfo[Tool[...,typing.Any]]] = [] + self.callstack: list[CallInfo[Tool[..., typing.Any]]] = [] @typing.override - def enter_tool_call[**P,Q](self, tool: Tool[P,Q]) -> None: + def enter_tool_call[**P, Q](self, tool: Tool[P, Q]) -> None: super().enter_tool_call(tool) - self.callstack.append(CallInfo(tool,{})) + self.callstack.append(CallInfo(tool, {})) @typing.override - def exit_tool_call[**P,Q](self, tool: Tool[P, Q], result: Q | None) -> None: + def exit_tool_call[**P, Q](self, tool: Tool[P, Q], result: Q | None) -> None: assert len(self.callstack) > 0 and tool is self.callstack[-1].func self.callstack.pop() super().exit_tool_call(tool, result) @typing.override - def enter_template_call[**P,Q](self, template: Template[P,Q]) -> None: + def enter_template_call[**P, Q](self, template: Template[P, Q]) -> None: super().enter_template_call(template) - self.callstack.append(CallInfo(template,{})) + self.callstack.append(CallInfo(template, {})) @typing.override - def exit_template_call[**P,Q](self, template: Template[P,Q], result: Q | None) -> None: + def exit_template_call[**P, Q]( + self, template: Template[P, Q], result: Q | None + ) -> None: assert len(self.callstack) > 0 and template is self.callstack[-1].func self.callstack.pop() super().exit_template_call(template, result) - def current_func_info(self) -> CallInfo[Tool[...,typing.Any]]: + def current_func_info(self) -> CallInfo[Tool[..., typing.Any]]: 
"""Return the innermost active :class:`Tool` or :class:`Template`. :raises EmptyCallStackException: if the call stack is empty. @@ -632,15 +640,17 @@ def current_func_info(self) -> CallInfo[Tool[...,typing.Any]]: except IndexError: raise EmptyCallStackException() - def current_template_info(self) -> CallInfo[Template[...,typing.Any]]: + def current_template_info(self) -> CallInfo[Template[..., typing.Any]]: """Return the innermost active :class:`Template`, skipping any nested :class:`Tool`s. :raises NoTemplateException: if no :class:`Template` is on the stack. """ try: # need to repack CallInfo to make the typechecker happy - return (next(CallInfo(ci.func,ci.info) - for ci in reversed(self.callstack) - if isinstance(ci.func, Template))) + return next( + CallInfo(ci.func, ci.info) + for ci in reversed(self.callstack) + if isinstance(ci.func, Template) + ) except StopIteration: raise NoTemplateException() From 7d4e6eb3e9a381a0ce28f0ae3d04996fcd1b487a Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Thu, 9 Apr 2026 11:51:03 -0400 Subject: [PATCH 14/15] rename from observability to logging --- effectful/handlers/llm/completions.py | 12 ++++++------ obs-examples/thinking_elapsed.py | 12 ++++++------ obs-examples/thinking_elapsed_multi.py | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index c8126af1..61a5a175 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -492,7 +492,7 @@ def _call[**P, T]( return typing.cast(T, result) -class ObservabilityListener(typing.Protocol): +class LoggingListener(typing.Protocol): """Interface for observing :class:`Tool`, :class:`Template`, and completion call events. 
@@ -522,20 +522,20 @@ def exit_completion(self, resp: typing.Any) -> None: return None -class ObservabilityHandler(ObjectInterpretation): +class LoggingHandler(ObjectInterpretation): """Effect handler that wraps :class:`Tool`, :class:`Template`, and completion calls and invokes callback functions registered in :attr:`listener`. Compose with a provider via :func:`coproduct` or nested :func:`handler` - context managers to add observability without modifying the provider:: + context managers to add logging without modifying the provider logic:: listener = ThinkingElapsedListener() - obs = ObservabilityHandler(listener) + obs = LoggingHandler(listener) with handler(provider), handler(obs): result = my_template() """ - def __init__(self, listener: ObservabilityListener): + def __init__(self, listener: LoggingListener): self.listener = listener @implements(completion) @@ -593,7 +593,7 @@ class CallInfo[F: Tool[..., typing.Any]]: info: dict[typing.Any, typing.Any] -class CallStackListener(ObservabilityListener): +class CallStackListener(LoggingListener): """Listener that maintains a call stack of active Tool and Template calls. The call stack can be accessed directly through :attr:`callstack`. 
diff --git a/obs-examples/thinking_elapsed.py b/obs-examples/thinking_elapsed.py index c8bd009d..73c2cd9d 100644 --- a/obs-examples/thinking_elapsed.py +++ b/obs-examples/thinking_elapsed.py @@ -8,7 +8,7 @@ from effectful.handlers.llm import Tool, Template from effectful.ops.types import NotHandled from effectful.handlers.llm.completions import ( - LiteLLMProvider, ObservabilityHandler, ObservabilityListener, + LiteLLMProvider, LoggingHandler, LoggingListener, CallStackListener, CallInfo, ) from effectful.ops.semantics import coproduct, handler @@ -23,7 +23,7 @@ class ThinkingRecord: thinking_blocks: list[Any] | None -class ThinkingListener(ObservabilityListener): +class ThinkingListener(LoggingListener): """Extracts thinking and reasoning content from litellm completion responses.""" def __init__( @@ -49,7 +49,7 @@ def exit_completion(self, resp: Any) -> None: ) -class ElapsedListener(ObservabilityListener): +class ElapsedListener(LoggingListener): """Tracks the elapsed time of each :class:`Template` call.""" def __init__( @@ -103,9 +103,9 @@ def test_handler(): combined = reduce(coproduct, [ provider, - ObservabilityHandler(thinking), - ObservabilityHandler(elapsed), - ObservabilityHandler(callstack), + LoggingHandler(thinking), + LoggingHandler(elapsed), + LoggingHandler(callstack), ]) with handler(combined): diff --git a/obs-examples/thinking_elapsed_multi.py b/obs-examples/thinking_elapsed_multi.py index aeff8780..2c249867 100644 --- a/obs-examples/thinking_elapsed_multi.py +++ b/obs-examples/thinking_elapsed_multi.py @@ -8,7 +8,7 @@ from effectful.handlers.llm.completions import completion from effectful.ops.semantics import fwd, coproduct, handler from effectful.handlers.llm.completions import ( - LiteLLMProvider, ObservabilityHandler, CallStackListener + LiteLLMProvider, LoggingHandler, CallStackListener ) from time import time @@ -96,7 +96,7 @@ def test_handler(): ) listener = ThinkingElapsedListener() - obsprovider = ObservabilityHandler(listener) + 
obsprovider = LoggingHandler(listener) with handler(provider), handler(obsprovider): From e24d135ef3f68f97807487b18d0d747a514165f8 Mon Sep 17 00:00:00 2001 From: Yiyun Liu Date: Fri, 10 Apr 2026 12:57:07 -0400 Subject: [PATCH 15/15] fix ci This issue isn't really caused, but rather exposed, by the PR. The completion operator shouldn't copy litellm.completion's type signature --- effectful/handlers/llm/completions.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/effectful/handlers/llm/completions.py b/effectful/handlers/llm/completions.py index 61a5a175..fd8e9eeb 100644 --- a/effectful/handlers/llm/completions.py +++ b/effectful/handlers/llm/completions.py @@ -160,8 +160,15 @@ def to_feedback_message(self, include_traceback: bool) -> Message: type MessageResult[T] = tuple[Message, typing.Sequence[DecodedToolCall], T | None] +# prevent :func:`functools.wraps` from copying over `litellm.completion`'s type annotations, +# which contains unresolvable forward references without import aiohttp +_assn_without_type = [ + attr for attr in functools.WRAPPER_ASSIGNMENTS if attr != "__annotations__" +] + + @Operation.define -@functools.wraps(litellm.completion) +@functools.wraps(litellm.completion, assigned=_assn_without_type) def completion(*args, **kwargs) -> typing.Any: """Low-level LLM request. Handlers may log/modify requests and delegate via fwd(). @@ -172,6 +179,9 @@ def completion(*args, **kwargs) -> typing.Any: return litellm.completion(*args, **kwargs) +del _assn_without_type + + @Operation.define def call_assistant[T, U]( tools: collections.abc.Mapping[str, Tool],