##// END OF EJS Templates
Unregister comm if there is an error in publishing the comm open message
Jason Grout -
Show More
@@ -1,155 +1,158 b''
1 """Base class for a Comm"""
1 """Base class for a Comm"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import uuid
6 import uuid
7
7
8 from IPython.config import LoggingConfigurable
8 from IPython.config import LoggingConfigurable
9 from IPython.kernel.zmq.kernelbase import Kernel
9 from IPython.kernel.zmq.kernelbase import Kernel
10
10
11 from IPython.utils.jsonutil import json_clean
11 from IPython.utils.jsonutil import json_clean
12 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
12 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
13
13
14
14
15 class Comm(LoggingConfigurable):
15 class Comm(LoggingConfigurable):
16
16
17 # If this is instantiated by a non-IPython kernel, shell will be None
17 # If this is instantiated by a non-IPython kernel, shell will be None
18 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
18 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
19 allow_none=True)
19 allow_none=True)
20 kernel = Instance('IPython.kernel.zmq.kernelbase.Kernel')
20 kernel = Instance('IPython.kernel.zmq.kernelbase.Kernel')
21 def _kernel_default(self):
21 def _kernel_default(self):
22 if Kernel.initialized():
22 if Kernel.initialized():
23 return Kernel.instance()
23 return Kernel.instance()
24
24
25 iopub_socket = Any()
25 iopub_socket = Any()
26 def _iopub_socket_default(self):
26 def _iopub_socket_default(self):
27 return self.kernel.iopub_socket
27 return self.kernel.iopub_socket
28 session = Instance('IPython.kernel.zmq.session.Session')
28 session = Instance('IPython.kernel.zmq.session.Session')
29 def _session_default(self):
29 def _session_default(self):
30 if self.kernel is not None:
30 if self.kernel is not None:
31 return self.kernel.session
31 return self.kernel.session
32
32
33 target_name = Unicode('comm')
33 target_name = Unicode('comm')
34
34
35 topic = Bytes()
35 topic = Bytes()
36 def _topic_default(self):
36 def _topic_default(self):
37 return ('comm-%s' % self.comm_id).encode('ascii')
37 return ('comm-%s' % self.comm_id).encode('ascii')
38
38
39 _open_data = Dict(help="data dict, if any, to be included in comm_open")
39 _open_data = Dict(help="data dict, if any, to be included in comm_open")
40 _close_data = Dict(help="data dict, if any, to be included in comm_close")
40 _close_data = Dict(help="data dict, if any, to be included in comm_close")
41
41
42 _msg_callback = Any()
42 _msg_callback = Any()
43 _close_callback = Any()
43 _close_callback = Any()
44
44
45 _closed = Bool(True)
45 _closed = Bool(True)
46 comm_id = Unicode()
46 comm_id = Unicode()
47 def _comm_id_default(self):
47 def _comm_id_default(self):
48 return uuid.uuid4().hex
48 return uuid.uuid4().hex
49
49
50 primary = Bool(True, help="Am I the primary or secondary Comm?")
50 primary = Bool(True, help="Am I the primary or secondary Comm?")
51
51
52 def __init__(self, target_name='', data=None, **kwargs):
52 def __init__(self, target_name='', data=None, **kwargs):
53 if target_name:
53 if target_name:
54 kwargs['target_name'] = target_name
54 kwargs['target_name'] = target_name
55 super(Comm, self).__init__(**kwargs)
55 super(Comm, self).__init__(**kwargs)
56 if self.primary:
56 if self.primary:
57 # I am primary, open my peer.
57 # I am primary, open my peer.
58 self.open(data)
58 self.open(data)
59 else:
59 else:
60 self._closed = False
60 self._closed = False
61
61
62 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
62 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
63 """Helper for sending a comm message on IOPub"""
63 """Helper for sending a comm message on IOPub"""
64 data = {} if data is None else data
64 data = {} if data is None else data
65 metadata = {} if metadata is None else metadata
65 metadata = {} if metadata is None else metadata
66 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
66 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
67 self.session.send(self.iopub_socket, msg_type,
67 self.session.send(self.iopub_socket, msg_type,
68 content,
68 content,
69 metadata=json_clean(metadata),
69 metadata=json_clean(metadata),
70 parent=self.kernel._parent_header,
70 parent=self.kernel._parent_header,
71 ident=self.topic,
71 ident=self.topic,
72 buffers=buffers,
72 buffers=buffers,
73 )
73 )
74
74
75 def __del__(self):
75 def __del__(self):
76 """trigger close on gc"""
76 """trigger close on gc"""
77 self.close()
77 self.close()
78
78
79 # publishing messages
79 # publishing messages
80
80
81 def open(self, data=None, metadata=None, buffers=None):
81 def open(self, data=None, metadata=None, buffers=None):
82 """Open the frontend-side version of this comm"""
82 """Open the frontend-side version of this comm"""
83 if data is None:
83 if data is None:
84 data = self._open_data
84 data = self._open_data
85 comm_manager = getattr(self.kernel, 'comm_manager', None)
85 comm_manager = getattr(self.kernel, 'comm_manager', None)
86 if comm_manager is None:
86 if comm_manager is None:
87 raise RuntimeError("Comms cannot be opened without a kernel "
87 raise RuntimeError("Comms cannot be opened without a kernel "
88 "and a comm_manager attached to that kernel.")
88 "and a comm_manager attached to that kernel.")
89
89
90 comm_manager.register_comm(self)
90 comm_manager.register_comm(self)
91 self._publish_msg('comm_open',
91 try:
92 data=data, metadata=metadata, buffers=buffers,
92 self._publish_msg('comm_open',
93 target_name=self.target_name,
93 data=data, metadata=metadata, buffers=buffers,
94 )
94 target_name=self.target_name)
95 self._closed = False
95 self._closed = False
96 except:
97 comm_manager.unregister_comm(self)
98 raise
96
99
97 def close(self, data=None, metadata=None, buffers=None):
100 def close(self, data=None, metadata=None, buffers=None):
98 """Close the frontend-side version of this comm"""
101 """Close the frontend-side version of this comm"""
99 if self._closed:
102 if self._closed:
100 # only close once
103 # only close once
101 return
104 return
102 self._closed = True
105 self._closed = True
103 if data is None:
106 if data is None:
104 data = self._close_data
107 data = self._close_data
105 self._publish_msg('comm_close',
108 self._publish_msg('comm_close',
106 data=data, metadata=metadata, buffers=buffers,
109 data=data, metadata=metadata, buffers=buffers,
107 )
110 )
108 self.kernel.comm_manager.unregister_comm(self)
111 self.kernel.comm_manager.unregister_comm(self)
109
112
110 def send(self, data=None, metadata=None, buffers=None):
113 def send(self, data=None, metadata=None, buffers=None):
111 """Send a message to the frontend-side version of this comm"""
114 """Send a message to the frontend-side version of this comm"""
112 self._publish_msg('comm_msg',
115 self._publish_msg('comm_msg',
113 data=data, metadata=metadata, buffers=buffers,
116 data=data, metadata=metadata, buffers=buffers,
114 )
117 )
115
118
116 # registering callbacks
119 # registering callbacks
117
120
118 def on_close(self, callback):
121 def on_close(self, callback):
119 """Register a callback for comm_close
122 """Register a callback for comm_close
120
123
121 Will be called with the `data` of the close message.
124 Will be called with the `data` of the close message.
122
125
123 Call `on_close(None)` to disable an existing callback.
126 Call `on_close(None)` to disable an existing callback.
124 """
127 """
125 self._close_callback = callback
128 self._close_callback = callback
126
129
127 def on_msg(self, callback):
130 def on_msg(self, callback):
128 """Register a callback for comm_msg
131 """Register a callback for comm_msg
129
132
130 Will be called with the `data` of any comm_msg messages.
133 Will be called with the `data` of any comm_msg messages.
131
134
132 Call `on_msg(None)` to disable an existing callback.
135 Call `on_msg(None)` to disable an existing callback.
133 """
136 """
134 self._msg_callback = callback
137 self._msg_callback = callback
135
138
136 # handling of incoming messages
139 # handling of incoming messages
137
140
138 def handle_close(self, msg):
141 def handle_close(self, msg):
139 """Handle a comm_close message"""
142 """Handle a comm_close message"""
140 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
143 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
141 if self._close_callback:
144 if self._close_callback:
142 self._close_callback(msg)
145 self._close_callback(msg)
143
146
144 def handle_msg(self, msg):
147 def handle_msg(self, msg):
145 """Handle a comm_msg message"""
148 """Handle a comm_msg message"""
146 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
149 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
147 if self._msg_callback:
150 if self._msg_callback:
148 if self.shell:
151 if self.shell:
149 self.shell.events.trigger('pre_execute')
152 self.shell.events.trigger('pre_execute')
150 self._msg_callback(msg)
153 self._msg_callback(msg)
151 if self.shell:
154 if self.shell:
152 self.shell.events.trigger('post_execute')
155 self.shell.events.trigger('post_execute')
153
156
154
157
155 __all__ = ['Comm']
158 __all__ = ['Comm']
General Comments 0
You need to be logged in to leave comments. Login now