
Conversation

@rycerzes
Contributor

@rycerzes rycerzes commented Dec 7, 2025

Add WebSocket support with concurrent session management

Adds WebSocket endpoints for persistent environment sessions with configurable concurrency limits #194

High-level Diff

These are the changes on the server side:

- env = MyEnvironment()
  app = create_app(
-     env,
+     MyEnvironment,              # Pass class, not instance
      MyAction,
      MyObservation,
+     max_concurrent_envs=4,      # Allow 4 concurrent WebSocket sessions
  )

On the client side, it requires a change of URL:

from envs.echo_env import EchoEnv, EchoAction

+ client = EchoEnv(base_url="ws://localhost:8000/ws")
- client = EchoEnv(base_url="http://localhost:8000")

result = client.reset()
result = client.step(EchoAction(message="Hello!"))

# or async with
+ result = await client.reset()
+ result = await client.step(EchoAction(message="Hello!"))

This leads to high concurrency with limited resources:

image

Changes

  • WebSocket endpoint at /ws with message protocol for reset/step/state/close
  • Factory pattern support: pass environment class instead of instance to create per-session environments
  • ConcurrencyConfig for setting max concurrent sessions, timeout, and capacity behavior
  • CONCURRENCY_SAFE flag on environments (defaults to False) with startup validation
  • Session capacity tracking and error handling
  • New client: WebSocketEnvClient for persistent connections

API

New types:

  • ConcurrencyConfig(max_concurrent_envs, session_timeout_seconds, reject_on_capacity)
  • SessionInfo and ServerCapacityStatus for session metadata
  • WebSocket message types: WSResetMessage, WSStepMessage, WSStateMessage, WSCloseMessage
  • Response types: WSObservationResponse, WSStateResponse, WSErrorResponse

Usage:

# Factory mode for concurrent sessions
app = create_app(
    env=MyEnvironment,  # Pass class, not instance
    max_concurrent_envs=4
)

Defaults to max_concurrent_envs=1 for backward compatibility. Environments must set CONCURRENCY_SAFE=True to allow higher concurrency.
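
The startup validation and per-session factory described above could look roughly like the following. This is a minimal sketch, not the PR's implementation: `SessionRegistry`, `open_session`, `close_session`, and `CounterEnv` are hypothetical names, and the `Environment` class is a stand-in. Only `CONCURRENCY_SAFE` and `max_concurrent_envs` come from the description.

```python
from typing import Callable, Dict


class Environment:
    """Stand-in base class; the real one lives in openenv.core."""

    CONCURRENCY_SAFE = False  # default from the PR description


class CounterEnv(Environment):
    """Hypothetical environment that keeps all state on the instance."""

    CONCURRENCY_SAFE = True

    def __init__(self) -> None:
        self.steps = 0


class SessionRegistry:
    """Hypothetical helper: one environment instance per session."""

    def __init__(
        self,
        env_factory: Callable[[], Environment],
        max_concurrent_envs: int = 1,
    ) -> None:
        # Startup validation: refuse concurrency the environment did not opt into.
        safe = getattr(env_factory, "CONCURRENCY_SAFE", False)
        if max_concurrent_envs > 1 and not safe:
            raise ValueError("max_concurrent_envs > 1 requires CONCURRENCY_SAFE = True")
        self._factory = env_factory
        self._max = max_concurrent_envs
        self._sessions: Dict[str, Environment] = {}

    def open_session(self, session_id: str) -> Environment:
        if len(self._sessions) >= self._max:
            raise RuntimeError("server at capacity")
        env = self._factory()  # fresh instance, so sessions share no state
        self._sessions[session_id] = env
        return env

    def close_session(self, session_id: str) -> None:
        self._sessions.pop(session_id, None)
```

Passing the class rather than an instance is what makes the `self._factory()` call possible; with a single shared instance, every session would mutate the same state.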

TODO

  • Session timeout enforcement (tracked but not implemented)
  • openenv init needs the WebSocket code integrated into the template
  • Resource monitoring (memory/CPU per session)
  • Connection queueing when reject_on_capacity=False
  • Mark safe environments as CONCURRENCY_SAFE=True
  • Update envs to support concurrency
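
For the open "connection queueing" item, one possible shape is a semaphore-based gate. This is a sketch under assumptions, not code from the PR: `CapacityGate` and `demo` are hypothetical names, and only `max_concurrent_envs` and `reject_on_capacity` are taken from `ConcurrencyConfig`.

```python
import asyncio


class CapacityGate:
    """Hypothetical gate limiting the number of live sessions."""

    def __init__(self, max_concurrent_envs: int, reject_on_capacity: bool = True) -> None:
        self._slots = asyncio.Semaphore(max_concurrent_envs)
        self._reject = reject_on_capacity

    async def acquire(self) -> None:
        # locked() is True when no slot can be taken immediately.
        if self._reject and self._slots.locked():
            raise RuntimeError("server at capacity")
        await self._slots.acquire()  # waits for a free slot when queueing

    def release(self) -> None:
        self._slots.release()


async def demo() -> list:
    """Run two sessions through a one-slot gate in queueing mode."""
    events = []
    gate = CapacityGate(max_concurrent_envs=1, reject_on_capacity=False)

    async def session(name: str) -> None:
        await gate.acquire()
        try:
            events.append(f"{name} start")
            await asyncio.sleep(0.01)  # stands in for reset/step traffic
            events.append(f"{name} end")
        finally:
            gate.release()

    await asyncio.gather(session("a"), session("b"))
    return events
```

With `reject_on_capacity=True` the second connection would get an immediate error instead of waiting, which matches the behavior the PR currently implements.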

rycerzes and others added 5 commits December 4, 2025 23:01
…erver capabilities

- Introduced WebSocketEnvClient for persistent sessions with multi-step interactions.
- Updated HTTPEnvServer to support WebSocket connections and manage multiple concurrent environments.
- Added WebSocket message types and responses for better communication.
- Enhanced Environment interface with concurrency safety attributes.
@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Dec 7, 2025
@rycerzes rycerzes changed the base branch from main to release December 7, 2025 20:28
@rycerzes
Contributor Author

rycerzes commented Dec 7, 2025

@burtenshaw draft PR for the ws and concurrency. I have merged #238 into this as well.

A few notes before #232 gets merged:

  • openenv init generates boilerplate template according to the old structure

  • openenv init needs the WebSocket code integrated into the template:

    • Add WebSocket client example/template
    • Update server templates to show WebSocket endpoint usage
    • Include documentation on CONCURRENCY_SAFE flag and concurrent sessions
  • VectorEnv abstraction for batched operations inspired by Gymnasium

@burtenshaw burtenshaw mentioned this pull request Dec 8, 2025
@burtenshaw
Collaborator

burtenshaw commented Dec 8, 2025

Amazing work @rycerzes. Thanks!

  • openenv init generates boilerplate template according to the old structure.

I'll integrate this in a new PR for you to merge here.

  • VectorEnv abstraction for batched operations inspired by Gymnasium

I think we can leave this for a subsequent PR.

Also, this env might be useful to you. It's basically just a benchmarking env that lets you test concurrency asynchronously like this.

@burtenshaw
Collaborator

@rycerzes could you help me to understand this please:

openenv init generates boilerplate template according to the old structure.

What do you mean by old structure? afaik #232 openenv init generates a template with a corresponding structure to the branch. i.e. from:

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

@burtenshaw burtenshaw changed the title feat: WebSocket-based Concurrency Architecture [FEATURE] WebSocket-based Concurrency Architecture Dec 8, 2025
@rycerzes
Contributor Author

rycerzes commented Dec 8, 2025

@burtenshaw

Thanks for the clarification! You're absolutely right - I need to correct my earlier comment.

What do you mean by old structure? afaik #232 openenv init generates a template with a corresponding structure to the branch. i.e. from:

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

I must have run openenv init from the main branch when I was testing, which would explain the confusion. The openenv init command on both the impl/concurrency branch and in #232 does generate the correct new structure with openenv.core imports.

I just verified this by running uv run openenv init test_env -o tests/ on the current branch, and it correctly generates all files with the new import structure. I have updated my above comment accordingly 👍


Also, this env might be useful to you. It's basically just a benchmarking env that lets you test concurrency asynchronously like this.

Thanks! That benchmark env would be perfect for testing the concurrency implementation. I'll take a look at it.
Apologies for the confusion on point 1!

@Wauplin Wauplin left a comment

Thanks for working on this very important piece @rycerzes! I've left quite a few comments on how I would do things, but some parts are left to the maintainers' decision 🤗 Especially:

  1. should we allow "instantiate a server by passing an env instead of an env factory" to keep backward compatibility? => I would say "no" since the project is still in an early phase
  2. should we maintain both an "HTTP-based interface" and a "websocket-based interface"? => same, I would say "no" as it means doubling the amount of work (two paths in the HTTP server and two very similar clients to maintain, with the same interface but different internal logic). Better to keep only one interface that is more robust for the future. End users should not be impacted by this decision (except for the breaking change to adapt to).

Apart from that, I usually tend to advise simplifying logic by not adding too many optional features at first. More options usually mean more internal logic and more maintenance burden in the long run. So if something is not explicitly required, let's keep it for later.

Note that I haven't run the code myself. Will give it a try soon!

Here I would create a base model with only the config:

class BaseMessage(BaseModel): # naming to adapt?
    model_config = ConfigDict(extra="forbid", validate_assignment=True)

This one would be reused in every type (since we want the same config all the time).


Then for each WS message, use Literal[...] for the type field:

class WSResetMessage(BaseMessage):
    """WebSocket message to reset the environment."""

    type: Literal["reset"]
    data: Dict[str, Any] = Field(
        default_factory=dict,
        description="Optional reset parameters (seed, episode_id, etc.)",
    )

(same for each message)

And finally define the "main" WSMessage type like this:

WSIncomingMessage = Annotated[
    WSResetMessage | WSStepMessage | WSStateMessage | WSCloseMessage,
    Field(discriminator="type")
]

=> IDEs and linters should be able to understand that WSIncomingMessage can only accept a few values for type, and that this value defines what additional data can be attached to the message.
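
To make the discriminator idea concrete without depending on Pydantic, here is a hand-rolled sketch of the same dispatch: the `type` field selects the message class, and unknown fields are rejected, which mirrors `extra="forbid"`. `parse_incoming`, `_BY_TYPE`, and the dataclass layout are illustrative assumptions (in particular the `data` field on the step message). With Pydantic, the `Annotated[..., Field(discriminator="type")]` union performs this selection for you.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Union


@dataclass
class WSResetMessage:
    data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class WSStepMessage:
    data: Dict[str, Any]  # assumed shape: the serialized action


@dataclass
class WSStateMessage:
    pass


@dataclass
class WSCloseMessage:
    pass


WSIncomingMessage = Union[WSResetMessage, WSStepMessage, WSStateMessage, WSCloseMessage]

# The "type" value decides which class validates the remaining fields.
_BY_TYPE = {
    "reset": WSResetMessage,
    "step": WSStepMessage,
    "state": WSStateMessage,
    "close": WSCloseMessage,
}


def parse_incoming(raw: str) -> WSIncomingMessage:
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError("Message must be a JSON object")
    msg_type = payload.pop("type", None)
    cls = _BY_TYPE.get(msg_type) if isinstance(msg_type, str) else None
    if cls is None:
        raise ValueError(f"Unknown message type: {msg_type!r}")
    try:
        return cls(**payload)  # unexpected or missing fields raise TypeError
    except TypeError as exc:
        raise ValueError(f"Invalid {msg_type} message: {exc}") from exc
```

The benefit of the tagged union is that the server handler receives an already-typed object, so each branch only deals with the fields that exist for that message.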

Comment on lines +285 to +294
class ConcurrencySafetyLevel(str):
    """
    Classification of environment concurrency safety.
    Environments are classified based on their ability to safely handle
    multiple concurrent sessions within a single container.
    """

    UNSAFE = "unsafe"
    SAFE = "safe"

Not used anywhere? If the goal is to define an enum, you should either do class ConcurrencySafetyLevel(str, enum.Enum): or simply Literal["safe", "unsafe"]. But just a boolean value is even better.
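
For reference, a sketch of the three options mentioned here. One detail worth knowing: when mixing `str` into an enum, the mixin has to come before `enum.Enum` in the base list, otherwise class creation fails. `SafetyLiteral` and `MyEnvironment` are names made up for this example.

```python
import enum
from typing import Literal


class ConcurrencySafetyLevel(str, enum.Enum):
    """Option 1: a real enum whose members are also strings."""

    UNSAFE = "unsafe"
    SAFE = "safe"


# Option 2: a plain type alias, enforced by type checkers only.
SafetyLiteral = Literal["safe", "unsafe"]


class MyEnvironment:
    # Option 3: a boolean flag, as with the CONCURRENCY_SAFE attribute in this PR.
    CONCURRENCY_SAFE: bool = False
```

Subclassing `str` alone, as in the quoted diff, only creates a string subclass with two class attributes; it gives no member lookup or validation.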

Comment on lines +311 to +315
session_timeout_seconds: Optional[float] = Field(
    default=None,
    gt=0,
    description="Timeout in seconds for inactive sessions. None means no timeout.",
)

Suggested change
- session_timeout_seconds: Optional[float] = Field(
-     default=None,
-     gt=0,
-     description="Timeout in seconds for inactive sessions. None means no timeout.",
- )
+ session_timeout: Optional[float] = Field(
+     default=None,
+     gt=0,
+     description="Timeout in seconds for inactive sessions. None means no timeout.",
+ )

I think "_seconds" can be made implicit and documented in the description. It's quite common to have timeout in seconds in requests, httpx, etc.

Comment on lines +300 to +303
model_config = ConfigDict(
    extra="forbid",
    validate_assignment=True,
)

Can be factored into the base model I mentioned above (same for the other BaseModels).

Comment on lines +322 to +354
class ServerCapacityStatus(BaseModel):
    """Status of server capacity for concurrent sessions."""

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
    )

    active_sessions: int = Field(
        ge=0,
        description="Number of currently active sessions",
    )
    max_sessions: int = Field(
        ge=1,
        description="Maximum number of allowed sessions",
    )
    available_slots: int = Field(
        ge=0,
        description="Number of available session slots",
    )
    is_at_capacity: bool = Field(
        description="Whether the server has reached maximum capacity",
    )

    @classmethod
    def from_counts(cls, active: int, max_sessions: int) -> "ServerCapacityStatus":
        """Create status from active and max session counts."""
        available = max(0, max_sessions - active)
        return cls(
            active_sessions=active,
            max_sessions=max_sessions,
            available_slots=available,
            is_at_capacity=active >= max_sessions,

I feel this class could be simplified with something like this:

from pydantic import BaseModel, Field, model_validator

class ServerCapacityStatus(BaseModel):
    """Status of server capacity for concurrent sessions."""

    active_sessions: int = Field(ge=0, description="Number of currently active sessions")
    max_sessions: int = Field(ge=1, description="Maximum number of allowed sessions")

    @model_validator(mode="after")
    def check_capacity_bounds(self) -> "ServerCapacityStatus":
        # Reject incoherent counts at construction time.
        if self.active_sessions > self.max_sessions:
            raise ValueError(
                f"active_sessions ({self.active_sessions}) cannot exceed "
                f"max_sessions ({self.max_sessions})"
            )
        return self

    @property
    def available_slots(self) -> int:
        """Number of available session slots"""
        return self.max_sessions - self.active_sessions

    @property
    def is_at_capacity(self) -> bool:  # Not sure this property is really necessary
        """Whether the server has reached maximum capacity"""
        return self.available_slots == 0

This way available_slots and is_at_capacity are inferred properties, not stored values. And we always validate that active and max sessions are coherent.

Comment on lines +573 to +591
# Register concurrency config endpoint
@app.get(
    "/concurrency",
    response_model=ConcurrencyConfig,
    tags=["Environment Info"],
    summary="Get concurrency configuration",
    description="""
Get the current concurrency configuration for this server.
Returns information about:
- **max_concurrent_envs**: Maximum number of concurrent WebSocket sessions
- **session_timeout_seconds**: Timeout for inactive sessions (None if no timeout)
- **reject_on_capacity**: Whether to reject or queue connections at capacity
""",
)
async def get_concurrency_config() -> ConcurrencyConfig:
    """Return concurrency configuration."""
    return self._concurrency_config

Why not, but I'm not sure it's necessary?

Comment on lines +676 to +679
msg_type = message_dict.get("type", "")

try:
    if msg_type == "reset":

I feel logic could be simplified like this:

try:
    match msg_type:
        case "reset":
            ... # todo: implement
            response = WSObservationResponse(...)
        case "step":
            ... # todo: implement
            response = WSObservationResponse(...)
        case "state":
            ... # todo: implement
            response = WSStateResponse(...)
        case "close":
            ... # todo: implement
        case _:
            response = WSErrorResponse(
                data={"message": f"Unknown message type: {msg_type}", "code": "UNKNOWN_TYPE"}
            )

    await websocket.send_text(response.model_dump_json())

except ValidationError as e:
    error_resp = WSErrorResponse(
        data={"message": "Invalid message", "code": "VALIDATION_ERROR", "errors": e.errors()}
    )
    await websocket.send_text(error_resp.model_dump_json())
except Exception as e:
    error_resp = WSErrorResponse(
        data={"message": str(e), "code": "EXECUTION_ERROR"}
    )
    await websocket.send_text(error_resp.model_dump_json())

This way you have clear logic based on msg_type value + the validation errors are all caught in the same place


  def create_app(
-     env: Environment,
+     env: Union[Environment, Callable[[], Environment], Type[Environment]],

Suggested change
- env: Union[Environment, Callable[[], Environment], Type[Environment]],
+ env: Callable[[], Environment],

should be enough if we break backward compat'? (at least for now since we don't accept inputs for environment resets yet)
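
A short illustration of why `Callable[[], Environment]` is enough on its own: a class whose constructor needs no arguments is already such a callable, and `functools.partial` covers environments that need configuration. The `Environment` stand-in, its `seed` parameter, and `build_envs` are assumptions made for this sketch.

```python
from functools import partial
from typing import Callable, List


class Environment:
    """Stand-in for the real base class."""

    def __init__(self, seed: int = 0) -> None:
        self.seed = seed


def build_envs(env: Callable[[], Environment], count: int) -> List[Environment]:
    """Hypothetical helper: call the factory once per session."""
    return [env() for _ in range(count)]


# A class with an all-default constructor is already a zero-argument callable.
plain = build_envs(Environment, 2)

# partial (or a lambda) covers environments that need configuration.
seeded = build_envs(partial(Environment, seed=42), 2)
```

So dropping `Type[Environment]` from the union loses nothing for callers, and dropping the instance form is the only actual breaking change.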

Comment on lines +28 to +33
try:
    import websockets
    from websockets.sync.client import connect as ws_connect
except ImportError:
    websockets = None  # type: ignore
    ws_connect = None  # type: ignore

Since websockets is made a required dependency in pyproject.toml, I think we should consider it always available (simplifies the logic a bit).

Comment on lines +88 to +94
ws_url = base_url.rstrip("/")
if ws_url.startswith("http://"):
    ws_url = "ws://" + ws_url[7:]
elif ws_url.startswith("https://"):
    ws_url = "wss://" + ws_url[8:]
elif not ws_url.startswith("ws://") and not ws_url.startswith("wss://"):
    ws_url = "ws://" + ws_url

(nit) could be a unit-tested helper (can be hard to track all specificities when updating this type of logic in the future)
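
Such a helper might look like the sketch below. It keeps the behavior of the quoted lines; the names `to_ws_url` and `test_to_ws_url` are hypothetical, and the test cases are only the ones implied by the branches above.

```python
def to_ws_url(base_url: str) -> str:
    """Map an HTTP(S) or bare base URL to its WebSocket equivalent."""
    url = base_url.rstrip("/")
    if url.startswith("http://"):
        return "ws://" + url[len("http://"):]
    if url.startswith("https://"):
        return "wss://" + url[len("https://"):]
    if url.startswith(("ws://", "wss://")):
        return url
    return "ws://" + url  # no scheme given: assume plain ws


def test_to_ws_url() -> None:
    cases = {
        "http://localhost:8000": "ws://localhost:8000",
        "https://example.com/": "wss://example.com",
        "ws://localhost:8000/ws": "ws://localhost:8000/ws",
        "wss://example.com/ws/": "wss://example.com/ws",
        "localhost:8000": "ws://localhost:8000",
    }
    for given, expected in cases.items():
        assert to_ws_url(given) == expected
```

Using `len("http://")` instead of the literal offsets `7` and `8` removes one place where a future edit could silently go wrong.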

@burtenshaw
Collaborator

burtenshaw commented Dec 8, 2025

@pankit-eng @zkwentz Can you validate these two backward compatibility points from @Wauplin on this PR? In short, should we go all in on websockets or maintain an HTTP implementation?

  • should we allow "instantiate a server by passing an env instead of an env factory" to keep backward compatibility? => I would say "no" since project is still in early phase

Server side app will look like this:

# Factory mode for concurrent sessions
app = create_app(
    env=MyEnvironment,  # Pass class, not instance
    max_concurrent_envs=4
)

  • should we maintain both an "HTTP-based interface" and a "websocket-based interface"? => same, I would say "no" as it means doubling the amount of work (two paths in the HTTP server and two very similar clients to maintain, with the same interface but different internal logic). Better to keep only one interface that is more robust for the future. End users should not be impacted by this decision (except for the breaking change to adapt to).

iiuc, the client code will only look like this:

from envs.echo_env import EchoEnv, EchoAction

client = EchoEnv(base_url="ws://localhost:8000/ws")

result = await client.reset()
result = await client.step(EchoAction(...))

@burtenshaw
Collaborator

@rycerzes I tested out this branch and it worked well. I updated the PR description myself with a high-level before and after snippet and some benchmarking info.

@rycerzes
Contributor Author

Thanks @Wauplin for the detailed review! Really appreciate all the feedback - the suggestions on simplifying the message types with discriminators, refactoring the capacity status, and cleaning up the validation logic make a lot of sense. I'll work through these and have them resolved by end of Friday.

@burtenshaw Thanks for testing the branch and updating the PR description with the benchmarking info!
