##// END OF EJS Templates
General cleanup of kernelmanager.py....
Brian Granger -
Show More
@@ -1,492 +1,458 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 37 def __init__(self, context, session, address=None):
38 38 super(ZmqSocketChannel, self).__init__()
39 39 self.daemon = True
40 40
41 41 self.context = context
42 42 self.session = session
43 43 self.address = address
44 44 self.socket = None
45 45
46 46 def stop(self):
47 47 """Stop the thread's activity. Returns when the thread terminates.
48 48
49 49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 50 is called again.
51 51 """
52 52 self.join()
53 53
54 54
55 55 def get_address(self):
56 56 """ Get the channel's address. By the default, a channel is on
57 57 localhost with no port specified (a negative port number).
58 58 """
59 59 return self._address
60 60
61 61 def set_adresss(self, address):
62 62 """ Set the channel's address. Should be a tuple of form:
63 63 (ip address [str], port [int]).
64 64 or None, in which case the address is reset to its default value.
65 65 """
66 66 # FIXME: Validate address.
67 67 if self.is_alive(): # This is Thread.is_alive
68 68 raise RuntimeError("Cannot set address on a running channel!")
69 69 else:
70 70 if address is None:
71 71 address = (LOCALHOST, 0)
72 72 self._address = address
73 73
74 74 address = property(get_address, set_adresss)
75 75
76 76
77 77 class SubSocketChannel(ZmqSocketChannel):
78 78
79 handlers = None
80 _overriden_call_handler = None
81
82 79 def __init__(self, context, session, address=None):
83 self.handlers = {}
84 80 super(SubSocketChannel, self).__init__(context, session, address)
85 81
86 82 def run(self):
87 83 self.socket = self.context.socket(zmq.SUB)
88 84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
89 85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
90 86 self.socket.connect('tcp://%s:%i' % self.address)
91 87 self.ioloop = ioloop.IOLoop()
92 88 self.ioloop.add_handler(self.socket, self._handle_events,
93 89 POLLIN|POLLERR)
94 90 self.ioloop.start()
95 91
96 92 def stop(self):
97 93 self.ioloop.stop()
98 94 super(SubSocketChannel, self).stop()
99 95
100 96 def _handle_events(self, socket, events):
101 97 # Turn on and off POLLOUT depending on if we have made a request
102 98 if events & POLLERR:
103 99 self._handle_err()
104 100 if events & POLLIN:
105 101 self._handle_recv()
106 102
107 103 def _handle_err(self):
104 # We don't want to let this go silently, so eventually we should log.
108 105 raise zmq.ZmqError()
109 106
110 107 def _handle_recv(self):
111 108 msg = self.socket.recv_json()
112 109 self.call_handlers(msg)
113 110
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 111 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
112 """This method is called in the ioloop thread when a message arrives.
138 113
139 def call_handler(self, handler, msg):
140 if self._overriden_call_handler is not None:
141 self._overriden_call_handler(handler, msg)
142 elif hasattr(self, '_call_handler'):
143 call_handler = getattr(self, '_call_handler')
144 call_handler(handler, msg)
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 """Remove the callback for msg type."""
154 self.handlers.pop(msg_type, None)
114 Subclasses should override this method to handle incoming messages.
115 It is important to remember that this method is called in the thread
116 so that some logic must be done to ensure that the application leve
117 handlers are called in the application thread.
118 """
119 raise NotImplementedError('call_handlers must be defined in a subclass.')
155 120
156 121 def flush(self, timeout=1.0):
157 122 """Immediately processes all pending messages on the SUB channel.
158 123
159 124 This method is thread safe.
160 125
161 126 Parameters
162 127 ----------
163 128 timeout : float, optional
164 129 The maximum amount of time to spend flushing, in seconds. The
165 130 default is one second.
166 131 """
167 132 # We do the IOLoop callback process twice to ensure that the IOLoop
168 133 # gets to perform at least one full poll.
169 134 stop_time = time.time() + timeout
170 135 for i in xrange(2):
171 136 self._flushed = False
172 137 self.ioloop.add_callback(self._flush)
173 138 while not self._flushed and time.time() < stop_time:
174 139 time.sleep(0.01)
175 140
176 141 def _flush(self):
177 142 """Called in this thread by the IOLoop to indicate that all events have
178 143 been processed.
179 144 """
180 145 self._flushed = True
181 146
182 147
183 148 class XReqSocketChannel(ZmqSocketChannel):
184 149
185 150 handler_queue = None
186 151 command_queue = None
187 152 handlers = None
188 153 _overriden_call_handler = None
189 154
190 155 def __init__(self, context, session, address=None):
191 156 self.handlers = {}
192 157 self.handler_queue = Queue()
193 158 self.command_queue = Queue()
194 159 super(XReqSocketChannel, self).__init__(context, session, address)
195 160
196 161 def run(self):
197 162 self.socket = self.context.socket(zmq.XREQ)
198 163 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 164 self.socket.connect('tcp://%s:%i' % self.address)
200 165 self.ioloop = ioloop.IOLoop()
201 166 self.ioloop.add_handler(self.socket, self._handle_events,
202 167 POLLIN|POLLOUT|POLLERR)
203 168 self.ioloop.start()
204 169
205 170 def stop(self):
206 171 self.ioloop.stop()
207 172 super(XReqSocketChannel, self).stop()
208 173
209 174 def _handle_events(self, socket, events):
210 175 # Turn on and off POLLOUT depending on if we have made a request
211 176 if events & POLLERR:
212 177 self._handle_err()
213 178 if events & POLLOUT:
214 179 self._handle_send()
215 180 if events & POLLIN:
216 181 self._handle_recv()
217 182
218 183 def _handle_recv(self):
219 184 msg = self.socket.recv_json()
220 185 self.call_handlers(msg)
221 186
222 187 def _handle_send(self):
223 188 try:
224 189 msg = self.command_queue.get(False)
225 190 except Empty:
226 191 pass
227 192 else:
228 193 self.socket.send_json(msg)
229 194
230 195 def _handle_err(self):
196 # We don't want to let this go silently, so eventually we should log.
231 197 raise zmq.ZmqError()
232 198
233 199 def _queue_request(self, msg, callback):
234 200 handler = self._find_handler(msg['msg_type'], callback)
235 201 self.handler_queue.put(handler)
236 202 self.command_queue.put(msg)
237 203
238 204 def execute(self, code, callback=None):
239 205 # Create class for content/msg creation. Related to, but possibly
240 206 # not in Session.
241 207 content = dict(code=code)
242 208 msg = self.session.msg('execute_request', content)
243 209 self._queue_request(msg, callback)
244 210 return msg['header']['msg_id']
245 211
246 212 def complete(self, text, line, block=None, callback=None):
247 213 content = dict(text=text, line=line)
248 214 msg = self.session.msg('complete_request', content)
249 215 self._queue_request(msg, callback)
250 216 return msg['header']['msg_id']
251 217
252 218 def object_info(self, oname, callback=None):
253 219 content = dict(oname=oname)
254 220 msg = self.session.msg('object_info_request', content)
255 221 self._queue_request(msg, callback)
256 222 return msg['header']['msg_id']
257 223
258 224 def _find_handler(self, name, callback):
259 225 if callback is not None:
260 226 return callback
261 227 handler = self.handlers.get(name)
262 228 if handler is None:
263 229 raise MissingHandlerError(
264 230 'No handler defined for method: %s' % name)
265 231 return handler
266 232
267 233 def override_call_handler(self, func):
268 234 """Permanently override the call_handler.
269 235
270 236 The function func will be called as::
271 237
272 238 func(handler, msg)
273 239
274 240 And must call::
275 241
276 242 handler(msg)
277 243
278 244 in the main thread.
279 245 """
280 246 assert callable(func), "not a callable: %r" % func
281 247 self._overriden_call_handler = func
282 248
283 249 def call_handlers(self, msg):
284 250 try:
285 251 handler = self.handler_queue.get(False)
286 252 except Empty:
287 253 print "Message received with no handler!!!"
288 254 print msg
289 255 else:
290 256 self.call_handler(handler, msg)
291 257
292 258 def call_handler(self, handler, msg):
293 259 if self._overriden_call_handler is not None:
294 260 self._overriden_call_handler(handler, msg)
295 261 elif hasattr(self, '_call_handler'):
296 262 call_handler = getattr(self, '_call_handler')
297 263 call_handler(handler, msg)
298 264 else:
299 265 raise RuntimeError('no handler!')
300 266
301 267
302 268 class RepSocketChannel(ZmqSocketChannel):
303 269
304 270 def on_raw_input(self):
305 271 pass
306 272
307 273
308 274 class KernelManager(HasTraits):
309 275 """ Manages a kernel for a frontend.
310 276
311 277 The SUB channel is for the frontend to receive messages published by the
312 278 kernel.
313 279
314 280 The REQ channel is for the frontend to make requests of the kernel.
315 281
316 282 The REP channel is for the kernel to request stdin (raw_input) from the
317 283 frontend.
318 284 """
319 285
320 286 # Whether the kernel manager is currently listening on its channels.
321 287 is_listening = Bool(False)
322 288
323 289 # The PyZMQ Context to use for communication with the kernel.
324 290 context = Instance(zmq.Context, ())
325 291
326 292 # The Session to use for communication with the kernel.
327 293 session = Instance(Session, ())
328 294
329 295 # The classes to use for the various channels.
330 296 sub_channel_class = Type(SubSocketChannel)
331 297 xreq_channel_class = Type(XReqSocketChannel)
332 298 rep_channel_class = Type(RepSocketChannel)
333 299
334 300 # Protected traits.
335 301 _kernel = Instance(Popen)
336 302 _sub_channel = Any
337 303 _xreq_channel = Any
338 304 _rep_channel = Any
339 305
340 306 #--------------------------------------------------------------------------
341 307 # Channel management methods:
342 308 #--------------------------------------------------------------------------
343 309
344 310 def start_listening(self):
345 311 """Starts listening on the specified ports. If already listening, raises
346 312 a RuntimeError.
347 313 """
348 314 if self.is_listening:
349 315 raise RuntimeError("Cannot start listening. Already listening!")
350 316 else:
351 317 self.is_listening = True
352 318 self.sub_channel.start()
353 319 self.xreq_channel.start()
354 320 self.rep_channel.start()
355 321
356 322 @property
357 323 def is_alive(self):
358 324 """ Returns whether the kernel is alive. """
359 325 if self.is_listening:
360 326 # TODO: check if alive.
361 327 return True
362 328 else:
363 329 return False
364 330
365 331 def stop_listening(self):
366 332 """Stops listening. If not listening, does nothing. """
367 333 if self.is_listening:
368 334 self.is_listening = False
369 335 self.sub_channel.stop()
370 336 self.xreq_channel.stop()
371 337 self.rep_channel.stop()
372 338
373 339 #--------------------------------------------------------------------------
374 340 # Kernel process management methods:
375 341 #--------------------------------------------------------------------------
376 342
377 343 def start_kernel(self):
378 344 """Starts a kernel process and configures the manager to use it.
379 345
380 346 If ports have been specified via the address attributes, they are used.
381 347 Otherwise, open ports are chosen by the OS and the channel port
382 348 attributes are configured as appropriate.
383 349 """
384 350 xreq, sub = self.xreq_address, self.sub_address
385 351 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
386 352 raise RuntimeError("Can only launch a kernel on localhost."
387 353 "Make sure that the '*_address' attributes are "
388 354 "configured properly.")
389 355
390 356 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
391 357 self.set_kernel(kernel)
392 358 self.xreq_address = (LOCALHOST, xrep)
393 359 self.sub_address = (LOCALHOST, pub)
394 360
395 361 def set_kernel(self, kernel):
396 362 """Sets the kernel manager's kernel to an existing kernel process.
397 363
398 364 It is *not* necessary to a set a kernel to communicate with it via the
399 365 channels, and those objects must be configured separately. It
400 366 *is* necessary to set a kernel if you want to use the manager (or
401 367 frontends that use the manager) to signal and/or kill the kernel.
402 368
403 369 Parameters:
404 370 -----------
405 371 kernel : Popen
406 372 An existing kernel process.
407 373 """
408 374 self._kernel = kernel
409 375
410 376 @property
411 377 def has_kernel(self):
412 378 """Returns whether a kernel process has been specified for the kernel
413 379 manager.
414 380
415 381 A kernel process can be set via 'start_kernel' or 'set_kernel'.
416 382 """
417 383 return self._kernel is not None
418 384
419 385 def kill_kernel(self):
420 386 """ Kill the running kernel. """
421 387 if self._kernel:
422 388 self._kernel.kill()
423 389 self._kernel = None
424 390 else:
425 391 raise RuntimeError("Cannot kill kernel. No kernel is running!")
426 392
427 393 def signal_kernel(self, signum):
428 394 """ Sends a signal to the kernel. """
429 395 if self._kernel:
430 396 self._kernel.send_signal(signum)
431 397 else:
432 398 raise RuntimeError("Cannot signal kernel. No kernel is running!")
433 399
434 400 #--------------------------------------------------------------------------
435 401 # Channels used for communication with the kernel:
436 402 #--------------------------------------------------------------------------
437 403
438 404 @property
439 405 def sub_channel(self):
440 406 """Get the SUB socket channel object."""
441 407 if self._sub_channel is None:
442 408 self._sub_channel = self.sub_channel_class(self.context,
443 409 self.session)
444 410 return self._sub_channel
445 411
446 412 @property
447 413 def xreq_channel(self):
448 414 """Get the REQ socket channel object to make requests of the kernel."""
449 415 if self._xreq_channel is None:
450 416 self._xreq_channel = self.xreq_channel_class(self.context,
451 417 self.session)
452 418 return self._xreq_channel
453 419
454 420 @property
455 421 def rep_channel(self):
456 422 """Get the REP socket channel object to handle stdin (raw_input)."""
457 423 if self._rep_channel is None:
458 424 self._rep_channel = self.rep_channel_class(self.context,
459 425 self.session)
460 426 return self._rep_channel
461 427
462 428 #--------------------------------------------------------------------------
463 429 # Delegates for the Channel address attributes:
464 430 #--------------------------------------------------------------------------
465 431
466 432 def get_sub_address(self):
467 433 return self.sub_channel.address
468 434
469 435 def set_sub_address(self, address):
470 436 self.sub_channel.address = address
471 437
472 438 sub_address = property(get_sub_address, set_sub_address,
473 439 doc="The address used by SUB socket channel.")
474 440
475 441 def get_xreq_address(self):
476 442 return self.xreq_channel.address
477 443
478 444 def set_xreq_address(self, address):
479 445 self.xreq_channel.address = address
480 446
481 447 xreq_address = property(get_xreq_address, set_xreq_address,
482 448 doc="The address used by XREQ socket channel.")
483 449
484 450 def get_rep_address(self):
485 451 return self.rep_channel.address
486 452
487 453 def set_rep_address(self, address):
488 454 self.rep_channel.address = address
489 455
490 456 rep_address = property(get_rep_address, set_rep_address,
491 457 doc="The address used by REP socket channel.")
492 458
General Comments 0
You need to be logged in to leave comments. Login now