##// END OF EJS Templates
make Comm's publishing threadsafe...
Min RK -
Show More
@@ -1,162 +1,169 b''
1 1 """Base class for a Comm"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import threading
6 7 import uuid
7 8
9 from zmq.eventloop.ioloop import IOLoop
10
8 11 from IPython.config import LoggingConfigurable
9 12 from IPython.kernel.zmq.kernelbase import Kernel
10 13
11 14 from IPython.utils.jsonutil import json_clean
12 15 from IPython.utils.traitlets import Instance, Unicode, Bytes, Bool, Dict, Any
13 16
14 17
15 18 class Comm(LoggingConfigurable):
16
19 """Class for communicating between a Frontend and a Kernel"""
17 20 # If this is instantiated by a non-IPython kernel, shell will be None
18 21 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
19 22 allow_none=True)
20 23 kernel = Instance('IPython.kernel.zmq.kernelbase.Kernel')
21 24 def _kernel_default(self):
22 25 if Kernel.initialized():
23 26 return Kernel.instance()
24 27
25 28 iopub_socket = Any()
26 29 def _iopub_socket_default(self):
27 30 return self.kernel.iopub_socket
28 31 session = Instance('IPython.kernel.zmq.session.Session')
29 32 def _session_default(self):
30 33 if self.kernel is not None:
31 34 return self.kernel.session
32 35
33 36 target_name = Unicode('comm')
34 37 target_module = Unicode(None, allow_none=True, help="""requirejs module from
35 38 which to load comm target.""")
36 39
37 40 topic = Bytes()
38 41 def _topic_default(self):
39 42 return ('comm-%s' % self.comm_id).encode('ascii')
40 43
41 44 _open_data = Dict(help="data dict, if any, to be included in comm_open")
42 45 _close_data = Dict(help="data dict, if any, to be included in comm_close")
43 46
44 47 _msg_callback = Any()
45 48 _close_callback = Any()
46 49
47 50 _closed = Bool(True)
48 51 comm_id = Unicode()
49 52 def _comm_id_default(self):
50 53 return uuid.uuid4().hex
51 54
52 55 primary = Bool(True, help="Am I the primary or secondary Comm?")
53 56
54 57 def __init__(self, target_name='', data=None, **kwargs):
55 58 if target_name:
56 59 kwargs['target_name'] = target_name
57 60 super(Comm, self).__init__(**kwargs)
58 61 if self.primary:
59 62 # I am primary, open my peer.
60 63 self.open(data)
61 64 else:
62 65 self._closed = False
63 66
64 67 def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
65 68 """Helper for sending a comm message on IOPub"""
69 if threading.current_thread().name != 'MainThread' and IOLoop.initialized():
70 # make sure we never send on a zmq socket outside the main IOLoop thread
71 IOLoop.instance().add_callback(lambda : self._publish_msg(msg_type, data, metadata, buffers, **keys))
72 return
66 73 data = {} if data is None else data
67 74 metadata = {} if metadata is None else metadata
68 75 content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
69 76 self.session.send(self.iopub_socket, msg_type,
70 77 content,
71 78 metadata=json_clean(metadata),
72 79 parent=self.kernel._parent_header,
73 80 ident=self.topic,
74 81 buffers=buffers,
75 82 )
76 83
77 84 def __del__(self):
78 85 """trigger close on gc"""
79 86 self.close()
80 87
81 88 # publishing messages
82 89
83 90 def open(self, data=None, metadata=None, buffers=None):
84 91 """Open the frontend-side version of this comm"""
85 92 if data is None:
86 93 data = self._open_data
87 94 comm_manager = getattr(self.kernel, 'comm_manager', None)
88 95 if comm_manager is None:
89 96 raise RuntimeError("Comms cannot be opened without a kernel "
90 97 "and a comm_manager attached to that kernel.")
91 98
92 99 comm_manager.register_comm(self)
93 100 try:
94 101 self._publish_msg('comm_open',
95 102 data=data, metadata=metadata, buffers=buffers,
96 103 target_name=self.target_name,
97 104 target_module=self.target_module,
98 105 )
99 106 self._closed = False
100 107 except:
101 108 comm_manager.unregister_comm(self)
102 109 raise
103 110
104 111 def close(self, data=None, metadata=None, buffers=None):
105 112 """Close the frontend-side version of this comm"""
106 113 if self._closed:
107 114 # only close once
108 115 return
109 116 self._closed = True
110 117 if data is None:
111 118 data = self._close_data
112 119 self._publish_msg('comm_close',
113 120 data=data, metadata=metadata, buffers=buffers,
114 121 )
115 122 self.kernel.comm_manager.unregister_comm(self)
116 123
117 124 def send(self, data=None, metadata=None, buffers=None):
118 125 """Send a message to the frontend-side version of this comm"""
119 126 self._publish_msg('comm_msg',
120 127 data=data, metadata=metadata, buffers=buffers,
121 128 )
122 129
123 130 # registering callbacks
124 131
125 132 def on_close(self, callback):
126 133 """Register a callback for comm_close
127 134
128 135 Will be called with the `data` of the close message.
129 136
130 137 Call `on_close(None)` to disable an existing callback.
131 138 """
132 139 self._close_callback = callback
133 140
134 141 def on_msg(self, callback):
135 142 """Register a callback for comm_msg
136 143
137 144 Will be called with the `data` of any comm_msg messages.
138 145
139 146 Call `on_msg(None)` to disable an existing callback.
140 147 """
141 148 self._msg_callback = callback
142 149
143 150 # handling of incoming messages
144 151
145 152 def handle_close(self, msg):
146 153 """Handle a comm_close message"""
147 154 self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
148 155 if self._close_callback:
149 156 self._close_callback(msg)
150 157
151 158 def handle_msg(self, msg):
152 159 """Handle a comm_msg message"""
153 160 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
154 161 if self._msg_callback:
155 162 if self.shell:
156 163 self.shell.events.trigger('pre_execute')
157 164 self._msg_callback(msg)
158 165 if self.shell:
159 166 self.shell.events.trigger('post_execute')
160 167
161 168
162 169 __all__ = ['Comm']
General Comments 0
You need to be logged in to leave comments. Login now