Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/aiko_services/main/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def create_stream(self, stream_id, graph_path=None,
pass

@abstractmethod
def destroy_stream(self, stream_id, graceful=False):
def destroy_stream(self, stream_id, graceful=False, use_thread_local=True, diagnostic={}):
pass

@abstractmethod
Expand Down Expand Up @@ -1014,7 +1014,10 @@ def create_stream(self, stream_id, graph_path=None,
self._disable_thread_local("create_stream()")
return True

def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
def destroy_stream(self, stream_id,
graceful=False,
use_thread_local=True,
diagnostic={}):
stream_id = str(stream_id)

# TODO: Proper solution for handling of remote Pipeline proxy
Expand Down Expand Up @@ -1049,7 +1052,7 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
stream.lock.acquire("destroy_stream()")

if graceful and len(stream.frames):
arguments = [stream_id, graceful, use_thread_local]
arguments = [stream_id, graceful, use_thread_local, diagnostic]
self._post_message(
ActorTopic.IN, "destroy_stream", arguments, delay=3.0)
return False
Expand All @@ -1059,14 +1062,18 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
if stream_id in self.DEBUG: # DEBUG: 2024-12-02
del self.DEBUG[stream_id]

destroy_stream_state = stream.state

graph_path = self.pipeline_graph.get_path(self.share["graph_path"])
for node in graph_path:
element, element_name, local, _ = \
PipelineGraph.get_element(node)
if local: ## Local element ##
try:
stream_event, diagnostic = element.stop_stream(
stream_event, stop_stream_data = element.stop_stream(
stream, stream_id)
if stream_event == StreamEvent.ERROR:
diagnostic = stop_stream_data
except Exception as exception:
self.logger.error("Exception in " \
"pipeline.destroy_stream() --> stop_stream()")
Expand All @@ -1077,7 +1084,23 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
element_name, stream, stream_event, diagnostic,
in_destroy_stream=True))
if stream.state == StreamState.ERROR:
break
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This break prevents stop_stream being called on downstream elements if an upstream stop_stream raises

destroy_stream_state = StreamState.ERROR
elif destroy_stream_state == StreamState.ERROR:
stream.state = StreamState.ERROR
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feed the error state back into the next stop_stream call, so that element knows the stream has stopped in error and can cleanup/finalise appropriately


# Notify listeners that the stream has stopped
stop_state = stream.state
if stop_state >= StreamState.RUN:
stop_state = StreamState.STOP
stream_info = {
"stream_id": stream.stream_id,
"frame_id": stream.frame_id,
"state": stop_state}
if stream.queue_response:
stream.queue_response.put((stream_info, diagnostic))
if stream.topic_response:
actor = get_actor_mqtt(stream.topic_response, Pipeline)
actor.process_frame_response(stream_info, diagnostic)
finally:
if use_thread_local:
stream.lock.release()
Expand Down Expand Up @@ -1580,7 +1603,8 @@ def get_stream_id():
if not in_destroy_stream: # avoid destroy_stream() recursion
if stream.lock._in_use:
stream.lock.release()
self.destroy_stream(get_stream_id(), use_thread_local=False)
stream.state = StreamState.ERROR
self.destroy_stream(get_stream_id(), use_thread_local=False, diagnostic=diagnostic)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing diagnostic here allows the stop-stream message put on the response_queueindestroy_streamto include the diagnostic message fromstart_stream`


return stream_state

Expand Down