Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions burr/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,15 +1507,15 @@ def pydantic(
writes: List[str],
state_input_type: Type["BaseModel"],
state_output_type: Type["BaseModel"],
stream_type: Union[Type["BaseModel"], Type[dict]],
stream_type: Union[Type["BaseModel"], Type[dict], object],
tags: Optional[List[str]] = None,
) -> Callable:
"""Creates a streaming action that uses pydantic models.

:param reads: The fields this consumes from the state.
:param writes: The fields this writes to the state.
:param stream_type: The pydantic model or dictionary type that is used to represent the partial results.
Use a dict if you want this untyped.
Use a dict if you want this untyped. Can also be a union of models (e.g., Model1 | Model2).
:param state_input_type: The pydantic model type that is used to represent the input state.
:param state_output_type: The pydantic model type that is used to represent the output state.
:param tags: Optional list of tags to associate with this action
Expand Down
25 changes: 22 additions & 3 deletions burr/integrations/pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,22 @@ async def async_action_function(state: State, **kwargs) -> State:
return decorator


PartialType = Union[Type[pydantic.BaseModel], Type[dict]]
# 'object' broadens the annotation to accept union forms (Model1 | Model2 / Union[...])
# which the type system cannot express more narrowly; _is_valid_stream_type enforces the real constraint.
PartialType = Union[Type[pydantic.BaseModel], Type[dict], object]


def _is_valid_stream_type(t: object) -> bool:
if t is dict or (isinstance(t, type) and issubclass(t, pydantic.BaseModel)):
return True
if typing.get_origin(t) is Union:
return all(_is_valid_stream_type(arg) for arg in typing.get_args(t))
# Python 3.10+ `X | Y` syntax produces types.UnionType
_UnionType = getattr(types, "UnionType", None)
if _UnionType is not None and isinstance(t, _UnionType):
return all(_is_valid_stream_type(arg) for arg in t.__args__)
return False


PydanticStreamingActionFunctionSync = Callable[
..., Generator[Tuple[Union[pydantic.BaseModel, dict], Optional[pydantic.BaseModel]], None, None]
Expand All @@ -290,15 +305,19 @@ async def async_action_function(state: State, **kwargs) -> State:

def _validate_and_extract_signature_types_streaming(
fn: PydanticStreamingActionFunction,
stream_type: Optional[Union[Type[pydantic.BaseModel], Type[dict]]],
stream_type: Optional[PartialType],
state_input_type: Optional[Type[pydantic.BaseModel]] = None,
state_output_type: Optional[Type[pydantic.BaseModel]] = None,
) -> Tuple[
Type[pydantic.BaseModel], Type[pydantic.BaseModel], Union[Type[dict], Type[pydantic.BaseModel]]
Type[pydantic.BaseModel], Type[pydantic.BaseModel], PartialType
]:
if stream_type is None:
# TODO -- derive from the signature
raise ValueError(f"stream_type is required for function: {fn.__qualname__}")
if not _is_valid_stream_type(stream_type):
raise ValueError(
f"stream_type must be a Pydantic BaseModel subclass, dict, or a union of those. Got: {stream_type!r}"
)
if state_input_type is None:
# TODO -- derive from the signature
raise ValueError(f"state_input_type is required for function: {fn.__qualname__}")
Expand Down
Loading
Loading