Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ def init_kernel(self):
"""Create the Kernel object itself"""
if self.shell_channel_thread:
shell_stream = ZMQStream(self.shell_socket, self.shell_channel_thread.io_loop)
# Hand the stream to the shell-channel thread so SubshellManager can re-arm
# the read after each out-of-band reply send (the wedge fix).
self.shell_channel_thread.shell_stream = shell_stream
else:
shell_stream = ZMQStream(self.shell_socket)
control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)
Expand Down
5 changes: 5 additions & 0 deletions ipykernel/shellchannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def __init__(
self._manager: SubshellManager | None = None
self._zmq_context = context # Avoid use of self._context
self._shell_socket = shell_socket
# Set by kernelapp.init_kernel after it builds the shell ZMQStream (this thread
# is created before the stream). Threaded into SubshellManager so it can re-arm
# the read after each out-of-band reply send (the wedge fix).
self.shell_stream = None
# Record the parent thread - the thread that started the app (usually the main thread)
self.parent_thread = current_thread()

Expand All @@ -43,6 +47,7 @@ def manager(self) -> SubshellManager:
self._zmq_context,
self.io_loop,
self._shell_socket,
self.shell_stream,
)
return self._manager

Expand Down
13 changes: 13 additions & 0 deletions ipykernel/subshell_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ def __init__(
context: zmq.Context[t.Any],
shell_channel_io_loop: IOLoop,
shell_socket: zmq.Socket[t.Any],
shell_stream=None,
):
"""Initialize the subshell manager."""
self._parent_thread = current_thread()

self._context: zmq.Context[t.Any] = context
self._shell_channel_io_loop = shell_channel_io_loop
self._shell_socket = shell_socket
# ZMQStream that reads `shell_socket`. Replies are sent on this same socket
# out-of-band below, draining its edge-triggered ZMQ_FD read edge; re-arm the
# stream after each send so a concurrently-arrived request cannot strand unread.
self._shell_stream = shell_stream
self._cache: dict[str, SubshellThread] = {}
self._lock_cache = Lock() # Sync lock across threads when accessing cache.

Expand Down Expand Up @@ -226,6 +231,14 @@ def _process_control_request(
def _send_on_shell_channel(self, msg) -> None:
assert current_thread().name == SHELL_CHANNEL_THREAD_NAME
self._shell_socket.send_multipart(msg)
# Re-arm the shell ZMQStream read: this out-of-band send drained the ROUTER's
# edge-triggered ZMQ_FD read edge; reschedule the stream's handler so a
# concurrently-arrived request cannot strand unread (the wedge fix).
stream = self._shell_stream
if stream is not None and stream.socket is self._shell_socket:
self._shell_channel_io_loop.add_callback(
lambda: stream._handle_events(stream.socket, 0)
)

def _stop_subshell(self, subshell_thread: SubshellThread) -> None:
"""Stop a subshell thread and close all of its resources."""
Expand Down
Loading