Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ipykernel.jsonutil import json_clean
from ipykernel.zmqshell import ZMQInteractiveShell

from ..iostream import BackgroundSocket, IOPubThread, OutStream
from ..iostream import IOPubThread, OutStream
from .constants import INPROCESS_KEY
from .socket import DummySocket

Expand Down Expand Up @@ -60,11 +60,11 @@ def _default_iopub_thread(self):
thread.start()
return thread

iopub_socket: BackgroundSocket = Instance(BackgroundSocket) # type:ignore[assignment]
iopub_socket: IOPubThread = Instance(IOPubThread) # type:ignore[assignment]

@default("iopub_socket")
def _default_iopub_socket(self):
return self.iopub_thread.background_socket
return self.iopub_thread

stdin_socket = Instance(DummySocket, ())

Expand Down Expand Up @@ -133,9 +133,9 @@ def _redirected_io(self):

def _io_dispatch(self, change):
"""Called when a message is sent to the IO socket."""
assert self.iopub_socket.io_thread is not None
assert self.iopub_socket is not None
assert self.session is not None
_ident, msg = self.session.recv(self.iopub_socket.io_thread.socket, copy=False)
_ident, msg = self.session.recv(self.iopub_socket.socket, copy=False)
for frontend in self.frontends:
assert frontend is not None
frontend.iopub_channel.call_handlers(msg)
Expand Down
59 changes: 4 additions & 55 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ class IOPubThread:
"""An object for sending IOPub messages in a background thread

Prevents a blocking main thread from delaying output from threads.

IOPubThread(pub_socket).background_socket is a Socket-API-providing object
whose IO is always run in a thread.
"""

def __init__(self, socket, pipe=False, session=False):
Expand All @@ -61,7 +58,6 @@ def __init__(self, socket, pipe=False, session=False):
self.socket = socket
self.session = session
self._stopped = False
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
self.io_loop = IOLoop(make_current=False)
Expand Down Expand Up @@ -340,6 +336,10 @@ def schedule(self, f):
else:
f()

def send(self, msg, *args, **kwargs):
"""Send a message to the socket."""
return self.send_multipart([msg], *args, **kwargs)

def send_multipart(self, *args, **kwargs):
"""send_multipart schedules actual zmq send in my thread.

Expand Down Expand Up @@ -367,57 +367,6 @@ def _really_send(self, msg, *args, **kwargs):
ctx.term()


class BackgroundSocket:
"""Wrapper around IOPub thread that provides zmq send[_multipart]"""

io_thread = None

def __init__(self, io_thread):
"""Initialize the socket."""
self.io_thread = io_thread

def __getattr__(self, attr):
"""Wrap socket attr access for backward-compatibility"""
if attr.startswith("__") and attr.endswith("__"):
# don't wrap magic methods
super().__getattr__(attr) # type:ignore[misc]
assert self.io_thread is not None
if hasattr(self.io_thread.socket, attr):
warnings.warn(
f"Accessing zmq Socket attribute {attr} on BackgroundSocket"
f" is deprecated since ipykernel 4.3.0"
f" use .io_thread.socket.{attr}",
DeprecationWarning,
stacklevel=2,
)
return getattr(self.io_thread.socket, attr)
return super().__getattr__(attr) # type:ignore[misc]

def __setattr__(self, attr, value):
"""Set an attribute on the socket."""
if attr == "io_thread" or (attr.startswith("__") and attr.endswith("__")):
super().__setattr__(attr, value)
else:
warnings.warn(
f"Setting zmq Socket attribute {attr} on BackgroundSocket"
f" is deprecated since ipykernel 4.3.0"
f" use .io_thread.socket.{attr}",
DeprecationWarning,
stacklevel=2,
)
assert self.io_thread is not None
setattr(self.io_thread.socket, attr, value)

def send(self, msg, *args, **kwargs):
"""Send a message to the socket."""
return self.send_multipart([msg], *args, **kwargs)

def send_multipart(self, *args, **kwargs):
"""Schedule send in IO thread"""
assert self.io_thread is not None
return self.io_thread.send_multipart(*args, **kwargs)


class OutStream(TextIOBase):
"""A file like object that publishes the stream to a 0MQ PUB socket.

Expand Down
3 changes: 1 addition & 2 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,7 @@ def init_iopub(self, context):
self.configure_tornado_logger()
self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True, session=self.session)
self.iopub_thread.start()
# backward-compat: wrap iopub socket API in background thread
self.iopub_socket = self.iopub_thread.background_socket
self.iopub_socket = self.iopub_thread

def init_heartbeat(self):
"""start the heart beating"""
Expand Down
13 changes: 3 additions & 10 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import zmq
from jupyter_client.session import Session

from ipykernel.iostream import MASTER, BackgroundSocket, IOPubThread, OutStream
from ipykernel.iostream import MASTER, IOPubThread, OutStream


@pytest.fixture()
Expand Down Expand Up @@ -81,15 +81,8 @@ def test_io_thread(iopub_thread):
thread._really_send(None)


def test_background_socket(iopub_thread):
sock = BackgroundSocket(iopub_thread)
assert sock.__class__ == BackgroundSocket
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
sock.linger = 101
assert iopub_thread.socket.linger == 101
assert sock.io_thread == iopub_thread
sock.send(b"hi")
def test_iopub_thread_send(iopub_thread):
iopub_thread.send(b"hi")


def test_outstream(iopub_thread):
Expand Down
Loading