##// END OF EJS Templates
deleting a comm now unregisters the comm. unrehistering the comm does not close it anymore
sylvain.corlay -
Show More
@@ -1,133 +1,134 b''
1 """Base class for a Comm"""
1 """Base class for a Comm"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import uuid
6 import uuid
7
7
8 from IPython.config import LoggingConfigurable
8 from IPython.config import LoggingConfigurable
9 from IPython.core.getipython import get_ipython
9 from IPython.core.getipython import get_ipython
10
10
11 from IPython.utils.jsonutil import json_clean
11 from IPython.utils.jsonutil import json_clean
12 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
12 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
13
13
14
14
15 class Comm(LoggingConfigurable):
15 class Comm(LoggingConfigurable):
16
16
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
18 def _shell_default(self):
18 def _shell_default(self):
19 return get_ipython()
19 return get_ipython()
20
20
21 iopub_socket = Any()
21 iopub_socket = Any()
22 def _iopub_socket_default(self):
22 def _iopub_socket_default(self):
23 return self.shell.kernel.iopub_socket
23 return self.shell.kernel.iopub_socket
24 session = Instance('IPython.kernel.zmq.session.Session')
24 session = Instance('IPython.kernel.zmq.session.Session')
25 def _session_default(self):
25 def _session_default(self):
26 if self.shell is None:
26 if self.shell is None:
27 return
27 return
28 return self.shell.kernel.session
28 return self.shell.kernel.session
29
29
30 target_name = Unicode('comm')
30 target_name = Unicode('comm')
31
31
32 topic = Bytes()
32 topic = Bytes()
33 def _topic_default(self):
33 def _topic_default(self):
34 return ('comm-%s' % self.comm_id).encode('ascii')
34 return ('comm-%s' % self.comm_id).encode('ascii')
35
35
36 _open_data = Dict(help="data dict, if any, to be included in comm_open")
36 _open_data = Dict(help="data dict, if any, to be included in comm_open")
37 _close_data = Dict(help="data dict, if any, to be included in comm_close")
37 _close_data = Dict(help="data dict, if any, to be included in comm_close")
38
38
39 _msg_callback = Any()
39 _msg_callback = Any()
40 _close_callback = Any()
40 _close_callback = Any()
41
41
42 _closed = Bool(False)
42 _closed = Bool(False)
43 comm_id = Unicode()
43 comm_id = Unicode()
44 def _comm_id_default(self):
44 def _comm_id_default(self):
45 return uuid.uuid4().hex
45 return uuid.uuid4().hex
46
46
47 primary = Bool(True, help="Am I the primary or secondary Comm?")
47 primary = Bool(True, help="Am I the primary or secondary Comm?")
48
48
49 def __init__(self, target_name='', data=None, **kwargs):
49 def __init__(self, target_name='', data=None, **kwargs):
50 if target_name:
50 if target_name:
51 kwargs['target_name'] = target_name
51 kwargs['target_name'] = target_name
52 super(Comm, self).__init__(**kwargs)
52 super(Comm, self).__init__(**kwargs)
53 get_ipython().comm_manager.register_comm(self)
53 get_ipython().comm_manager.register_comm(self)
54 if self.primary:
54 if self.primary:
55 # I am primary, open my peer.
55 # I am primary, open my peer.
56 self.open(data)
56 self.open(data)
57
57
58 def _publish_msg(self, msg_type, data=None, metadata=None, **keys):
58 def _publish_msg(self, msg_type, data=None, metadata=None, **keys):
59 """Helper for sending a comm message on IOPub"""
59 """Helper for sending a comm message on IOPub"""
60 data = {} if data is None else data
60 data = {} if data is None else data
61 metadata = {} if metadata is None else metadata
61 metadata = {} if metadata is None else metadata
62 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
62 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
63 self.session.send(self.iopub_socket, msg_type,
63 self.session.send(self.iopub_socket, msg_type,
64 content,
64 content,
65 metadata=json_clean(metadata),
65 metadata=json_clean(metadata),
66 parent=self.shell.get_parent(),
66 parent=self.shell.get_parent(),
67 ident=self.topic,
67 ident=self.topic,
68 )
68 )
69
69
70 def __del__(self):
70 def __del__(self):
71 """trigger close on gc"""
71 """trigger close on gc"""
72 self.close()
72 self.close()
73 get_ipython().comm_manager.unregister_comm(self)
73
74
74 # publishing messages
75 # publishing messages
75
76
76 def open(self, data=None, metadata=None):
77 def open(self, data=None, metadata=None):
77 """Open the frontend-side version of this comm"""
78 """Open the frontend-side version of this comm"""
78 if data is None:
79 if data is None:
79 data = self._open_data
80 data = self._open_data
80 self._publish_msg('comm_open', data, metadata, target_name=self.target_name)
81 self._publish_msg('comm_open', data, metadata, target_name=self.target_name)
81
82
82 def close(self, data=None, metadata=None):
83 def close(self, data=None, metadata=None):
83 """Close the frontend-side version of this comm"""
84 """Close the frontend-side version of this comm"""
84 if self._closed:
85 if self._closed:
85 # only close once
86 # only close once
86 return
87 return
87 if data is None:
88 if data is None:
88 data = self._close_data
89 data = self._close_data
89 self._publish_msg('comm_close', data, metadata)
90 self._publish_msg('comm_close', data, metadata)
90 self._closed = True
91 self._closed = True
91
92
92 def send(self, data=None, metadata=None):
93 def send(self, data=None, metadata=None):
93 """Send a message to the frontend-side version of this comm"""
94 """Send a message to the frontend-side version of this comm"""
94 self._publish_msg('comm_msg', data, metadata)
95 self._publish_msg('comm_msg', data, metadata)
95
96
96 # registering callbacks
97 # registering callbacks
97
98
98 def on_close(self, callback):
99 def on_close(self, callback):
99 """Register a callback for comm_close
100 """Register a callback for comm_close
100
101
101 Will be called with the `data` of the close message.
102 Will be called with the `data` of the close message.
102
103
103 Call `on_close(None)` to disable an existing callback.
104 Call `on_close(None)` to disable an existing callback.
104 """
105 """
105 self._close_callback = callback
106 self._close_callback = callback
106
107
107 def on_msg(self, callback):
108 def on_msg(self, callback):
108 """Register a callback for comm_msg
109 """Register a callback for comm_msg
109
110
110 Will be called with the `data` of any comm_msg messages.
111 Will be called with the `data` of any comm_msg messages.
111
112
112 Call `on_msg(None)` to disable an existing callback.
113 Call `on_msg(None)` to disable an existing callback.
113 """
114 """
114 self._msg_callback = callback
115 self._msg_callback = callback
115
116
116 # handling of incoming messages
117 # handling of incoming messages
117
118
118 def handle_close(self, msg):
119 def handle_close(self, msg):
119 """Handle a comm_close message"""
120 """Handle a comm_close message"""
120 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
121 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
121 if self._close_callback:
122 if self._close_callback:
122 self._close_callback(msg)
123 self._close_callback(msg)
123
124
124 def handle_msg(self, msg):
125 def handle_msg(self, msg):
125 """Handle a comm_msg message"""
126 """Handle a comm_msg message"""
126 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
127 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
127 if self._msg_callback:
128 if self._msg_callback:
128 self.shell.events.trigger('pre_execute')
129 self.shell.events.trigger('pre_execute')
129 self._msg_callback(msg)
130 self._msg_callback(msg)
130 self.shell.events.trigger('post_execute')
131 self.shell.events.trigger('post_execute')
131
132
132
133
133 __all__ = ['Comm']
134 __all__ = ['Comm']
@@ -1,151 +1,150 b''
1 """Base class to manage comms"""
1 """Base class to manage comms"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import sys
6 import sys
7
7
8 from IPython.config import LoggingConfigurable
8 from IPython.config import LoggingConfigurable
9 from IPython.core.prompts import LazyEvaluate
9 from IPython.core.prompts import LazyEvaluate
10 from IPython.core.getipython import get_ipython
10 from IPython.core.getipython import get_ipython
11
11
12 from IPython.utils.importstring import import_item
12 from IPython.utils.importstring import import_item
13 from IPython.utils.py3compat import string_types
13 from IPython.utils.py3compat import string_types
14 from IPython.utils.traitlets import Instance, Unicode, Dict, Any
14 from IPython.utils.traitlets import Instance, Unicode, Dict, Any
15
15
16 from .comm import Comm
16 from .comm import Comm
17
17
18
18
19 def lazy_keys(dikt):
19 def lazy_keys(dikt):
20 """Return lazy-evaluated string representation of a dictionary's keys
20 """Return lazy-evaluated string representation of a dictionary's keys
21
21
22 Key list is only constructed if it will actually be used.
22 Key list is only constructed if it will actually be used.
23 Used for debug-logging.
23 Used for debug-logging.
24 """
24 """
25 return LazyEvaluate(lambda d: list(d.keys()))
25 return LazyEvaluate(lambda d: list(d.keys()))
26
26
27
27
28 class CommManager(LoggingConfigurable):
28 class CommManager(LoggingConfigurable):
29 """Manager for Comms in the Kernel"""
29 """Manager for Comms in the Kernel"""
30
30
31 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
31 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
32 def _shell_default(self):
32 def _shell_default(self):
33 return get_ipython()
33 return get_ipython()
34 iopub_socket = Any()
34 iopub_socket = Any()
35 def _iopub_socket_default(self):
35 def _iopub_socket_default(self):
36 return self.shell.kernel.iopub_socket
36 return self.shell.kernel.iopub_socket
37 session = Instance('IPython.kernel.zmq.session.Session')
37 session = Instance('IPython.kernel.zmq.session.Session')
38 def _session_default(self):
38 def _session_default(self):
39 if self.shell is None:
39 if self.shell is None:
40 return
40 return
41 return self.shell.kernel.session
41 return self.shell.kernel.session
42
42
43 comms = Dict()
43 comms = Dict()
44 targets = Dict()
44 targets = Dict()
45
45
46 # Public APIs
46 # Public APIs
47
47
48 def register_target(self, target_name, f):
48 def register_target(self, target_name, f):
49 """Register a callable f for a given target name
49 """Register a callable f for a given target name
50
50
51 f will be called with two arguments when a comm_open message is received with `target`:
51 f will be called with two arguments when a comm_open message is received with `target`:
52
52
53 - the Comm instance
53 - the Comm instance
54 - the `comm_open` message itself.
54 - the `comm_open` message itself.
55
55
56 f can be a Python callable or an import string for one.
56 f can be a Python callable or an import string for one.
57 """
57 """
58 if isinstance(f, string_types):
58 if isinstance(f, string_types):
59 f = import_item(f)
59 f = import_item(f)
60
60
61 self.targets[target_name] = f
61 self.targets[target_name] = f
62
62
63 def unregister_target(self, target_name, f):
63 def unregister_target(self, target_name, f):
64 """Unregister a callable registered with register_target"""
64 """Unregister a callable registered with register_target"""
65 return self.targets.pop(target_name);
65 return self.targets.pop(target_name);
66
66
67 def register_comm(self, comm):
67 def register_comm(self, comm):
68 """Register a new comm"""
68 """Register a new comm"""
69 comm_id = comm.comm_id
69 comm_id = comm.comm_id
70 comm.shell = self.shell
70 comm.shell = self.shell
71 comm.iopub_socket = self.iopub_socket
71 comm.iopub_socket = self.iopub_socket
72 self.comms[comm_id] = comm
72 self.comms[comm_id] = comm
73 return comm_id
73 return comm_id
74
74
75 def unregister_comm(self, comm_id):
75 def unregister_comm(self, comm_id):
76 """Unregister a comm, and close its counterpart"""
76 """Unregister a comm, and close its counterpart"""
77 # unlike get_comm, this should raise a KeyError
77 # unlike get_comm, this should raise a KeyError
78 comm = self.comms.pop(comm_id)
78 comm = self.comms.pop(comm_id)
79 comm.close()
80
79
81 def get_comm(self, comm_id):
80 def get_comm(self, comm_id):
82 """Get a comm with a particular id
81 """Get a comm with a particular id
83
82
84 Returns the comm if found, otherwise None.
83 Returns the comm if found, otherwise None.
85
84
86 This will not raise an error,
85 This will not raise an error,
87 it will log messages if the comm cannot be found.
86 it will log messages if the comm cannot be found.
88 """
87 """
89 if comm_id not in self.comms:
88 if comm_id not in self.comms:
90 self.log.error("No such comm: %s", comm_id)
89 self.log.error("No such comm: %s", comm_id)
91 self.log.debug("Current comms: %s", lazy_keys(self.comms))
90 self.log.debug("Current comms: %s", lazy_keys(self.comms))
92 return
91 return
93 # call, because we store weakrefs
92 # call, because we store weakrefs
94 comm = self.comms[comm_id]
93 comm = self.comms[comm_id]
95 return comm
94 return comm
96
95
97 # Message handlers
96 # Message handlers
98 def comm_open(self, stream, ident, msg):
97 def comm_open(self, stream, ident, msg):
99 """Handler for comm_open messages"""
98 """Handler for comm_open messages"""
100 content = msg['content']
99 content = msg['content']
101 comm_id = content['comm_id']
100 comm_id = content['comm_id']
102 target_name = content['target_name']
101 target_name = content['target_name']
103 f = self.targets.get(target_name, None)
102 f = self.targets.get(target_name, None)
104 comm = Comm(comm_id=comm_id,
103 comm = Comm(comm_id=comm_id,
105 shell=self.shell,
104 shell=self.shell,
106 iopub_socket=self.iopub_socket,
105 iopub_socket=self.iopub_socket,
107 primary=False,
106 primary=False,
108 )
107 )
109 if f is None:
108 if f is None:
110 self.log.error("No such comm target registered: %s", target_name)
109 self.log.error("No such comm target registered: %s", target_name)
111 comm.close()
110 comm.close()
112 return
111 return
113 self.register_comm(comm)
112 self.register_comm(comm)
114 try:
113 try:
115 f(comm, msg)
114 f(comm, msg)
116 except Exception:
115 except Exception:
117 self.log.error("Exception opening comm with target: %s", target_name, exc_info=True)
116 self.log.error("Exception opening comm with target: %s", target_name, exc_info=True)
118 comm.close()
117 comm.close()
119 self.unregister_comm(comm_id)
118 self.unregister_comm(comm_id)
120
119
121 def comm_msg(self, stream, ident, msg):
120 def comm_msg(self, stream, ident, msg):
122 """Handler for comm_msg messages"""
121 """Handler for comm_msg messages"""
123 content = msg['content']
122 content = msg['content']
124 comm_id = content['comm_id']
123 comm_id = content['comm_id']
125 comm = self.get_comm(comm_id)
124 comm = self.get_comm(comm_id)
126 if comm is None:
125 if comm is None:
127 # no such comm
126 # no such comm
128 return
127 return
129 try:
128 try:
130 comm.handle_msg(msg)
129 comm.handle_msg(msg)
131 except Exception:
130 except Exception:
132 self.log.error("Exception in comm_msg for %s", comm_id, exc_info=True)
131 self.log.error("Exception in comm_msg for %s", comm_id, exc_info=True)
133
132
134 def comm_close(self, stream, ident, msg):
133 def comm_close(self, stream, ident, msg):
135 """Handler for comm_close messages"""
134 """Handler for comm_close messages"""
136 content = msg['content']
135 content = msg['content']
137 comm_id = content['comm_id']
136 comm_id = content['comm_id']
138 comm = self.get_comm(comm_id)
137 comm = self.get_comm(comm_id)
139 if comm is None:
138 if comm is None:
140 # no such comm
139 # no such comm
141 self.log.debug("No such comm to close: %s", comm_id)
140 self.log.debug("No such comm to close: %s", comm_id)
142 return
141 return
143 del self.comms[comm_id]
142 del self.comms[comm_id]
144
143
145 try:
144 try:
146 comm.handle_close(msg)
145 comm.handle_close(msg)
147 except Exception:
146 except Exception:
148 self.log.error("Exception handling comm_close for %s", comm_id, exc_info=True)
147 self.log.error("Exception handling comm_close for %s", comm_id, exc_info=True)
149
148
150
149
151 __all__ = ['CommManager']
150 __all__ = ['CommManager']
General Comments 0
You need to be logged in to leave comments. Login now