Skip to content

Commit 73e4e09

Browse files
committed
feat: add conversation stream config builder
* add typed streaming callbacks and decorator-based builder for sse responses * extend search parameters to accept conversation streaming configuration
1 parent 5391085 commit 73e4e09

1 file changed

Lines changed: 154 additions & 0 deletions

File tree

src/typesense/types/document.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,159 @@ class NLLanguageParameters(typing.TypedDict):
586586
nl_query_debug: typing.NotRequired[bool]
587587

588588

589+
class MessageChunk(typing.TypedDict):
590+
"""
591+
A single chunk from a conversation stream response.
592+
593+
Attributes:
594+
conversation_id (str): ID of the conversation.
595+
message (str): Message content for this chunk.
596+
"""
597+
598+
conversation_id: str
599+
message: str
600+
601+
602+
class StreamConfig(typing.Generic[TDoc], typing.TypedDict, total=False):
603+
"""
604+
Configuration for streaming conversation search responses.
605+
606+
Attributes:
607+
on_chunk: Callback invoked for each streamed chunk (conversation_id, message).
608+
on_complete: Callback invoked when the stream completes with the full search response.
609+
on_error: Callback invoked if an error occurs during streaming.
610+
"""
611+
612+
on_chunk: typing.Callable[["MessageChunk"], None]
613+
on_complete: typing.Callable[["SearchResponse[TDoc]"], None]
614+
on_error: typing.Callable[[BaseException], None]
615+
616+
617+
OnChunkCallback = typing.Callable[["MessageChunk"], None]
618+
OnCompleteCallback = typing.Callable[["SearchResponse[TDoc]"], None]
619+
OnErrorCallback = typing.Callable[[BaseException], None]
620+
621+
_OnChunkT = typing.TypeVar("_OnChunkT", bound=OnChunkCallback)
622+
_OnCompleteT = typing.TypeVar("_OnCompleteT", bound=OnCompleteCallback)
623+
_OnErrorT = typing.TypeVar("_OnErrorT", bound=OnErrorCallback)
624+
625+
626+
class StreamConfigBuilder:
627+
"""
628+
Builder for StreamConfig using decorators.
629+
630+
Example:
631+
>>> stream = StreamConfigBuilder()
632+
>>>
633+
>>> @stream.on_chunk
634+
... def handle_chunk(chunk: MessageChunk) -> None:
635+
... print(chunk["message"], end="", flush=True)
636+
>>>
637+
>>> @stream.on_complete
638+
... def handle_complete(response: dict) -> None:
639+
... print(f"Done! Found {response.get('found', 0)}")
640+
>>>
641+
>>> response = client.collections["docs"].documents.search({
642+
... "q": "query",
643+
... "query_by": "content",
644+
... "conversation_stream": True,
645+
... "stream_config": stream,
646+
... })
647+
"""
648+
649+
def __init__(self) -> None:
650+
"""Initialize an empty StreamConfigBuilder."""
651+
self._on_chunk: typing.Optional[OnChunkCallback] = None
652+
self._on_complete: typing.Optional[OnCompleteCallback] = None
653+
self._on_error: typing.Optional[OnErrorCallback] = None
654+
655+
def on_chunk(self, func: _OnChunkT) -> _OnChunkT:
656+
"""
657+
Decorator to register an on_chunk callback.
658+
659+
Args:
660+
func: Callback invoked for each streamed message chunk.
661+
662+
Returns:
663+
The original function (unmodified).
664+
"""
665+
self._on_chunk = func
666+
return func
667+
668+
def on_complete(self, func: _OnCompleteT) -> _OnCompleteT:
669+
"""
670+
Decorator to register an on_complete callback.
671+
672+
Args:
673+
func: Callback invoked when streaming completes with the full response.
674+
675+
Returns:
676+
The original function (unmodified).
677+
"""
678+
self._on_complete = func
679+
return func
680+
681+
def on_error(self, func: _OnErrorT) -> _OnErrorT:
682+
"""
683+
Decorator to register an on_error callback.
684+
685+
Args:
686+
func: Callback invoked if an error occurs during streaming.
687+
688+
Returns:
689+
The original function (unmodified).
690+
"""
691+
self._on_error = func
692+
return func
693+
694+
def build(self) -> StreamConfig:
695+
"""
696+
Build the StreamConfig dictionary.
697+
698+
Returns:
699+
A StreamConfig with the registered callbacks.
700+
"""
701+
config: StreamConfig = {}
702+
if self._on_chunk is not None:
703+
config["on_chunk"] = self._on_chunk
704+
if self._on_complete is not None:
705+
config["on_complete"] = self._on_complete
706+
if self._on_error is not None:
707+
config["on_error"] = self._on_error
708+
return config
709+
710+
def get(
711+
self, key: str, default: typing.Any = None
712+
) -> typing.Optional[typing.Callable[..., None]]:
713+
"""
714+
Get a callback by key (for compatibility with dict-like access).
715+
716+
Args:
717+
key: The callback name ('on_chunk', 'on_complete', or 'on_error').
718+
default: Default value if the callback is not set.
719+
720+
Returns:
721+
The callback function or the default value.
722+
"""
723+
return self.build().get(key, default)
724+
725+
726+
class ConversationStreamParameters(typing.Generic[TDoc], typing.TypedDict):
727+
"""
728+
Parameters for conversational search streaming.
729+
730+
Attributes:
731+
conversation_stream (bool): When true, the search response is streamed (SSE).
732+
stream_config: Callbacks for stream events. Not sent to the API.
733+
Can be a StreamConfig dict or a StreamConfigBuilder instance.
734+
"""
735+
736+
conversation_stream: typing.NotRequired[bool]
737+
stream_config: typing.NotRequired[
738+
typing.Union[StreamConfig[TDoc], StreamConfigBuilder]
739+
]
740+
741+
589742
class SearchParameters(
590743
RequiredSearchParameters,
591744
QueryParameters,
@@ -598,6 +751,7 @@ class SearchParameters(
598751
TypoToleranceParameters,
599752
CachingParameters,
600753
NLLanguageParameters,
754+
ConversationStreamParameters,
601755
):
602756
"""Parameters for searching documents."""
603757

0 commit comments

Comments
 (0)