##// END OF EJS Templates
Fixed high CPU usage of XREQ channel....
Brian Granger -
Show More
@@ -1,144 +1,147
1 1 """ Defines a KernelManager that provides signals and slots.
2 2 """
3 3
4 4 # System library imports.
5 5 from PyQt4 import QtCore
6 import zmq
6 7
7 8 # IPython imports.
8 9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 10 XReqSocketChannel, RepSocketChannel
10 11 from util import MetaQObjectHasTraits
11 12
12 13
14
13 15 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
14 16
15 17 # Emitted when any message is received.
16 18 message_received = QtCore.pyqtSignal(object)
17 19
18 20 # Emitted when a message of type 'pyout' or 'stdout' is received.
19 21 output_received = QtCore.pyqtSignal(object)
20 22
21 23 # Emitted when a message of type 'pyerr' or 'stderr' is received.
22 24 error_received = QtCore.pyqtSignal(object)
23 25
24 26 #---------------------------------------------------------------------------
25 27 # 'object' interface
26 28 #---------------------------------------------------------------------------
27 29
28 30 def __init__(self, *args, **kw):
29 31 """ Reimplemented to ensure that QtCore.QObject is initialized first.
30 32 """
31 33 QtCore.QObject.__init__(self)
32 34 SubSocketChannel.__init__(self, *args, **kw)
33 35
34 36 #---------------------------------------------------------------------------
35 37 # 'SubSocketChannel' interface
36 38 #---------------------------------------------------------------------------
37 39
38 40 def call_handlers(self, msg):
39 41 """ Reimplemented to emit signals instead of making callbacks.
40 42 """
41 43 # Emit the generic signal.
42 44 self.message_received.emit(msg)
43 45
44 46 # Emit signals for specialized message types.
45 47 msg_type = msg['msg_type']
46 48 if msg_type in ('pyout', 'stdout'):
47 49 self.output_received.emit(msg)
48 50 elif msg_type in ('pyerr', 'stderr'):
49 51 self.error_received.emit(msg)
50 52
51 53 def flush(self):
52 54 """ Reimplemented to ensure that signals are dispatched immediately.
53 55 """
54 56 super(QtSubSocketChannel, self).flush()
55 57 QtCore.QCoreApplication.instance().processEvents()
56 58
57 59
58 60 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
59 61
60 62 # Emitted when any message is received.
61 63 message_received = QtCore.pyqtSignal(object)
62 64
63 65 # Emitted when a reply has been received for the corresponding request type.
64 66 execute_reply = QtCore.pyqtSignal(object)
65 67 complete_reply = QtCore.pyqtSignal(object)
66 68 object_info_reply = QtCore.pyqtSignal(object)
67 69
68 70 #---------------------------------------------------------------------------
69 71 # 'object' interface
70 72 #---------------------------------------------------------------------------
71 73
72 74 def __init__(self, *args, **kw):
73 75 """ Reimplemented to ensure that QtCore.QObject is initialized first.
74 76 """
75 77 QtCore.QObject.__init__(self)
76 78 XReqSocketChannel.__init__(self, *args, **kw)
77 79
78 80 #---------------------------------------------------------------------------
79 81 # 'XReqSocketChannel' interface
80 82 #---------------------------------------------------------------------------
81 83
82 84 def call_handlers(self, msg):
83 85 """ Reimplemented to emit signals instead of making callbacks.
84 86 """
85 87 # Emit the generic signal.
86 88 self.message_received.emit(msg)
87 89
88 90 # Emit signals for specialized message types.
89 91 msg_type = msg['msg_type']
90 92 signal = getattr(self, msg_type, None)
91 93 if signal:
92 94 signal.emit(msg)
93 95
94 96 def _queue_request(self, msg, callback):
95 97 """ Reimplemented to skip callback handling.
96 98 """
97 99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
98 101
99 102
100 103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
101 104
102 105 #---------------------------------------------------------------------------
103 106 # 'object' interface
104 107 #---------------------------------------------------------------------------
105 108
106 109 def __init__(self, *args, **kw):
107 110 """ Reimplemented to ensure that QtCore.QObject is initialized first.
108 111 """
109 112 QtCore.QObject.__init__(self)
110 113 RepSocketChannel.__init__(self, *args, **kw)
111 114
112 115
113 116 class QtKernelManager(KernelManager, QtCore.QObject):
114 117 """ A KernelManager that provides signals and slots.
115 118 """
116 119
117 120 __metaclass__ = MetaQObjectHasTraits
118 121
119 122 # Emitted when the kernel manager has started listening.
120 123 started_listening = QtCore.pyqtSignal()
121 124
122 125 # Emitted when the kernel manager has stopped listening.
123 126 stopped_listening = QtCore.pyqtSignal()
124 127
125 128 # Use Qt-specific channel classes that emit signals.
126 129 sub_channel_class = QtSubSocketChannel
127 130 xreq_channel_class = QtXReqSocketChannel
128 131 rep_channel_class = QtRepSocketChannel
129 132
130 133 #---------------------------------------------------------------------------
131 134 # 'KernelManager' interface
132 135 #---------------------------------------------------------------------------
133 136
134 137 def start_listening(self):
135 138 """ Reimplemented to emit signal.
136 139 """
137 140 super(QtKernelManager, self).start_listening()
138 141 self.started_listening.emit()
139 142
140 143 def stop_listening(self):
141 144 """ Reimplemented to emit signal.
142 145 """
143 146 super(QtKernelManager, self).stop_listening()
144 147 self.stopped_listening.emit()
@@ -1,466 +1,497
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 context = None
38 session = None
39 socket = None
40 ioloop = None
41 iostate = None
42
37 43 def __init__(self, context, session, address=None):
38 44 super(ZmqSocketChannel, self).__init__()
39 45 self.daemon = True
40 46
41 47 self.context = context
42 48 self.session = session
43 49 self.address = address
44 self.socket = None
45 50
46 51 def stop(self):
47 52 """Stop the thread's activity. Returns when the thread terminates.
48 53
49 54 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 55 is called again.
51 56 """
52 57 self.join()
53 58
54 59
55 60 def get_address(self):
56 61 """ Get the channel's address. By the default, a channel is on
57 62 localhost with no port specified (a negative port number).
58 63 """
59 64 return self._address
60 65
61 66 def set_adresss(self, address):
62 67 """ Set the channel's address. Should be a tuple of form:
63 68 (ip address [str], port [int]).
64 69 or None, in which case the address is reset to its default value.
65 70 """
66 71 # FIXME: Validate address.
67 72 if self.is_alive(): # This is Thread.is_alive
68 73 raise RuntimeError("Cannot set address on a running channel!")
69 74 else:
70 75 if address is None:
71 76 address = (LOCALHOST, 0)
72 77 self._address = address
73 78
74 79 address = property(get_address, set_adresss)
75 80
81 def add_io_state(self, state):
82 """Add IO state to the eventloop.
83
84 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 """
86 def add_io_state_callback():
87 if not self.iostate & state:
88 self.iostate = self.iostate | state
89 self.ioloop.update_handler(self.socket, self.iostate)
90 self.ioloop.add_callback(add_io_state_callback)
91
92 def drop_io_state(self, state):
93 """Drop IO state from the eventloop.
94
95 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 """
97 def drop_io_state_callback():
98 if self.iostate & state:
99 self.iostate = self.iostate & (~state)
100 self.ioloop.update_handler(self.socket, self.iostate)
101 self.ioloop.add_callback(drop_io_state_callback)
102
76 103
77 104 class SubSocketChannel(ZmqSocketChannel):
78 105
79 106 def __init__(self, context, session, address=None):
80 107 super(SubSocketChannel, self).__init__(context, session, address)
81 108
82 109 def run(self):
83 110 self.socket = self.context.socket(zmq.SUB)
84 111 self.socket.setsockopt(zmq.SUBSCRIBE,'')
85 112 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 113 self.socket.connect('tcp://%s:%i' % self.address)
87 114 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
88 116 self.ioloop.add_handler(self.socket, self._handle_events,
89 POLLIN|POLLERR)
117 self.iostate)
90 118 self.ioloop.start()
91 119
92 120 def stop(self):
93 121 self.ioloop.stop()
94 122 super(SubSocketChannel, self).stop()
95 123
96 124 def _handle_events(self, socket, events):
97 125 # Turn on and off POLLOUT depending on if we have made a request
98 126 if events & POLLERR:
99 127 self._handle_err()
100 128 if events & POLLIN:
101 129 self._handle_recv()
102 130
103 131 def _handle_err(self):
104 132 # We don't want to let this go silently, so eventually we should log.
105 133 raise zmq.ZMQError()
106 134
107 135 def _handle_recv(self):
108 136 # Get all of the messages we can
109 137 while True:
110 138 try:
111 139 msg = self.socket.recv_json(zmq.NOBLOCK)
112 140 except zmq.ZMQError:
113 141 # Check the errno?
114 142 # Will this tigger POLLERR?
115 143 break
116 144 else:
117 145 self.call_handlers(msg)
118 146
119 147 def call_handlers(self, msg):
120 148 """This method is called in the ioloop thread when a message arrives.
121 149
122 150 Subclasses should override this method to handle incoming messages.
123 151 It is important to remember that this method is called in the thread
124 152 so that some logic must be done to ensure that the application leve
125 153 handlers are called in the application thread.
126 154 """
127 155 raise NotImplementedError('call_handlers must be defined in a subclass.')
128 156
129 157 def flush(self, timeout=1.0):
130 158 """Immediately processes all pending messages on the SUB channel.
131 159
132 160 This method is thread safe.
133 161
134 162 Parameters
135 163 ----------
136 164 timeout : float, optional
137 165 The maximum amount of time to spend flushing, in seconds. The
138 166 default is one second.
139 167 """
140 168 # We do the IOLoop callback process twice to ensure that the IOLoop
141 169 # gets to perform at least one full poll.
142 170 stop_time = time.time() + timeout
143 171 for i in xrange(2):
144 172 self._flushed = False
145 173 self.ioloop.add_callback(self._flush)
146 174 while not self._flushed and time.time() < stop_time:
147 175 time.sleep(0.01)
148 176
149 177 def _flush(self):
150 178 """Called in this thread by the IOLoop to indicate that all events have
151 179 been processed.
152 180 """
153 181 self._flushed = True
154 182
155 183
156 184 class XReqSocketChannel(ZmqSocketChannel):
157 185
158 186 handler_queue = None
159 187 command_queue = None
160 188 handlers = None
161 189 _overriden_call_handler = None
162 190
163 191 def __init__(self, context, session, address=None):
164 192 self.handlers = {}
165 193 self.handler_queue = Queue()
166 194 self.command_queue = Queue()
167 195 super(XReqSocketChannel, self).__init__(context, session, address)
168 196
169 197 def run(self):
170 198 self.socket = self.context.socket(zmq.XREQ)
171 199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 200 self.socket.connect('tcp://%s:%i' % self.address)
173 201 self.ioloop = ioloop.IOLoop()
202 self.iostate = POLLERR|POLLIN
174 203 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
204 self.iostate)
176 205 self.ioloop.start()
177 206
178 207 def stop(self):
179 208 self.ioloop.stop()
180 209 super(XReqSocketChannel, self).stop()
181 210
182 211 def _handle_events(self, socket, events):
183 # Turn on and off POLLOUT depending on if we have made a request
184 212 if events & POLLERR:
185 213 self._handle_err()
186 214 if events & POLLOUT:
187 215 self._handle_send()
188 216 if events & POLLIN:
189 217 self._handle_recv()
190 218
191 219 def _handle_recv(self):
192 220 msg = self.socket.recv_json()
193 221 self.call_handlers(msg)
194 222
195 223 def _handle_send(self):
196 224 try:
197 225 msg = self.command_queue.get(False)
198 226 except Empty:
199 227 pass
200 228 else:
201 229 self.socket.send_json(msg)
230 if self.command_queue.empty():
231 self.drop_io_state(POLLOUT)
202 232
203 233 def _handle_err(self):
204 234 # We don't want to let this go silently, so eventually we should log.
205 235 raise zmq.ZMQError()
206 236
207 237 def _queue_request(self, msg, callback):
208 238 handler = self._find_handler(msg['msg_type'], callback)
209 239 self.handler_queue.put(handler)
210 240 self.command_queue.put(msg)
241 self.add_io_state(POLLOUT)
211 242
212 243 def execute(self, code, callback=None):
213 244 # Create class for content/msg creation. Related to, but possibly
214 245 # not in Session.
215 246 content = dict(code=code)
216 247 msg = self.session.msg('execute_request', content)
217 248 self._queue_request(msg, callback)
218 249 return msg['header']['msg_id']
219 250
220 251 def complete(self, text, line, block=None, callback=None):
221 252 content = dict(text=text, line=line)
222 253 msg = self.session.msg('complete_request', content)
223 254 self._queue_request(msg, callback)
224 255 return msg['header']['msg_id']
225 256
226 257 def object_info(self, oname, callback=None):
227 258 content = dict(oname=oname)
228 259 msg = self.session.msg('object_info_request', content)
229 260 self._queue_request(msg, callback)
230 261 return msg['header']['msg_id']
231 262
232 263 def _find_handler(self, name, callback):
233 264 if callback is not None:
234 265 return callback
235 266 handler = self.handlers.get(name)
236 267 if handler is None:
237 268 raise MissingHandlerError(
238 269 'No handler defined for method: %s' % name)
239 270 return handler
240 271
241 272 def override_call_handler(self, func):
242 273 """Permanently override the call_handler.
243 274
244 275 The function func will be called as::
245 276
246 277 func(handler, msg)
247 278
248 279 And must call::
249 280
250 281 handler(msg)
251 282
252 283 in the main thread.
253 284 """
254 285 assert callable(func), "not a callable: %r" % func
255 286 self._overriden_call_handler = func
256 287
257 288 def call_handlers(self, msg):
258 289 try:
259 290 handler = self.handler_queue.get(False)
260 291 except Empty:
261 292 print "Message received with no handler!!!"
262 293 print msg
263 294 else:
264 295 self.call_handler(handler, msg)
265 296
266 297 def call_handler(self, handler, msg):
267 298 if self._overriden_call_handler is not None:
268 299 self._overriden_call_handler(handler, msg)
269 300 elif hasattr(self, '_call_handler'):
270 301 call_handler = getattr(self, '_call_handler')
271 302 call_handler(handler, msg)
272 303 else:
273 304 raise RuntimeError('no handler!')
274 305
275 306
276 307 class RepSocketChannel(ZmqSocketChannel):
277 308
278 309 def on_raw_input(self):
279 310 pass
280 311
281 312
282 313 class KernelManager(HasTraits):
283 314 """ Manages a kernel for a frontend.
284 315
285 316 The SUB channel is for the frontend to receive messages published by the
286 317 kernel.
287 318
288 319 The REQ channel is for the frontend to make requests of the kernel.
289 320
290 321 The REP channel is for the kernel to request stdin (raw_input) from the
291 322 frontend.
292 323 """
293 324
294 325 # Whether the kernel manager is currently listening on its channels.
295 326 is_listening = Bool(False)
296 327
297 328 # The PyZMQ Context to use for communication with the kernel.
298 329 context = Instance(zmq.Context, ())
299 330
300 331 # The Session to use for communication with the kernel.
301 332 session = Instance(Session, ())
302 333
303 334 # The classes to use for the various channels.
304 335 sub_channel_class = Type(SubSocketChannel)
305 336 xreq_channel_class = Type(XReqSocketChannel)
306 337 rep_channel_class = Type(RepSocketChannel)
307 338
308 339 # Protected traits.
309 340 _kernel = Instance(Popen)
310 341 _sub_channel = Any
311 342 _xreq_channel = Any
312 343 _rep_channel = Any
313 344
314 345 #--------------------------------------------------------------------------
315 346 # Channel management methods:
316 347 #--------------------------------------------------------------------------
317 348
318 349 def start_listening(self):
319 350 """Starts listening on the specified ports. If already listening, raises
320 351 a RuntimeError.
321 352 """
322 353 if self.is_listening:
323 354 raise RuntimeError("Cannot start listening. Already listening!")
324 355 else:
325 356 self.is_listening = True
326 357 self.sub_channel.start()
327 358 self.xreq_channel.start()
328 359 self.rep_channel.start()
329 360
330 361 @property
331 362 def is_alive(self):
332 363 """ Returns whether the kernel is alive. """
333 364 if self.is_listening:
334 365 # TODO: check if alive.
335 366 return True
336 367 else:
337 368 return False
338 369
339 370 def stop_listening(self):
340 371 """Stops listening. If not listening, does nothing. """
341 372 if self.is_listening:
342 373 self.is_listening = False
343 374 self.sub_channel.stop()
344 375 self.xreq_channel.stop()
345 376 self.rep_channel.stop()
346 377
347 378 #--------------------------------------------------------------------------
348 379 # Kernel process management methods:
349 380 #--------------------------------------------------------------------------
350 381
351 382 def start_kernel(self):
352 383 """Starts a kernel process and configures the manager to use it.
353 384
354 385 If ports have been specified via the address attributes, they are used.
355 386 Otherwise, open ports are chosen by the OS and the channel port
356 387 attributes are configured as appropriate.
357 388 """
358 389 xreq, sub = self.xreq_address, self.sub_address
359 390 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
360 391 raise RuntimeError("Can only launch a kernel on localhost."
361 392 "Make sure that the '*_address' attributes are "
362 393 "configured properly.")
363 394
364 395 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
365 396 self.set_kernel(kernel)
366 397 self.xreq_address = (LOCALHOST, xrep)
367 398 self.sub_address = (LOCALHOST, pub)
368 399
369 400 def set_kernel(self, kernel):
370 401 """Sets the kernel manager's kernel to an existing kernel process.
371 402
372 403 It is *not* necessary to a set a kernel to communicate with it via the
373 404 channels, and those objects must be configured separately. It
374 405 *is* necessary to set a kernel if you want to use the manager (or
375 406 frontends that use the manager) to signal and/or kill the kernel.
376 407
377 408 Parameters:
378 409 -----------
379 410 kernel : Popen
380 411 An existing kernel process.
381 412 """
382 413 self._kernel = kernel
383 414
384 415 @property
385 416 def has_kernel(self):
386 417 """Returns whether a kernel process has been specified for the kernel
387 418 manager.
388 419
389 420 A kernel process can be set via 'start_kernel' or 'set_kernel'.
390 421 """
391 422 return self._kernel is not None
392 423
393 424 def kill_kernel(self):
394 425 """ Kill the running kernel. """
395 426 if self._kernel:
396 427 self._kernel.kill()
397 428 self._kernel = None
398 429 else:
399 430 raise RuntimeError("Cannot kill kernel. No kernel is running!")
400 431
401 432 def signal_kernel(self, signum):
402 433 """ Sends a signal to the kernel. """
403 434 if self._kernel:
404 435 self._kernel.send_signal(signum)
405 436 else:
406 437 raise RuntimeError("Cannot signal kernel. No kernel is running!")
407 438
408 439 #--------------------------------------------------------------------------
409 440 # Channels used for communication with the kernel:
410 441 #--------------------------------------------------------------------------
411 442
412 443 @property
413 444 def sub_channel(self):
414 445 """Get the SUB socket channel object."""
415 446 if self._sub_channel is None:
416 447 self._sub_channel = self.sub_channel_class(self.context,
417 448 self.session)
418 449 return self._sub_channel
419 450
420 451 @property
421 452 def xreq_channel(self):
422 453 """Get the REQ socket channel object to make requests of the kernel."""
423 454 if self._xreq_channel is None:
424 455 self._xreq_channel = self.xreq_channel_class(self.context,
425 456 self.session)
426 457 return self._xreq_channel
427 458
428 459 @property
429 460 def rep_channel(self):
430 461 """Get the REP socket channel object to handle stdin (raw_input)."""
431 462 if self._rep_channel is None:
432 463 self._rep_channel = self.rep_channel_class(self.context,
433 464 self.session)
434 465 return self._rep_channel
435 466
436 467 #--------------------------------------------------------------------------
437 468 # Delegates for the Channel address attributes:
438 469 #--------------------------------------------------------------------------
439 470
440 471 def get_sub_address(self):
441 472 return self.sub_channel.address
442 473
443 474 def set_sub_address(self, address):
444 475 self.sub_channel.address = address
445 476
446 477 sub_address = property(get_sub_address, set_sub_address,
447 478 doc="The address used by SUB socket channel.")
448 479
449 480 def get_xreq_address(self):
450 481 return self.xreq_channel.address
451 482
452 483 def set_xreq_address(self, address):
453 484 self.xreq_channel.address = address
454 485
455 486 xreq_address = property(get_xreq_address, set_xreq_address,
456 487 doc="The address used by XREQ socket channel.")
457 488
458 489 def get_rep_address(self):
459 490 return self.rep_channel.address
460 491
461 492 def set_rep_address(self, address):
462 493 self.rep_channel.address = address
463 494
464 495 rep_address = property(get_rep_address, set_rep_address,
465 496 doc="The address used by REP socket channel.")
466 497
General Comments 0
You need to be logged in to leave comments. Login now