manager.py
185 lines
| 5.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r13195 | """Base class to manage comms""" | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2013 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r13202 | import sys | ||
MinRK
|
r13195 | from IPython.config import LoggingConfigurable | ||
from IPython.core.prompts import LazyEvaluate | ||||
from IPython.core.getipython import get_ipython | ||||
from IPython.utils.importstring import import_item | ||||
from IPython.utils.traitlets import Instance, Unicode, Dict, Any | ||||
from .comm import Comm | ||||
#----------------------------------------------------------------------------- | ||||
# Code | ||||
#----------------------------------------------------------------------------- | ||||
def lazy_keys(dikt):
    """Return lazy-evaluated string representation of a dictionary's keys

    Key list is only constructed if it will actually be used.
    Used for debug-logging.

    Parameters
    ----------
    dikt : dict
        The dictionary whose keys are listed on demand.

    Returns
    -------
    LazyEvaluate
        Object wrapping a zero-argument callable that builds
        ``list(dikt.keys())`` when it is evaluated.
    """
    # The deferred callable closes over ``dikt``. The previous one-argument
    # lambda was never given the dictionary, so evaluating it had nothing to
    # bind its parameter to and failed instead of producing the key list.
    return LazyEvaluate(lambda: list(dikt.keys()))
MinRK
|
def with_output(method):
    """Method decorator for ensuring output is handled properly in a message handler

    The wrapped handler:

    - sets the parent header on the shell before entering the method
    - publishes busy/idle status around the call
    - flushes stdout/stderr after the method returns or raises

    Parameters
    ----------
    method : callable
        Handler called as ``method(self, stream, ident, msg)``. ``self`` must
        provide ``shell.set_parent`` and ``shell.kernel._publish_status``,
        and ``msg`` must have a ``'header'`` entry.

    Returns
    -------
    callable
        Wrapper with the same call signature, returning whatever ``method``
        returns and propagating whatever it raises.
    """
    # Local import: keeps this decorator self-contained.
    from functools import wraps

    @wraps(method)  # keep the handler's __name__/__doc__ for logs and introspection
    def method_with_output(self, stream, ident, msg):
        parent = msg['header']
        self.shell.set_parent(parent)
        self.shell.kernel._publish_status('busy', parent)
        try:
            return method(self, stream, ident, msg)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                # Every 'busy' published above must be matched by an 'idle',
                # even when flushing a stream raises.
                self.shell.kernel._publish_status('idle', parent)

    return method_with_output
MinRK
|
class CommManager(LoggingConfigurable):
    """Manager for Comms in the Kernel

    Holds two registries:

    - ``targets``: target name -> callable invoked when a comm_open arrives
    - ``comms``: comm_id -> Comm instance for every registered comm

    and provides the comm_open / comm_msg / comm_close message handlers
    that dispatch to them.
    """

    shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
    def _shell_default(self):
        return get_ipython()

    iopub_socket = Any()
    def _iopub_socket_default(self):
        # Same guard as _session_default: without a shell there is no kernel
        # to take the socket from.
        if self.shell is None:
            return
        return self.shell.kernel.iopub_socket

    session = Instance('IPython.kernel.zmq.session.Session')
    def _session_default(self):
        if self.shell is None:
            return
        return self.shell.kernel.session

    comms = Dict()
    targets = Dict()

    # Public APIs

    def register_target(self, target_name, f):
        """Register a callable f for a given target name

        f will be called with two arguments when a comm_open message is
        received with `target`:

        - the Comm instance
        - the `comm_open` message itself.

        f can be a Python callable or an import string for one.
        """
        try:
            string_types = basestring  # Python 2
        except NameError:
            # Python 3 has no ``basestring``; ``str`` is the only text type.
            string_types = str
        if isinstance(f, string_types):
            f = import_item(f)
        self.targets[target_name] = f

    def unregister_target(self, target_name, f=None):
        """Unregister a callable registered with register_target

        ``f`` is accepted for symmetry with register_target but is not used;
        the target is looked up by name only.

        Returns the callable that was registered.
        Raises KeyError if no target with that name is registered.
        """
        return self.targets.pop(target_name)

    def register_comm(self, comm):
        """Register a new comm

        The comm is attached to this manager's shell and iopub socket and
        stored under its ``comm_id``, which is returned.
        """
        comm_id = comm.comm_id
        comm.shell = self.shell
        comm.iopub_socket = self.iopub_socket
        self.comms[comm_id] = comm
        return comm_id

    def unregister_comm(self, comm_id):
        """Unregister a comm, and close its counterpart

        Raises KeyError if no comm with this id is registered.
        """
        # unlike get_comm, this should raise a KeyError
        comm = self.comms.pop(comm_id)
        comm.close()

    def get_comm(self, comm_id):
        """Get a comm with a particular id

        Returns the comm if found, otherwise None.

        This will not raise an error,
        it will log messages if the comm cannot be found.
        """
        if comm_id not in self.comms:
            self.log.error("No such comm: %s", comm_id)
            self.log.debug("Current comms: %s", lazy_keys(self.comms))
            return
        # The registry stores the Comm objects themselves, so a plain lookup
        # is all that is needed.
        comm = self.comms[comm_id]
        return comm

    # Message handlers

    @with_output
    def comm_open(self, stream, ident, msg):
        """Handler for comm_open messages

        Creates the kernel-side Comm for the requested ``comm_id`` and hands
        it to the callable registered for ``target_name``. If no such target
        is registered, or the callable raises, the comm is closed again.
        """
        content = msg['content']
        comm_id = content['comm_id']
        target_name = content['target_name']
        f = self.targets.get(target_name, None)
        comm = Comm(comm_id=comm_id,
                    shell=self.shell,
                    iopub_socket=self.iopub_socket,
                    primary=False,
        )
        if f is None:
            self.log.error("No such comm target registered: %s", target_name)
            comm.close()
            return
        self.register_comm(comm)
        try:
            f(comm, msg)
        except Exception:
            self.log.error("Exception opening comm with target: %s", target_name, exc_info=True)
            # unregister_comm removes the comm from the registry AND closes
            # it, so a separate comm.close() here would close it twice.
            self.unregister_comm(comm_id)

    @with_output
    def comm_msg(self, stream, ident, msg):
        """Handler for comm_msg messages

        Forwards the message to the registered comm's ``handle_msg``;
        exceptions raised there are logged, not propagated.
        """
        content = msg['content']
        comm_id = content['comm_id']
        comm = self.get_comm(comm_id)
        if comm is None:
            # no such comm
            return
        try:
            comm.handle_msg(msg)
        except Exception:
            self.log.error("Exception in comm_msg for %s", comm_id, exc_info=True)

    @with_output
    def comm_close(self, stream, ident, msg):
        """Handler for comm_close messages

        Removes the comm from the registry, then calls its ``handle_close``;
        exceptions raised there are logged, not propagated.
        """
        content = msg['content']
        comm_id = content['comm_id']
        comm = self.get_comm(comm_id)
        if comm is None:
            # no such comm
            self.log.debug("No such comm to close: %s", comm_id)
            return
        # Drop the registry entry first so the comm is gone even if its
        # close handler fails.
        del self.comms[comm_id]

        try:
            comm.handle_close(msg)
        except Exception:
            self.log.error("Exception handling comm_close for %s", comm_id, exc_info=True)
MinRK
|
r13195 | |||
__all__ = ['CommManager'] | ||||