##// END OF EJS Templates
Cleaned up KernelManager.__init__ for better traits usage.
Brian Granger -
Show More
@@ -1,591 +1,571 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 23 from threading import Thread
24 24 import time
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq import POLLIN, POLLOUT, POLLERR
29 29 from zmq.eventloop import ioloop
30 30
31 31 # Local imports.
32 32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 33 from kernel import launch_kernel
34 34 from session import Session
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Constants and exceptions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 LOCALHOST = '127.0.0.1'
41 41
42 42 class InvalidPortNumber(Exception):
43 43 pass
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # ZMQ Socket Channel classes
47 47 #-----------------------------------------------------------------------------
48 48
49 49 class ZmqSocketChannel(Thread):
50 50 """The base class for the channels that use ZMQ sockets.
51 51 """
52 52 context = None
53 53 session = None
54 54 socket = None
55 55 ioloop = None
56 56 iostate = None
57 57 _address = None
58 58
59 59 def __init__(self, context, session, address):
60 60 """Create a channel
61 61
62 62 Parameters
63 63 ----------
64 64 context : :class:`zmq.Context`
65 65 The ZMQ context to use.
66 66 session : :class:`session.Session`
67 67 The session to use.
68 68 address : tuple
69 69 Standard (ip, port) tuple that the kernel is listening on.
70 70 """
71 71 super(ZmqSocketChannel, self).__init__()
72 72 self.daemon = True
73 73
74 74 self.context = context
75 75 self.session = session
76 76 if address[1] == 0:
77 77 message = 'The port number for a channel cannot be 0.'
78 78 raise InvalidPortNumber(message)
79 79 self._address = address
80 80
81 81 def stop(self):
82 82 """Stop the channel's activity.
83 83
84 84 This calls :method:`Thread.join` and returns when the thread
85 85 terminates. :class:`RuntimeError` will be raised if
86 86 :method:`self.start` is called again.
87 87 """
88 88 self.join()
89 89
90 90 @property
91 91 def address(self):
92 92 """Get the channel's address as an (ip, port) tuple.
93 93
94 94 By the default, the address is (localhost, 0), where 0 means a random
95 95 port.
96 96 """
97 97 return self._address
98 98
99 99 def add_io_state(self, state):
100 100 """Add IO state to the eventloop.
101 101
102 102 Parameters
103 103 ----------
104 104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 105 The IO state flag to set.
106 106
107 107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 108 """
109 109 def add_io_state_callback():
110 110 if not self.iostate & state:
111 111 self.iostate = self.iostate | state
112 112 self.ioloop.update_handler(self.socket, self.iostate)
113 113 self.ioloop.add_callback(add_io_state_callback)
114 114
115 115 def drop_io_state(self, state):
116 116 """Drop IO state from the eventloop.
117 117
118 118 Parameters
119 119 ----------
120 120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 121 The IO state flag to set.
122 122
123 123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 124 """
125 125 def drop_io_state_callback():
126 126 if self.iostate & state:
127 127 self.iostate = self.iostate & (~state)
128 128 self.ioloop.update_handler(self.socket, self.iostate)
129 129 self.ioloop.add_callback(drop_io_state_callback)
130 130
131 131
132 132 class XReqSocketChannel(ZmqSocketChannel):
133 133 """The XREQ channel for issues request/replies to the kernel.
134 134 """
135 135
136 136 command_queue = None
137 137
138 138 def __init__(self, context, session, address):
139 139 self.command_queue = Queue()
140 140 super(XReqSocketChannel, self).__init__(context, session, address)
141 141
142 142 def run(self):
143 143 """The thread's main activity. Call start() instead."""
144 144 self.socket = self.context.socket(zmq.XREQ)
145 145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 146 self.socket.connect('tcp://%s:%i' % self.address)
147 147 self.ioloop = ioloop.IOLoop()
148 148 self.iostate = POLLERR|POLLIN
149 149 self.ioloop.add_handler(self.socket, self._handle_events,
150 150 self.iostate)
151 151 self.ioloop.start()
152 152
153 153 def stop(self):
154 154 self.ioloop.stop()
155 155 super(XReqSocketChannel, self).stop()
156 156
157 157 def call_handlers(self, msg):
158 158 """This method is called in the ioloop thread when a message arrives.
159 159
160 160 Subclasses should override this method to handle incoming messages.
161 161 It is important to remember that this method is called in the thread
162 162 so that some logic must be done to ensure that the application leve
163 163 handlers are called in the application thread.
164 164 """
165 165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166 166
167 167 def execute(self, code):
168 168 """Execute code in the kernel.
169 169
170 170 Parameters
171 171 ----------
172 172 code : str
173 173 A string of Python code.
174 174
175 175 Returns
176 176 -------
177 177 The msg_id of the message sent.
178 178 """
179 179 # Create class for content/msg creation. Related to, but possibly
180 180 # not in Session.
181 181 content = dict(code=code)
182 182 msg = self.session.msg('execute_request', content)
183 183 self._queue_request(msg)
184 184 return msg['header']['msg_id']
185 185
186 186 def complete(self, text, line, block=None):
187 187 """Tab complete text, line, block in the kernel's namespace.
188 188
189 189 Parameters
190 190 ----------
191 191 text : str
192 192 The text to complete.
193 193 line : str
194 194 The full line of text that is the surrounding context for the
195 195 text to complete.
196 196 block : str
197 197 The full block of code in which the completion is being requested.
198 198
199 199 Returns
200 200 -------
201 201 The msg_id of the message sent.
202 202 """
203 203 content = dict(text=text, line=line)
204 204 msg = self.session.msg('complete_request', content)
205 205 self._queue_request(msg)
206 206 return msg['header']['msg_id']
207 207
208 208 def object_info(self, oname):
209 209 """Get metadata information about an object.
210 210
211 211 Parameters
212 212 ----------
213 213 oname : str
214 214 A string specifying the object name.
215 215
216 216 Returns
217 217 -------
218 218 The msg_id of the message sent.
219 219 """
220 220 content = dict(oname=oname)
221 221 msg = self.session.msg('object_info_request', content)
222 222 self._queue_request(msg)
223 223 return msg['header']['msg_id']
224 224
225 225 def _handle_events(self, socket, events):
226 226 if events & POLLERR:
227 227 self._handle_err()
228 228 if events & POLLOUT:
229 229 self._handle_send()
230 230 if events & POLLIN:
231 231 self._handle_recv()
232 232
233 233 def _handle_recv(self):
234 234 msg = self.socket.recv_json()
235 235 self.call_handlers(msg)
236 236
237 237 def _handle_send(self):
238 238 try:
239 239 msg = self.command_queue.get(False)
240 240 except Empty:
241 241 pass
242 242 else:
243 243 self.socket.send_json(msg)
244 244 if self.command_queue.empty():
245 245 self.drop_io_state(POLLOUT)
246 246
247 247 def _handle_err(self):
248 248 # We don't want to let this go silently, so eventually we should log.
249 249 raise zmq.ZMQError()
250 250
251 251 def _queue_request(self, msg):
252 252 self.command_queue.put(msg)
253 253 self.add_io_state(POLLOUT)
254 254
255 255
256 256 class SubSocketChannel(ZmqSocketChannel):
257 257 """The SUB channel which listens for messages that the kernel publishes.
258 258 """
259 259
260 260 def __init__(self, context, session, address):
261 261 super(SubSocketChannel, self).__init__(context, session, address)
262 262
263 263 def run(self):
264 264 """The thread's main activity. Call start() instead."""
265 265 self.socket = self.context.socket(zmq.SUB)
266 266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 268 self.socket.connect('tcp://%s:%i' % self.address)
269 269 self.ioloop = ioloop.IOLoop()
270 270 self.iostate = POLLIN|POLLERR
271 271 self.ioloop.add_handler(self.socket, self._handle_events,
272 272 self.iostate)
273 273 self.ioloop.start()
274 274
275 275 def stop(self):
276 276 self.ioloop.stop()
277 277 super(SubSocketChannel, self).stop()
278 278
279 279 def call_handlers(self, msg):
280 280 """This method is called in the ioloop thread when a message arrives.
281 281
282 282 Subclasses should override this method to handle incoming messages.
283 283 It is important to remember that this method is called in the thread
284 284 so that some logic must be done to ensure that the application leve
285 285 handlers are called in the application thread.
286 286 """
287 287 raise NotImplementedError('call_handlers must be defined in a subclass.')
288 288
289 289 def flush(self, timeout=1.0):
290 290 """Immediately processes all pending messages on the SUB channel.
291 291
292 292 Callers should use this method to ensure that :method:`call_handlers`
293 293 has been called for all messages that have been received on the
294 294 0MQ SUB socket of this channel.
295 295
296 296 This method is thread safe.
297 297
298 298 Parameters
299 299 ----------
300 300 timeout : float, optional
301 301 The maximum amount of time to spend flushing, in seconds. The
302 302 default is one second.
303 303 """
304 304 # We do the IOLoop callback process twice to ensure that the IOLoop
305 305 # gets to perform at least one full poll.
306 306 stop_time = time.time() + timeout
307 307 for i in xrange(2):
308 308 self._flushed = False
309 309 self.ioloop.add_callback(self._flush)
310 310 while not self._flushed and time.time() < stop_time:
311 311 time.sleep(0.01)
312 312
313 313 def _handle_events(self, socket, events):
314 314 # Turn on and off POLLOUT depending on if we have made a request
315 315 if events & POLLERR:
316 316 self._handle_err()
317 317 if events & POLLIN:
318 318 self._handle_recv()
319 319
320 320 def _handle_err(self):
321 321 # We don't want to let this go silently, so eventually we should log.
322 322 raise zmq.ZMQError()
323 323
324 324 def _handle_recv(self):
325 325 # Get all of the messages we can
326 326 while True:
327 327 try:
328 328 msg = self.socket.recv_json(zmq.NOBLOCK)
329 329 except zmq.ZMQError:
330 330 # Check the errno?
331 331 # Will this trigger POLLERR?
332 332 break
333 333 else:
334 334 self.call_handlers(msg)
335 335
336 336 def _flush(self):
337 337 """Callback for :method:`self.flush`."""
338 338 self._flushed = True
339 339
340 340
341 341 class RepSocketChannel(ZmqSocketChannel):
342 342 """A reply channel to handle raw_input requests that the kernel makes."""
343 343
344 344 msg_queue = None
345 345
346 346 def __init__(self, context, session, address):
347 347 self.msg_queue = Queue()
348 348 super(RepSocketChannel, self).__init__(context, session, address)
349 349
350 350 def run(self):
351 351 """The thread's main activity. Call start() instead."""
352 352 self.socket = self.context.socket(zmq.XREQ)
353 353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
354 354 self.socket.connect('tcp://%s:%i' % self.address)
355 355 self.ioloop = ioloop.IOLoop()
356 356 self.iostate = POLLERR|POLLIN
357 357 self.ioloop.add_handler(self.socket, self._handle_events,
358 358 self.iostate)
359 359 self.ioloop.start()
360 360
361 361 def stop(self):
362 362 self.ioloop.stop()
363 363 super(RepSocketChannel, self).stop()
364 364
365 365 def call_handlers(self, msg):
366 366 """This method is called in the ioloop thread when a message arrives.
367 367
368 368 Subclasses should override this method to handle incoming messages.
369 369 It is important to remember that this method is called in the thread
370 370 so that some logic must be done to ensure that the application leve
371 371 handlers are called in the application thread.
372 372 """
373 373 raise NotImplementedError('call_handlers must be defined in a subclass.')
374 374
375 375 def input(self, string):
376 376 """Send a string of raw input to the kernel."""
377 377 content = dict(value=string)
378 378 msg = self.session.msg('input_reply', content)
379 379 self._queue_reply(msg)
380 380
381 381 def _handle_events(self, socket, events):
382 382 if events & POLLERR:
383 383 self._handle_err()
384 384 if events & POLLOUT:
385 385 self._handle_send()
386 386 if events & POLLIN:
387 387 self._handle_recv()
388 388
389 389 def _handle_recv(self):
390 390 msg = self.socket.recv_json()
391 391 self.call_handlers(msg)
392 392
393 393 def _handle_send(self):
394 394 try:
395 395 msg = self.msg_queue.get(False)
396 396 except Empty:
397 397 pass
398 398 else:
399 399 self.socket.send_json(msg)
400 400 if self.msg_queue.empty():
401 401 self.drop_io_state(POLLOUT)
402 402
403 403 def _handle_err(self):
404 404 # We don't want to let this go silently, so eventually we should log.
405 405 raise zmq.ZMQError()
406 406
407 407 def _queue_reply(self, msg):
408 408 self.msg_queue.put(msg)
409 409 self.add_io_state(POLLOUT)
410 410
411 411
412 412 #-----------------------------------------------------------------------------
413 413 # Main kernel manager class
414 414 #-----------------------------------------------------------------------------
415 415
416 416 class KernelManager(HasTraits):
417 417 """ Manages a kernel for a frontend.
418 418
419 419 The SUB channel is for the frontend to receive messages published by the
420 420 kernel.
421 421
422 422 The REQ channel is for the frontend to make requests of the kernel.
423 423
424 424 The REP channel is for the kernel to request stdin (raw_input) from the
425 425 frontend.
426 426 """
427 427 # The PyZMQ Context to use for communication with the kernel.
428 context = Instance(zmq.Context)
428 context = Instance(zmq.Context,(),{})
429 429
430 430 # The Session to use for communication with the kernel.
431 session = Instance(Session)
431 session = Instance(Session,(),{})
432 432
433 433 # The kernel process with which the KernelManager is communicating.
434 434 kernel = Instance(Popen)
435 435
436 436 # The classes to use for the various channels.
437 437 xreq_channel_class = Type(XReqSocketChannel)
438 438 sub_channel_class = Type(SubSocketChannel)
439 439 rep_channel_class = Type(RepSocketChannel)
440 440
441 441 # Protected traits.
442 _xreq_address = TCPAddress
443 _sub_address = TCPAddress
444 _rep_address = TCPAddress
442 xreq_address = TCPAddress((LOCALHOST, 0))
443 sub_address = TCPAddress((LOCALHOST, 0))
444 rep_address = TCPAddress((LOCALHOST, 0))
445 445 _xreq_channel = Any
446 446 _sub_channel = Any
447 447 _rep_channel = Any
448 448
449 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
450 context=None, session=None):
451 super(KernelManager, self).__init__()
452 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
453 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
454 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
455 self.context = zmq.Context() if context is None else context
456 self.session = Session() if session is None else session
457 super(KernelManager, self).__init__()
449 def __init__(self, **kwargs):
450 super(KernelManager, self).__init__(**kwargs)
458 451
459 452 #--------------------------------- -----------------------------------------
460 453 # Channel management methods:
461 454 #--------------------------------------------------------------------------
462 455
463 456 def start_channels(self):
464 457 """Starts the channels for this kernel.
465 458
466 459 This will create the channels if they do not exist and then start
467 460 them. If port numbers of 0 are being used (random ports) then you
468 461 must first call :method:`start_kernel`. If the channels have been
469 462 stopped and you call this, :class:`RuntimeError` will be raised.
470 463 """
471 464 self.xreq_channel.start()
472 465 self.sub_channel.start()
473 466 self.rep_channel.start()
474 467
475 468 def stop_channels(self):
476 469 """Stops the channels for this kernel.
477 470
478 471 This stops the channels by joining their threads. If the channels
479 472 were not started, :class:`RuntimeError` will be raised.
480 473 """
481 474 self.xreq_channel.stop()
482 475 self.sub_channel.stop()
483 476 self.rep_channel.stop()
484 477
485 478 @property
486 479 def channels_running(self):
487 480 """Are all of the channels created and running?"""
488 481 return self.xreq_channel.is_alive() \
489 482 and self.sub_channel.is_alive() \
490 483 and self.rep_channel.is_alive()
491 484
492 485 #--------------------------------------------------------------------------
493 486 # Kernel process management methods:
494 487 #--------------------------------------------------------------------------
495 488
496 489 def start_kernel(self):
497 490 """Starts a kernel process and configures the manager to use it.
498 491
499 492 If random ports (port=0) are being used, this method must be called
500 493 before the channels are created.
501 494 """
502 495 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
503 496 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
504 497 raise RuntimeError("Can only launch a kernel on localhost."
505 498 "Make sure that the '*_address' attributes are "
506 499 "configured properly.")
507 500
508 501 self.kernel, xrep, pub, req = launch_kernel(
509 502 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
510 self._xreq_address = (LOCALHOST, xrep)
511 self._sub_address = (LOCALHOST, pub)
512 self._rep_address = (LOCALHOST, req)
503 self.xreq_address = (LOCALHOST, xrep)
504 self.sub_address = (LOCALHOST, pub)
505 self.rep_address = (LOCALHOST, req)
513 506
514 507 @property
515 508 def has_kernel(self):
516 509 """Returns whether a kernel process has been specified for the kernel
517 510 manager.
518 511 """
519 512 return self.kernel is not None
520 513
521 514 def kill_kernel(self):
522 515 """ Kill the running kernel. """
523 516 if self.kernel is not None:
524 517 self.kernel.kill()
525 518 self.kernel = None
526 519 else:
527 520 raise RuntimeError("Cannot kill kernel. No kernel is running!")
528 521
529 522 def signal_kernel(self, signum):
530 523 """ Sends a signal to the kernel. """
531 524 if self.kernel is not None:
532 525 self.kernel.send_signal(signum)
533 526 else:
534 527 raise RuntimeError("Cannot signal kernel. No kernel is running!")
535 528
536 529 @property
537 530 def is_alive(self):
538 531 """Is the kernel process still running?"""
539 532 if self.kernel is not None:
540 533 if self.kernel.poll() is None:
541 534 return True
542 535 else:
543 536 return False
544 537 else:
545 538 # We didn't start the kernel with this KernelManager so we don't
546 539 # know if it is running. We should use a heartbeat for this case.
547 540 return True
548 541
549 542 #--------------------------------------------------------------------------
550 543 # Channels used for communication with the kernel:
551 544 #--------------------------------------------------------------------------
552 545
553 546 @property
554 547 def xreq_channel(self):
555 548 """Get the REQ socket channel object to make requests of the kernel."""
556 549 if self._xreq_channel is None:
557 550 self._xreq_channel = self.xreq_channel_class(self.context,
558 551 self.session,
559 552 self.xreq_address)
560 553 return self._xreq_channel
561 554
562 555 @property
563 556 def sub_channel(self):
564 557 """Get the SUB socket channel object."""
565 558 if self._sub_channel is None:
566 559 self._sub_channel = self.sub_channel_class(self.context,
567 560 self.session,
568 561 self.sub_address)
569 562 return self._sub_channel
570 563
571 564 @property
572 565 def rep_channel(self):
573 566 """Get the REP socket channel object to handle stdin (raw_input)."""
574 567 if self._rep_channel is None:
575 568 self._rep_channel = self.rep_channel_class(self.context,
576 569 self.session,
577 570 self.rep_address)
578 571 return self._rep_channel
579
580 @property
581 def xreq_address(self):
582 return self._xreq_address
583
584 @property
585 def sub_address(self):
586 return self._sub_address
587
588 @property
589 def rep_address(self):
590 return self._rep_address
591
General Comments 0
You need to be logged in to leave comments. Login now