##// END OF EJS Templates
make Comm's publishing threadsafe...
Min RK -
Show More
@@ -1,162 +1,169 b''
1 """Base class for a Comm"""
1 """Base class for a Comm"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import threading
6 import uuid
7 import uuid
7
8
9 from zmq.eventloop.ioloop import IOLoop
10
8 from IPython.config import LoggingConfigurable
11 from IPython.config import LoggingConfigurable
9 from IPython.kernel.zmq.kernelbase import Kernel
12 from IPython.kernel.zmq.kernelbase import Kernel
10
13
11 from IPython.utils.jsonutil import json_clean
14 from IPython.utils.jsonutil import json_clean
12 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
15 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
13
16
14
17
15 class Comm(LoggingConfigurable):
18 class Comm(LoggingConfigurable):
16
19 """Class for communicating between a Frontend and a Kernel"""
17 # If this is instantiated by a non-IPython kernel, shell will be None
20 # If this is instantiated by a non-IPython kernel, shell will be None
18 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
21 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
19 allow_none=True)
22 allow_none=True)
20 kernel = Instance('IPython.kernel.zmq.kernelbase.Kernel')
23 kernel = Instance('IPython.kernel.zmq.kernelbase.Kernel')
21 def _kernel_default(self):
24 def _kernel_default(self):
22 if Kernel.initialized():
25 if Kernel.initialized():
23 return Kernel.instance()
26 return Kernel.instance()
24
27
25 iopub_socket = Any()
28 iopub_socket = Any()
26 def _iopub_socket_default(self):
29 def _iopub_socket_default(self):
27 return self.kernel.iopub_socket
30 return self.kernel.iopub_socket
28 session = Instance('IPython.kernel.zmq.session.Session')
31 session = Instance('IPython.kernel.zmq.session.Session')
29 def _session_default(self):
32 def _session_default(self):
30 if self.kernel is not None:
33 if self.kernel is not None:
31 return self.kernel.session
34 return self.kernel.session
32
35
33 target_name = Unicode('comm')
36 target_name = Unicode('comm')
34 target_module = Unicode(None, allow_none=True, help="""requirejs module from
37 target_module = Unicode(None, allow_none=True, help="""requirejs module from
35 which to load comm target.""")
38 which to load comm target.""")
36
39
37 topic = Bytes()
40 topic = Bytes()
38 def _topic_default(self):
41 def _topic_default(self):
39 return ('comm-%s' % self.comm_id).encode('ascii')
42 return ('comm-%s' % self.comm_id).encode('ascii')
40
43
41 _open_data = Dict(help="data dict, if any, to be included in comm_open")
44 _open_data = Dict(help="data dict, if any, to be included in comm_open")
42 _close_data = Dict(help="data dict, if any, to be included in comm_close")
45 _close_data = Dict(help="data dict, if any, to be included in comm_close")
43
46
44 _msg_callback = Any()
47 _msg_callback = Any()
45 _close_callback = Any()
48 _close_callback = Any()
46
49
47 _closed = Bool(True)
50 _closed = Bool(True)
48 comm_id = Unicode()
51 comm_id = Unicode()
49 def _comm_id_default(self):
52 def _comm_id_default(self):
50 return uuid.uuid4().hex
53 return uuid.uuid4().hex
51
54
52 primary = Bool(True, help="Am I the primary or secondary Comm?")
55 primary = Bool(True, help="Am I the primary or secondary Comm?")
53
56
54 def __init__(self, target_name='', data=None, **kwargs):
57 def __init__(self, target_name='', data=None, **kwargs):
55 if target_name:
58 if target_name:
56 kwargs['target_name'] = target_name
59 kwargs['target_name'] = target_name
57 super(Comm, self).__init__(**kwargs)
60 super(Comm, self).__init__(**kwargs)
58 if self.primary:
61 if self.primary:
59 # I am primary, open my peer.
62 # I am primary, open my peer.
60 self.open(data)
63 self.open(data)
61 else:
64 else:
62 self._closed = False
65 self._closed = False
63
66
64 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
67 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
65 """Helper for sending a comm message on IOPub"""
68 """Helper for sending a comm message on IOPub"""
69 if threading.current_thread().name != 'MainThread' and IOLoop.initialized():
70 # make sure we never send on a zmq socket outside the main IOLoop thread
71 IOLoop.instance().add_callback(lambda : self._publish_msg(msg_type, data, metadata, buffers, **keys))
72 return
66 data = {} if data is None else data
73 data = {} if data is None else data
67 metadata = {} if metadata is None else metadata
74 metadata = {} if metadata is None else metadata
68 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
75 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
69 self.session.send(self.iopub_socket, msg_type,
76 self.session.send(self.iopub_socket, msg_type,
70 content,
77 content,
71 metadata=json_clean(metadata),
78 metadata=json_clean(metadata),
72 parent=self.kernel._parent_header,
79 parent=self.kernel._parent_header,
73 ident=self.topic,
80 ident=self.topic,
74 buffers=buffers,
81 buffers=buffers,
75 )
82 )
76
83
77 def __del__(self):
84 def __del__(self):
78 """trigger close on gc"""
85 """trigger close on gc"""
79 self.close()
86 self.close()
80
87
81 # publishing messages
88 # publishing messages
82
89
83 def open(self, data=None, metadata=None, buffers=None):
90 def open(self, data=None, metadata=None, buffers=None):
84 """Open the frontend-side version of this comm"""
91 """Open the frontend-side version of this comm"""
85 if data is None:
92 if data is None:
86 data = self._open_data
93 data = self._open_data
87 comm_manager = getattr(self.kernel, 'comm_manager', None)
94 comm_manager = getattr(self.kernel, 'comm_manager', None)
88 if comm_manager is None:
95 if comm_manager is None:
89 raise RuntimeError("Comms cannot be opened without a kernel "
96 raise RuntimeError("Comms cannot be opened without a kernel "
90 "and a comm_manager attached to that kernel.")
97 "and a comm_manager attached to that kernel.")
91
98
92 comm_manager.register_comm(self)
99 comm_manager.register_comm(self)
93 try:
100 try:
94 self._publish_msg('comm_open',
101 self._publish_msg('comm_open',
95 data=data, metadata=metadata, buffers=buffers,
102 data=data, metadata=metadata, buffers=buffers,
96 target_name=self.target_name,
103 target_name=self.target_name,
97 target_module=self.target_module,
104 target_module=self.target_module,
98 )
105 )
99 self._closed = False
106 self._closed = False
100 except:
107 except:
101 comm_manager.unregister_comm(self)
108 comm_manager.unregister_comm(self)
102 raise
109 raise
103
110
104 def close(self, data=None, metadata=None, buffers=None):
111 def close(self, data=None, metadata=None, buffers=None):
105 """Close the frontend-side version of this comm"""
112 """Close the frontend-side version of this comm"""
106 if self._closed:
113 if self._closed:
107 # only close once
114 # only close once
108 return
115 return
109 self._closed = True
116 self._closed = True
110 if data is None:
117 if data is None:
111 data = self._close_data
118 data = self._close_data
112 self._publish_msg('comm_close',
119 self._publish_msg('comm_close',
113 data=data, metadata=metadata, buffers=buffers,
120 data=data, metadata=metadata, buffers=buffers,
114 )
121 )
115 self.kernel.comm_manager.unregister_comm(self)
122 self.kernel.comm_manager.unregister_comm(self)
116
123
117 def send(self, data=None, metadata=None, buffers=None):
124 def send(self, data=None, metadata=None, buffers=None):
118 """Send a message to the frontend-side version of this comm"""
125 """Send a message to the frontend-side version of this comm"""
119 self._publish_msg('comm_msg',
126 self._publish_msg('comm_msg',
120 data=data, metadata=metadata, buffers=buffers,
127 data=data, metadata=metadata, buffers=buffers,
121 )
128 )
122
129
123 # registering callbacks
130 # registering callbacks
124
131
125 def on_close(self, callback):
132 def on_close(self, callback):
126 """Register a callback for comm_close
133 """Register a callback for comm_close
127
134
128 Will be called with the `data` of the close message.
135 Will be called with the `data` of the close message.
129
136
130 Call `on_close(None)` to disable an existing callback.
137 Call `on_close(None)` to disable an existing callback.
131 """
138 """
132 self._close_callback = callback
139 self._close_callback = callback
133
140
134 def on_msg(self, callback):
141 def on_msg(self, callback):
135 """Register a callback for comm_msg
142 """Register a callback for comm_msg
136
143
137 Will be called with the `data` of any comm_msg messages.
144 Will be called with the `data` of any comm_msg messages.
138
145
139 Call `on_msg(None)` to disable an existing callback.
146 Call `on_msg(None)` to disable an existing callback.
140 """
147 """
141 self._msg_callback = callback
148 self._msg_callback = callback
142
149
143 # handling of incoming messages
150 # handling of incoming messages
144
151
145 def handle_close(self, msg):
152 def handle_close(self, msg):
146 """Handle a comm_close message"""
153 """Handle a comm_close message"""
147 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
154 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
148 if self._close_callback:
155 if self._close_callback:
149 self._close_callback(msg)
156 self._close_callback(msg)
150
157
151 def handle_msg(self, msg):
158 def handle_msg(self, msg):
152 """Handle a comm_msg message"""
159 """Handle a comm_msg message"""
153 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
160 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
154 if self._msg_callback:
161 if self._msg_callback:
155 if self.shell:
162 if self.shell:
156 self.shell.events.trigger('pre_execute')
163 self.shell.events.trigger('pre_execute')
157 self._msg_callback(msg)
164 self._msg_callback(msg)
158 if self.shell:
165 if self.shell:
159 self.shell.events.trigger('post_execute')
166 self.shell.events.trigger('post_execute')
160
167
161
168
162 __all__ = ['Comm']
169 __all__ = ['Comm']
General Comments 0
You need to be logged in to leave comments. Login now