From 57e6f7bfb2c23b3a2ad6c826d9ae5ef012ff8913 Mon Sep 17 00:00:00 2001 From: Isabel Paredes Date: Mon, 15 Dec 2025 17:19:17 +0100 Subject: [PATCH 1/2] Add function to wait for welcome msg on iopub channel --- .../services/kernels/connection/channels.py | 54 ++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py index bde7f2fc9..b11af432b 100644 --- a/jupyter_server/services/kernels/connection/channels.py +++ b/jupyter_server/services/kernels/connection/channels.py @@ -15,6 +15,7 @@ from tornado.ioloop import IOLoop from tornado.websocket import WebSocketClosedError from traitlets import Any, Bool, Dict, Float, Instance, Int, List, Unicode, default +from packaging.version import Version try: from jupyter_client.jsonutil import json_default @@ -120,6 +121,12 @@ def _default_close_future(self): """The default close future.""" return Future() + kernel_protocol_version = Instance(klass=Version) + + @default("kernel_protocol_version") + def _default_kernel_protocol_version(self): + return Version("0.0") + session_key = Unicode("") _iopub_window_msg_count = Int() @@ -155,6 +162,43 @@ def create_stream(self): self.channels[channel] = stream = meth(identity=identity) stream.channel = channel + def wait_for_iopub_welcome(self): + """Waits for an iopub_welcome message from the kernel's IOPub channel. + Used for kernels supporting protocol version >=5.4 that send explicit + welcome messages instead of requiring the nudge() handshake. Returns a + Future that resolves with the message type when a welcome is received, + or an empty dict on parsing errors.""" + + # The IOPub channel used by the client, where a welcome is expected. + iopub_channel = self.channels["iopub"] + iopub_future: Future[t.Any] = Future() + + def on_iopub(msg): + """Handle iopub replies.""" + _idents, msg = self.session.feed_identities(msg) + + try: + msg = self.session.deserialize(msg) + except BaseException: + self.log.error("Bad iopub reply", exc_info=True) + iopub_future.set_result({}) + return + else: + self.log.debug("Received iopub message: %s", msg) + if msg["msg_type"] == "iopub_welcome": + iopub_future.set_result(msg["msg_type"]) + + iopub_channel.on_recv(on_iopub) + + def cleanup(_=None): + """Common cleanup""" + iopub_channel.stop_on_recv() + + iopub_future.add_done_callback(cleanup) + + return _ensure_future(iopub_future) + + def nudge(self): """Nudge the zmq connections with kernel_info_requests Returns a Future that will resolve when we have received @@ -336,6 +380,10 @@ def connect(self): """Handle a connection.""" self.multi_kernel_manager.notify_connect(self.kernel_id) + # Check if the kernel protocol supports XPUB; if so, a welcome message + # is to be expected. + wait_for_welcome = self.kernel_protocol_version >= Version("5.4") + # on new connections, flush the message buffer buffer_info = self.multi_kernel_manager.get_buffer(self.kernel_id, self.session_key) if buffer_info and buffer_info["session_key"] == self.session_key: @@ -348,7 +396,7 @@ def connect(self): # The kernel's ports have not changed; use the channels captured in the buffer self.channels = buffer_info["channels"] - connected = self.nudge() + connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge() def replay(value): replay_buffer = buffer_info["buffer"] @@ -362,7 +410,7 @@ def replay(value): else: try: self.create_stream() - connected = self.nudge() + connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge() except web.HTTPError as e: # Do not log error if the kernel is already shutdown, # as it's normal that it's not responding @@ -626,6 +674,8 @@ def _finish_kernel_info(self, info): and signal that connection can continue. """ protocol_version = info.get("protocol_version", client_protocol_version) + self.kernel_protocol_version = Version(protocol_version) + if protocol_version != client_protocol_version: self.session.adapt_version = int(protocol_version.split(".")[0]) self.log.info( From a70231e6921fc374ead493c058567bc23c620e2e Mon Sep 17 00:00:00 2001 From: Isabel Paredes Date: Mon, 15 Dec 2025 17:23:19 +0100 Subject: [PATCH 2/2] Clean up --- jupyter_server/services/kernels/connection/channels.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py index b11af432b..5287f75f7 100644 --- a/jupyter_server/services/kernels/connection/channels.py +++ b/jupyter_server/services/kernels/connection/channels.py @@ -11,11 +11,11 @@ from textwrap import dedent from jupyter_client import protocol_version as client_protocol_version # type:ignore[attr-defined] +from packaging.version import Version from tornado import gen, web from tornado.ioloop import IOLoop from tornado.websocket import WebSocketClosedError from traitlets import Any, Bool, Dict, Float, Instance, Int, List, Unicode, default -from packaging.version import Version try: from jupyter_client.jsonutil import json_default @@ -198,7 +198,6 @@ def cleanup(_=None): return _ensure_future(iopub_future) - def nudge(self): """Nudge the zmq connections with kernel_info_requests Returns a Future that will resolve when we have received