-
Notifications
You must be signed in to change notification settings - Fork 125
[Feature][Python] Implement EventListener in python sdk #688
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
twosom
wants to merge
13
commits into
apache:main
Choose a base branch
from
twosom:implementation-python-event-listener
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
84d0278
feat(python): add EventContext class for python sdk
twosom 243d8a0
feat(python): add `EVENT_LISTENER` config in AgentConfigOptions
twosom 6302282
feat(python): added `EventListener` and `EventListenerMeta` class
twosom 072f43f
chore : formatting
twosom 50e15af
feat(python): add event listener instantiation and context conversion…
twosom b0f600a
refactor : refact test_event_listener test code
twosom 6db6fa2
feat(java): implement Java-side adapter to bridge Python event listen…
twosom 5b4042e
refactor: add utility method in AgentPlan
twosom d62bb2d
refactor(java): extract event conversion logic in PythonActionExecutor
twosom a23239c
feat(runtime): wire up Python event listeners in ActionExecutionOperator
twosom b880ad2
feat(python): support event listeners in LocalRunner
twosom 8fe6ee6
doc : add event listener doc for python
twosom 680c866
chore : add license header
twosom File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| --- | ||
| title: Event Listener | ||
| weight: 8 | ||
| type: docs | ||
| --- | ||
| <!-- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|
|
||
| ## Overview | ||
|
|
||
| `EventListener` is a callback mechanism that allows you to monitor and react to events as they are processed by the agent. It is triggered at the beginning of event processing, before any actions are executed. | ||
|
|
||
| Common use cases include: | ||
| - **Monitoring & Metrics**: Tracking event throughput, latency, or specific event types. | ||
| - **Logging**: Capturing event details for auditing or debugging. | ||
| - **Side Effects**: Triggering external notifications or system updates based on event reception. | ||
|
|
||
| ## Implementation | ||
|
|
||
| You can implement `EventListener` in either Java or Python. | ||
|
|
||
| {{< tabs "EventListener Implementation" >}} | ||
|
|
||
| {{< tab "Python" >}} | ||
| In Python, inherit from the `EventListener` class and implement the `on_event_processed` method. | ||
|
|
||
| ```python | ||
| from flink_agents.api.listener.event_listener import EventListener | ||
| from flink_agents.api.event_context import EventContext | ||
| from flink_agents.api.events.event import Event | ||
|
|
||
| class MyPythonListener(EventListener): | ||
| def on_event_processed(self, context: EventContext, event: Event) -> None: | ||
| print(f"Received event: {event.get_type()} at {context.timestamp}") | ||
| ``` | ||
| {{< /tab >}} | ||
|
|
||
| {{< tab "Java" >}} | ||
| In Java, implement the `EventListener` interface. | ||
|
|
||
| ```java | ||
| import org.apache.flink.agents.api.Event; | ||
| import org.apache.flink.agents.api.EventContext; | ||
| import org.apache.flink.agents.api.listener.EventListener; | ||
|
|
||
| public class MyJavaListener implements EventListener { | ||
| @Override | ||
| public void onEventProcessed(EventContext context, Event event) { | ||
| System.out.println("Received event: " + event.getType() | ||
| + " at " + context.getTimestamp()); | ||
| } | ||
| } | ||
| ``` | ||
| {{< /tab >}} | ||
|
|
||
| {{< /tabs >}} | ||
|
|
||
| ## Configuration | ||
|
|
||
| Register your listeners in the `AgentPlan` configuration using the `event-listeners` option. | ||
|
|
||
| {{< tabs "Configuration" >}} | ||
|
|
||
| {{< tab "Python" >}} | ||
|
|
||
| ```python | ||
| agents_env = AgentsExecutionEnvironment.get_execution_environment(env) | ||
|
|
||
| # Register listeners using str(ClassName) | ||
| agents_env.get_config().set( | ||
| AgentConfigOptions.EVENT_LISTENERS, | ||
| [str(MyPythonListener)] | ||
| ) | ||
| ``` | ||
| {{< /tab >}} | ||
|
|
||
| {{< tab "Java" >}} | ||
|
|
||
| ```java | ||
| AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env); | ||
|
|
||
| // Register listeners using fully qualified class names | ||
| agentsEnv.getConfig().set( | ||
| AgentConfigOptions.EVENT_LISTENERS, | ||
| Collections.singletonList(MyJavaListener.class.getName()) | ||
| ); | ||
| ``` | ||
| {{< /tab >}} | ||
|
|
||
| {{< /tabs >}} | ||
|
|
||
| ## Best Practices & Limitations | ||
|
|
||
| - **Performance**: Listeners are executed **synchronously**. Keep them lightweight and avoid long-running or blocking operations. | ||
| - **No-Arg Constructor**: Implementing classes must have a public no-argument constructor for dynamic instantiation. | ||
| - **Error Handling**: Implementations must handle their own error recovery. Any unhandled exceptions thrown by a listener will disrupt the main event processing flow and may cause the agent to fail. | ||
| - **Cross-Language Support**: If you are using the Java runtime, you can still register Python listeners. The framework will handle the Java-to-Python conversion of `Event` and `EventContext` objects. To optimize performance, this conversion happens only once per event notification, even if multiple Python listeners are registered. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| ################################################################################ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| ################################################################################# | ||
| from pydantic import BaseModel | ||
|
|
||
|
|
||
| class EventContext(BaseModel): | ||
| """Contextual information about an event, such as its type and timestamp. | ||
|
|
||
| Attributes: | ||
| ---------- | ||
| eventType : str | ||
| The routing key for the event, matching the ``EVENT_TYPE`` constant or | ||
| type string. | ||
| timestamp : str | ||
| Timestamp of when the event occurred. | ||
| """ | ||
|
|
||
| eventType: str | ||
|
|
||
| timestamp: str | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| ################################################################################ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| ################################################################################# |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| ################################################################################ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| ################################################################################# | ||
| import inspect | ||
| from abc import ABC, ABCMeta, abstractmethod | ||
|
|
||
| from flink_agents.api.event_context import EventContext | ||
| from flink_agents.api.events.event import Event | ||
|
|
||
|
|
||
| class EventListenerMeta(ABCMeta): | ||
| """Metaclass for EventListener that provides a specialized string representation. | ||
|
|
||
| This metaclass overrides the ``__str__`` method for classes that implement | ||
| ``EventListener``. The resulting string format is | ||
| ``module:class_path.on_event_processed``, which is specifically designed to be | ||
| parsed by the agent's runtime (e.g., in Java via Pemja) to dynamically | ||
| instantiate the listener class. | ||
|
|
||
| The string representation handles: | ||
|
|
||
| - Standard module-level classes. | ||
| - Nested classes (using their full qualified name). | ||
| - Classes defined in the ``__main__`` module (attempting to resolve the actual | ||
| filename if available). | ||
| - Validation to ensure classes are not defined in a local scope (which would | ||
| make them inaccessible for remote/dynamic instantiation). | ||
| """ | ||
| def __str__(cls) -> str: | ||
| """Return a string representation of the listener class for dynamic | ||
| instantiation. | ||
|
|
||
| The format is ``module:class_path.on_event_processed``. | ||
| Example: ``my_module:MyListener.on_event_processed`` or | ||
| ``my_module:Outer.Inner.on_event_processed``. | ||
|
|
||
| Returns: | ||
| ------- | ||
| str | ||
| A string identifier for the class and its handler method. | ||
|
|
||
| Raises: | ||
| ------ | ||
| ValueError | ||
| If the class is defined within a local scope. | ||
| """ | ||
| class_qualname = cls.__qualname__ | ||
|
|
||
| if "<locals>" in class_qualname: | ||
| err_msg = ( | ||
| f"Cannot instantiate local class in '{class_qualname}'. " | ||
| f"Classes defined within a local scope (indicated by '<locals>') " | ||
| f"are not accessible via module attributes. Move the class to the module level." | ||
| ) | ||
| raise ValueError(err_msg) | ||
|
|
||
| module_obj = inspect.getmodule(cls) | ||
| if module_obj is None: | ||
| module_name = cls.__module__ | ||
| else: | ||
| module_name = module_obj.__name__ | ||
|
|
||
| if module_name == "__main__": | ||
| if hasattr(module_obj, "__file__"): | ||
| from pathlib import Path | ||
| file_path = Path(module_obj.__file__) | ||
| module_name = file_path.stem | ||
|
|
||
| return f"{module_name}:{class_qualname}.on_event_processed" | ||
|
|
||
|
|
||
| class EventListener(ABC, metaclass=EventListenerMeta): | ||
| """Interface for event listeners that are notified when events are received | ||
| for processing. | ||
|
|
||
| EventListener provides a callback mechanism triggered at the beginning of | ||
| event processing. This is useful for monitoring, metrics collection, | ||
| debugging, or triggering side effects based on event reception. | ||
|
|
||
| Event listeners are executed synchronously when an event is received, | ||
| before any actions are triggered. Implementations should be lightweight | ||
| and avoid blocking operations to prevent impacting agent performance. | ||
|
|
||
| **Note:** Implementing classes must provide a public no-argument constructor to | ||
| allow for dynamic instantiation by the agent. | ||
| """ | ||
|
|
||
| @abstractmethod | ||
| def on_event_processed(self, context: EventContext, event: Event) -> None: | ||
| """Called when an event is being processed. | ||
|
|
||
| This method is invoked when an event is received by the agent, before | ||
| it is processed by any actions. The listener can inspect the event and | ||
| its context to perform additional processing such as logging, metrics | ||
| collection, or triggering external notifications. | ||
|
|
||
| **Important:** This method should not throw exceptions as they will be | ||
| caught and logged but will not affect the main event processing flow. | ||
| Implementations should handle their own error recovery. | ||
|
|
||
| Parameters: | ||
| ---------- | ||
| context : EventContext | ||
| The context associated with the event | ||
| event : Event | ||
| The event that is being processed | ||
| """ |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
camelCase here clashes with the rest of
flink_agents/api, which is snake_case throughout. User code readingcontext.eventTypewill stand out. Wouldevent_typewithalias="eventType"satisfy the Java JSON contract while keeping the Python attribute idiomatic?