##// END OF EJS Templates
* Fixed heartbeat thread not stopping cleanly....
epatters -
Show More
@@ -1,715 +1,723 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 23 from threading import Thread
24 24 import time
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq import POLLIN, POLLOUT, POLLERR
29 29 from zmq.eventloop import ioloop
30 30
31 31 # Local imports.
32 32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 33 from session import Session
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Constants and exceptions
37 37 #-----------------------------------------------------------------------------
38 38
39 39 LOCALHOST = '127.0.0.1'
40 40
41 41 class InvalidPortNumber(Exception):
42 42 pass
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # ZMQ Socket Channel classes
46 46 #-----------------------------------------------------------------------------
47 47
48 48 class ZmqSocketChannel(Thread):
49 49 """The base class for the channels that use ZMQ sockets.
50 50 """
51 51 context = None
52 52 session = None
53 53 socket = None
54 54 ioloop = None
55 55 iostate = None
56 56 _address = None
57 57
58 58 def __init__(self, context, session, address):
59 59 """Create a channel
60 60
61 61 Parameters
62 62 ----------
63 63 context : :class:`zmq.Context`
64 64 The ZMQ context to use.
65 65 session : :class:`session.Session`
66 66 The session to use.
67 67 address : tuple
68 68 Standard (ip, port) tuple that the kernel is listening on.
69 69 """
70 70 super(ZmqSocketChannel, self).__init__()
71 71 self.daemon = True
72 72
73 73 self.context = context
74 74 self.session = session
75 75 if address[1] == 0:
76 76 message = 'The port number for a channel cannot be 0.'
77 77 raise InvalidPortNumber(message)
78 78 self._address = address
79 79
80 80 def stop(self):
81 81 """Stop the channel's activity.
82 82
83 83 This calls :method:`Thread.join` and returns when the thread
84 84 terminates. :class:`RuntimeError` will be raised if
85 85 :method:`self.start` is called again.
86 86 """
87 87 self.join()
88 88
89 89 @property
90 90 def address(self):
91 91 """Get the channel's address as an (ip, port) tuple.
92 92
93 93 By the default, the address is (localhost, 0), where 0 means a random
94 94 port.
95 95 """
96 96 return self._address
97 97
98 98 def add_io_state(self, state):
99 99 """Add IO state to the eventloop.
100 100
101 101 Parameters
102 102 ----------
103 103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 104 The IO state flag to set.
105 105
106 106 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 107 """
108 108 def add_io_state_callback():
109 109 if not self.iostate & state:
110 110 self.iostate = self.iostate | state
111 111 self.ioloop.update_handler(self.socket, self.iostate)
112 112 self.ioloop.add_callback(add_io_state_callback)
113 113
114 114 def drop_io_state(self, state):
115 115 """Drop IO state from the eventloop.
116 116
117 117 Parameters
118 118 ----------
119 119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 120 The IO state flag to set.
121 121
122 122 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 123 """
124 124 def drop_io_state_callback():
125 125 if self.iostate & state:
126 126 self.iostate = self.iostate & (~state)
127 127 self.ioloop.update_handler(self.socket, self.iostate)
128 128 self.ioloop.add_callback(drop_io_state_callback)
129 129
130 130
131 131 class XReqSocketChannel(ZmqSocketChannel):
132 132 """The XREQ channel for issues request/replies to the kernel.
133 133 """
134 134
135 135 command_queue = None
136 136
137 137 def __init__(self, context, session, address):
138 138 self.command_queue = Queue()
139 139 super(XReqSocketChannel, self).__init__(context, session, address)
140 140
141 141 def run(self):
142 142 """The thread's main activity. Call start() instead."""
143 143 self.socket = self.context.socket(zmq.XREQ)
144 144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 145 self.socket.connect('tcp://%s:%i' % self.address)
146 146 self.ioloop = ioloop.IOLoop()
147 147 self.iostate = POLLERR|POLLIN
148 148 self.ioloop.add_handler(self.socket, self._handle_events,
149 149 self.iostate)
150 150 self.ioloop.start()
151 151
152 152 def stop(self):
153 153 self.ioloop.stop()
154 154 super(XReqSocketChannel, self).stop()
155 155
156 156 def call_handlers(self, msg):
157 157 """This method is called in the ioloop thread when a message arrives.
158 158
159 159 Subclasses should override this method to handle incoming messages.
160 160 It is important to remember that this method is called in the thread
161 161 so that some logic must be done to ensure that the application leve
162 162 handlers are called in the application thread.
163 163 """
164 164 raise NotImplementedError('call_handlers must be defined in a subclass.')
165 165
166 166 def execute(self, code, silent=False):
167 167 """Execute code in the kernel.
168 168
169 169 Parameters
170 170 ----------
171 171 code : str
172 172 A string of Python code.
173 173 silent : bool, optional (default False)
174 174 If set, the kernel will execute the code as quietly possible.
175 175
176 176 Returns
177 177 -------
178 178 The msg_id of the message sent.
179 179 """
180 180 # Create class for content/msg creation. Related to, but possibly
181 181 # not in Session.
182 182 content = dict(code=code, silent=silent)
183 183 msg = self.session.msg('execute_request', content)
184 184 self._queue_request(msg)
185 185 return msg['header']['msg_id']
186 186
187 187 def complete(self, text, line, cursor_pos, block=None):
188 188 """Tab complete text in the kernel's namespace.
189 189
190 190 Parameters
191 191 ----------
192 192 text : str
193 193 The text to complete.
194 194 line : str
195 195 The full line of text that is the surrounding context for the
196 196 text to complete.
197 197 cursor_pos : int
198 198 The position of the cursor in the line where the completion was
199 199 requested.
200 200 block : str, optional
201 201 The full block of code in which the completion is being requested.
202 202
203 203 Returns
204 204 -------
205 205 The msg_id of the message sent.
206 206 """
207 207 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
208 208 msg = self.session.msg('complete_request', content)
209 209 self._queue_request(msg)
210 210 return msg['header']['msg_id']
211 211
212 212 def object_info(self, oname):
213 213 """Get metadata information about an object.
214 214
215 215 Parameters
216 216 ----------
217 217 oname : str
218 218 A string specifying the object name.
219 219
220 220 Returns
221 221 -------
222 222 The msg_id of the message sent.
223 223 """
224 224 content = dict(oname=oname)
225 225 msg = self.session.msg('object_info_request', content)
226 226 self._queue_request(msg)
227 227 return msg['header']['msg_id']
228 228
229 229 def history(self, index=None, raw=False, output=True):
230 230 """Get the history list.
231 231
232 232 Parameters
233 233 ----------
234 234 index : n or (n1, n2) or None
235 235 If n, then the last entries. If a tuple, then all in
236 236 range(n1, n2). If None, then all entries. Raises IndexError if
237 237 the format of index is incorrect.
238 238 raw : bool
239 239 If True, return the raw input.
240 240 output : bool
241 241 If True, then return the output as well.
242 242
243 243 Returns
244 244 -------
245 245 The msg_id of the message sent.
246 246 """
247 247 content = dict(index=index, raw=raw, output=output)
248 248 msg = self.session.msg('history_request', content)
249 249 self._queue_request(msg)
250 250 return msg['header']['msg_id']
251 251
252 252 def prompt(self):
253 253 """Requests a prompt number from the kernel.
254 254
255 255 Returns
256 256 -------
257 257 The msg_id of the message sent.
258 258 """
259 259 msg = self.session.msg('prompt_request')
260 260 self._queue_request(msg)
261 261 return msg['header']['msg_id']
262 262
263 263 def _handle_events(self, socket, events):
264 264 if events & POLLERR:
265 265 self._handle_err()
266 266 if events & POLLOUT:
267 267 self._handle_send()
268 268 if events & POLLIN:
269 269 self._handle_recv()
270 270
271 271 def _handle_recv(self):
272 272 msg = self.socket.recv_json()
273 273 self.call_handlers(msg)
274 274
275 275 def _handle_send(self):
276 276 try:
277 277 msg = self.command_queue.get(False)
278 278 except Empty:
279 279 pass
280 280 else:
281 281 self.socket.send_json(msg)
282 282 if self.command_queue.empty():
283 283 self.drop_io_state(POLLOUT)
284 284
285 285 def _handle_err(self):
286 286 # We don't want to let this go silently, so eventually we should log.
287 287 raise zmq.ZMQError()
288 288
289 289 def _queue_request(self, msg):
290 290 self.command_queue.put(msg)
291 291 self.add_io_state(POLLOUT)
292 292
293 293
294 294 class SubSocketChannel(ZmqSocketChannel):
295 295 """The SUB channel which listens for messages that the kernel publishes.
296 296 """
297 297
298 298 def __init__(self, context, session, address):
299 299 super(SubSocketChannel, self).__init__(context, session, address)
300 300
301 301 def run(self):
302 302 """The thread's main activity. Call start() instead."""
303 303 self.socket = self.context.socket(zmq.SUB)
304 304 self.socket.setsockopt(zmq.SUBSCRIBE,'')
305 305 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
306 306 self.socket.connect('tcp://%s:%i' % self.address)
307 307 self.ioloop = ioloop.IOLoop()
308 308 self.iostate = POLLIN|POLLERR
309 309 self.ioloop.add_handler(self.socket, self._handle_events,
310 310 self.iostate)
311 311 self.ioloop.start()
312 312
313 313 def stop(self):
314 314 self.ioloop.stop()
315 315 super(SubSocketChannel, self).stop()
316 316
317 317 def call_handlers(self, msg):
318 318 """This method is called in the ioloop thread when a message arrives.
319 319
320 320 Subclasses should override this method to handle incoming messages.
321 321 It is important to remember that this method is called in the thread
322 322 so that some logic must be done to ensure that the application leve
323 323 handlers are called in the application thread.
324 324 """
325 325 raise NotImplementedError('call_handlers must be defined in a subclass.')
326 326
327 327 def flush(self, timeout=1.0):
328 328 """Immediately processes all pending messages on the SUB channel.
329 329
330 330 Callers should use this method to ensure that :method:`call_handlers`
331 331 has been called for all messages that have been received on the
332 332 0MQ SUB socket of this channel.
333 333
334 334 This method is thread safe.
335 335
336 336 Parameters
337 337 ----------
338 338 timeout : float, optional
339 339 The maximum amount of time to spend flushing, in seconds. The
340 340 default is one second.
341 341 """
342 342 # We do the IOLoop callback process twice to ensure that the IOLoop
343 343 # gets to perform at least one full poll.
344 344 stop_time = time.time() + timeout
345 345 for i in xrange(2):
346 346 self._flushed = False
347 347 self.ioloop.add_callback(self._flush)
348 348 while not self._flushed and time.time() < stop_time:
349 349 time.sleep(0.01)
350 350
351 351 def _handle_events(self, socket, events):
352 352 # Turn on and off POLLOUT depending on if we have made a request
353 353 if events & POLLERR:
354 354 self._handle_err()
355 355 if events & POLLIN:
356 356 self._handle_recv()
357 357
358 358 def _handle_err(self):
359 359 # We don't want to let this go silently, so eventually we should log.
360 360 raise zmq.ZMQError()
361 361
362 362 def _handle_recv(self):
363 363 # Get all of the messages we can
364 364 while True:
365 365 try:
366 366 msg = self.socket.recv_json(zmq.NOBLOCK)
367 367 except zmq.ZMQError:
368 368 # Check the errno?
369 369 # Will this trigger POLLERR?
370 370 break
371 371 else:
372 372 self.call_handlers(msg)
373 373
374 374 def _flush(self):
375 375 """Callback for :method:`self.flush`."""
376 376 self._flushed = True
377 377
378 378
379 379 class RepSocketChannel(ZmqSocketChannel):
380 380 """A reply channel to handle raw_input requests that the kernel makes."""
381 381
382 382 msg_queue = None
383 383
384 384 def __init__(self, context, session, address):
385 385 self.msg_queue = Queue()
386 386 super(RepSocketChannel, self).__init__(context, session, address)
387 387
388 388 def run(self):
389 389 """The thread's main activity. Call start() instead."""
390 390 self.socket = self.context.socket(zmq.XREQ)
391 391 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
392 392 self.socket.connect('tcp://%s:%i' % self.address)
393 393 self.ioloop = ioloop.IOLoop()
394 394 self.iostate = POLLERR|POLLIN
395 395 self.ioloop.add_handler(self.socket, self._handle_events,
396 396 self.iostate)
397 397 self.ioloop.start()
398 398
399 399 def stop(self):
400 400 self.ioloop.stop()
401 401 super(RepSocketChannel, self).stop()
402 402
403 403 def call_handlers(self, msg):
404 404 """This method is called in the ioloop thread when a message arrives.
405 405
406 406 Subclasses should override this method to handle incoming messages.
407 407 It is important to remember that this method is called in the thread
408 408 so that some logic must be done to ensure that the application leve
409 409 handlers are called in the application thread.
410 410 """
411 411 raise NotImplementedError('call_handlers must be defined in a subclass.')
412 412
413 413 def input(self, string):
414 414 """Send a string of raw input to the kernel."""
415 415 content = dict(value=string)
416 416 msg = self.session.msg('input_reply', content)
417 417 self._queue_reply(msg)
418 418
419 419 def _handle_events(self, socket, events):
420 420 if events & POLLERR:
421 421 self._handle_err()
422 422 if events & POLLOUT:
423 423 self._handle_send()
424 424 if events & POLLIN:
425 425 self._handle_recv()
426 426
427 427 def _handle_recv(self):
428 428 msg = self.socket.recv_json()
429 429 self.call_handlers(msg)
430 430
431 431 def _handle_send(self):
432 432 try:
433 433 msg = self.msg_queue.get(False)
434 434 except Empty:
435 435 pass
436 436 else:
437 437 self.socket.send_json(msg)
438 438 if self.msg_queue.empty():
439 439 self.drop_io_state(POLLOUT)
440 440
441 441 def _handle_err(self):
442 442 # We don't want to let this go silently, so eventually we should log.
443 443 raise zmq.ZMQError()
444 444
445 445 def _queue_reply(self, msg):
446 446 self.msg_queue.put(msg)
447 447 self.add_io_state(POLLOUT)
448 448
449 449
450 450 class HBSocketChannel(ZmqSocketChannel):
451 """The heartbeat channel which monitors the kernel heartbeat.
452 """
451 """The heartbeat channel which monitors the kernel heartbeat."""
453 452
454 453 time_to_dead = 5.0
455 454 socket = None
456 455 poller = None
457 456
458 457 def __init__(self, context, session, address):
459 458 super(HBSocketChannel, self).__init__(context, session, address)
459 self._running = False
460 460
461 461 def _create_socket(self):
462 462 self.socket = self.context.socket(zmq.REQ)
463 463 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
464 464 self.socket.connect('tcp://%s:%i' % self.address)
465 465 self.poller = zmq.Poller()
466 466 self.poller.register(self.socket, zmq.POLLIN)
467 467
468 468 def run(self):
469 469 """The thread's main activity. Call start() instead."""
470 470 self._create_socket()
471 471
472 while True:
472 self._running = True
473 while self._running:
473 474 since_last_heartbeat = 0.0
474 475 request_time = time.time()
475 476 try:
476 477 self.socket.send_json('ping')
477 478 except zmq.ZMQError, e:
478 479 if e.errno == zmq.EFSM:
479 480 time.sleep(self.time_to_dead)
480 481 self._create_socket()
481 482 else:
482 483 raise
483 484 else:
484 485 while True:
485 486 try:
486 487 reply = self.socket.recv_json(zmq.NOBLOCK)
487 488 except zmq.ZMQError, e:
488 489 if e.errno == zmq.EAGAIN:
489 until_dead = self.time_to_dead-(time.time()-request_time)
490 until_dead = self.time_to_dead - (time.time() -
491 request_time)
490 492 self.poller.poll(until_dead)
491 493 since_last_heartbeat = time.time() - request_time
492 494 if since_last_heartbeat > self.time_to_dead:
493 495 self.call_handlers(since_last_heartbeat)
494 496 break
495 497 else:
496 # We should probably log this instead
498 # FIXME: We should probably log this instead.
497 499 raise
498 500 else:
499 until_dead = self.time_to_dead-(time.time()-request_time)
501 until_dead = self.time_to_dead - (time.time() -
502 request_time)
500 503 if until_dead > 0.0:
501 504 time.sleep(until_dead)
502 505 break
503 506
507 def stop(self):
508 self._running = False
509 super(HBSocketChannel, self).stop()
510
504 511 def call_handlers(self, since_last_heartbeat):
505 512 """This method is called in the ioloop thread when a message arrives.
506 513
507 514 Subclasses should override this method to handle incoming messages.
508 515 It is important to remember that this method is called in the thread
509 516 so that some logic must be done to ensure that the application leve
510 517 handlers are called in the application thread.
511 518 """
512 519 raise NotImplementedError('call_handlers must be defined in a subclass.')
513 520
514 521
515 522 #-----------------------------------------------------------------------------
516 523 # Main kernel manager class
517 524 #-----------------------------------------------------------------------------
518 525
519 526 class KernelManager(HasTraits):
520 527 """ Manages a kernel for a frontend.
521 528
522 529 The SUB channel is for the frontend to receive messages published by the
523 530 kernel.
524 531
525 532 The REQ channel is for the frontend to make requests of the kernel.
526 533
527 534 The REP channel is for the kernel to request stdin (raw_input) from the
528 535 frontend.
529 536 """
530 537 # The PyZMQ Context to use for communication with the kernel.
531 538 context = Instance(zmq.Context,(),{})
532 539
533 540 # The Session to use for communication with the kernel.
534 541 session = Instance(Session,(),{})
535 542
536 543 # The kernel process with which the KernelManager is communicating.
537 544 kernel = Instance(Popen)
538 545
539 546 # The addresses for the communication channels.
540 547 xreq_address = TCPAddress((LOCALHOST, 0))
541 548 sub_address = TCPAddress((LOCALHOST, 0))
542 549 rep_address = TCPAddress((LOCALHOST, 0))
543 550 hb_address = TCPAddress((LOCALHOST, 0))
544 551
545 552 # The classes to use for the various channels.
546 553 xreq_channel_class = Type(XReqSocketChannel)
547 554 sub_channel_class = Type(SubSocketChannel)
548 555 rep_channel_class = Type(RepSocketChannel)
549 556 hb_channel_class = Type(HBSocketChannel)
550 557
551 558 # Protected traits.
552 559 _launch_args = Any
553 560 _xreq_channel = Any
554 561 _sub_channel = Any
555 562 _rep_channel = Any
556 563 _hb_channel = Any
557 564
558 565 #--------------------------------------------------------------------------
559 566 # Channel management methods:
560 567 #--------------------------------------------------------------------------
561 568
562 569 def start_channels(self):
563 570 """Starts the channels for this kernel.
564 571
565 572 This will create the channels if they do not exist and then start
566 573 them. If port numbers of 0 are being used (random ports) then you
567 574 must first call :method:`start_kernel`. If the channels have been
568 575 stopped and you call this, :class:`RuntimeError` will be raised.
569 576 """
570 577 self.xreq_channel.start()
571 578 self.sub_channel.start()
572 579 self.rep_channel.start()
573 580 self.hb_channel.start()
574 581
575 582 def stop_channels(self):
576 583 """Stops the channels for this kernel.
577 584
578 585 This stops the channels by joining their threads. If the channels
579 586 were not started, :class:`RuntimeError` will be raised.
580 587 """
581 588 self.xreq_channel.stop()
582 589 self.sub_channel.stop()
583 590 self.rep_channel.stop()
584 591 self.hb_channel.stop()
585 592
586 593 @property
587 594 def channels_running(self):
588 595 """Are all of the channels created and running?"""
589 596 return self.xreq_channel.is_alive() \
590 597 and self.sub_channel.is_alive() \
591 598 and self.rep_channel.is_alive() \
592 599 and self.hb_channel.is_alive()
593 600
594 601 #--------------------------------------------------------------------------
595 602 # Kernel process management methods:
596 603 #--------------------------------------------------------------------------
597 604
598 605 def start_kernel(self, **kw):
599 606 """Starts a kernel process and configures the manager to use it.
600 607
601 608 If random ports (port=0) are being used, this method must be called
602 609 before the channels are created.
603 610
604 611 Parameters:
605 612 -----------
606 613 ipython : bool, optional (default True)
607 614 Whether to use an IPython kernel instead of a plain Python kernel.
608 615 """
609 616 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
610 617 self.rep_address, self.hb_address
611 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST or hb[0] != LOCALHOST:
618 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
619 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
612 620 raise RuntimeError("Can only launch a kernel on localhost."
613 621 "Make sure that the '*_address' attributes are "
614 622 "configured properly.")
615 623
616 624 self._launch_args = kw.copy()
617 625 if kw.pop('ipython', True):
618 626 from ipkernel import launch_kernel as launch
619 627 else:
620 628 from pykernel import launch_kernel as launch
621 629 self.kernel, xrep, pub, req, hb = launch(
622 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1],
623 hb_port=hb[1], **kw)
630 xrep_port=xreq[1], pub_port=sub[1],
631 req_port=rep[1], hb_port=hb[1], **kw)
624 632 self.xreq_address = (LOCALHOST, xrep)
625 633 self.sub_address = (LOCALHOST, pub)
626 634 self.rep_address = (LOCALHOST, req)
627 635 self.hb_address = (LOCALHOST, hb)
628 636
629 637 def restart_kernel(self):
630 638 """Restarts a kernel with the same arguments that were used to launch
631 639 it. If the old kernel was launched with random ports, the same ports
632 640 will be used for the new kernel.
633 641 """
634 642 if self._launch_args is None:
635 643 raise RuntimeError("Cannot restart the kernel. "
636 644 "No previous call to 'start_kernel'.")
637 645 else:
638 646 if self.has_kernel:
639 647 self.kill_kernel()
640 self.start_kernel(*self._launch_args)
648 self.start_kernel(**self._launch_args)
641 649
642 650 @property
643 651 def has_kernel(self):
644 652 """Returns whether a kernel process has been specified for the kernel
645 653 manager.
646 654 """
647 655 return self.kernel is not None
648 656
649 657 def kill_kernel(self):
650 658 """ Kill the running kernel. """
651 659 if self.kernel is not None:
652 660 self.kernel.kill()
653 661 self.kernel = None
654 662 else:
655 663 raise RuntimeError("Cannot kill kernel. No kernel is running!")
656 664
657 665 def signal_kernel(self, signum):
658 666 """ Sends a signal to the kernel. """
659 667 if self.kernel is not None:
660 668 self.kernel.send_signal(signum)
661 669 else:
662 670 raise RuntimeError("Cannot signal kernel. No kernel is running!")
663 671
664 672 @property
665 673 def is_alive(self):
666 674 """Is the kernel process still running?"""
667 675 if self.kernel is not None:
668 676 if self.kernel.poll() is None:
669 677 return True
670 678 else:
671 679 return False
672 680 else:
673 681 # We didn't start the kernel with this KernelManager so we don't
674 682 # know if it is running. We should use a heartbeat for this case.
675 683 return True
676 684
677 685 #--------------------------------------------------------------------------
678 686 # Channels used for communication with the kernel:
679 687 #--------------------------------------------------------------------------
680 688
681 689 @property
682 690 def xreq_channel(self):
683 691 """Get the REQ socket channel object to make requests of the kernel."""
684 692 if self._xreq_channel is None:
685 693 self._xreq_channel = self.xreq_channel_class(self.context,
686 694 self.session,
687 695 self.xreq_address)
688 696 return self._xreq_channel
689 697
690 698 @property
691 699 def sub_channel(self):
692 700 """Get the SUB socket channel object."""
693 701 if self._sub_channel is None:
694 702 self._sub_channel = self.sub_channel_class(self.context,
695 703 self.session,
696 704 self.sub_address)
697 705 return self._sub_channel
698 706
699 707 @property
700 708 def rep_channel(self):
701 709 """Get the REP socket channel object to handle stdin (raw_input)."""
702 710 if self._rep_channel is None:
703 711 self._rep_channel = self.rep_channel_class(self.context,
704 712 self.session,
705 713 self.rep_address)
706 714 return self._rep_channel
707 715
708 716 @property
709 717 def hb_channel(self):
710 718 """Get the REP socket channel object to handle stdin (raw_input)."""
711 719 if self._hb_channel is None:
712 720 self._hb_channel = self.hb_channel_class(self.context,
713 721 self.session,
714 722 self.hb_address)
715 723 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now