##// END OF EJS Templates
added reset:<bool> to shutdown_request content; shutdown_message broadcast on PUB for multiclient notification
MinRK -
Show More
@@ -1,626 +1,627
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.utils import io
31 31 from IPython.utils.jsonutil import json_clean
32 32 from IPython.lib import pylabtools
33 33 from IPython.utils.traitlets import Instance, Float
34 34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
35 35 start_kernel)
36 36 from iostream import OutStream
37 37 from session import Session, Message
38 38 from zmqshell import ZMQInteractiveShell
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Main kernel class
42 42 #-----------------------------------------------------------------------------
43 43
44 44 class Kernel(Configurable):
45 45
46 46 #---------------------------------------------------------------------------
47 47 # Kernel interface
48 48 #---------------------------------------------------------------------------
49 49
50 50 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
51 51 session = Instance(Session)
52 52 reply_socket = Instance('zmq.Socket')
53 53 pub_socket = Instance('zmq.Socket')
54 54 req_socket = Instance('zmq.Socket')
55 55
56 56 # Private interface
57 57
58 58 # Time to sleep after flushing the stdout/err buffers in each execute
59 59 # cycle. While this introduces a hard limit on the minimal latency of the
60 60 # execute cycle, it helps prevent output synchronization problems for
61 61 # clients.
62 62 # Units are in seconds. The minimum zmq latency on local host is probably
63 63 # ~150 microseconds, set this to 500us for now. We may need to increase it
64 64 # a little if it's not enough after more interactive testing.
65 65 _execute_sleep = Float(0.0005, config=True)
66 66
67 67 # Frequency of the kernel's event loop.
68 68 # Units are in seconds, kernel subclasses for GUI toolkits may need to
69 69 # adapt to milliseconds.
70 70 _poll_interval = Float(0.05, config=True)
71 71
72 72 # If the shutdown was requested over the network, we leave here the
73 73 # necessary reply message so it can be sent by our registered atexit
74 74 # handler. This ensures that the reply is only sent to clients truly at
75 75 # the end of our shutdown process (which happens after the underlying
76 76 # IPython shell's own shutdown).
77 77 _shutdown_message = None
78 78
79 79 # This is a dict of port number that the kernel is listening on. It is set
80 80 # by record_ports and used by connect_request.
81 81 _recorded_ports = None
82 82
83 83 def __init__(self, **kwargs):
84 84 super(Kernel, self).__init__(**kwargs)
85 85
86 86 # Before we even start up the shell, register *first* our exit handlers
87 87 # so they come before the shell's
88 88 atexit.register(self._at_shutdown)
89 89
90 90 # Initialize the InteractiveShell subclass
91 91 self.shell = ZMQInteractiveShell.instance()
92 92 self.shell.displayhook.session = self.session
93 93 self.shell.displayhook.pub_socket = self.pub_socket
94 94
95 95 # TMP - hack while developing
96 96 self.shell._reply_content = None
97 97
98 98 # Build dict of handlers for message types
99 99 msg_types = [ 'execute_request', 'complete_request',
100 100 'object_info_request', 'history_request',
101 101 'connect_request', 'shutdown_request']
102 102 self.handlers = {}
103 103 for msg_type in msg_types:
104 104 self.handlers[msg_type] = getattr(self, msg_type)
105 105
106 106 def do_one_iteration(self):
107 107 """Do one iteration of the kernel's evaluation loop.
108 108 """
109 109 try:
110 110 ident = self.reply_socket.recv(zmq.NOBLOCK)
111 111 except zmq.ZMQError, e:
112 112 if e.errno == zmq.EAGAIN:
113 113 return
114 114 else:
115 115 raise
116 116 # FIXME: Bug in pyzmq/zmq?
117 117 # assert self.reply_socket.rcvmore(), "Missing message part."
118 118 msg = self.reply_socket.recv_json()
119 119
120 120 # Print some info about this message and leave a '--->' marker, so it's
121 121 # easier to trace visually the message chain when debugging. Each
122 122 # handler prints its message at the end.
123 123 # Eventually we'll move these from stdout to a logger.
124 124 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
125 125 io.raw_print(' Content: ', msg['content'],
126 126 '\n --->\n ', sep='', end='')
127 127
128 128 # Find and call actual handler for message
129 129 handler = self.handlers.get(msg['msg_type'], None)
130 130 if handler is None:
131 131 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
132 132 else:
133 133 handler(ident, msg)
134 134
135 135 # Check whether we should exit, in case the incoming message set the
136 136 # exit flag on
137 137 if self.shell.exit_now:
138 138 io.raw_print('\nExiting IPython kernel...')
139 139 # We do a normal, clean exit, which allows any actions registered
140 140 # via atexit (such as history saving) to take place.
141 141 sys.exit(0)
142 142
143 143
144 144 def start(self):
145 145 """ Start the kernel main loop.
146 146 """
147 147 while True:
148 148 time.sleep(self._poll_interval)
149 149 self.do_one_iteration()
150 150
151 151 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
152 152 """Record the ports that this kernel is using.
153 153
154 154 The creator of the Kernel instance must call this methods if they
155 155 want the :meth:`connect_request` method to return the port numbers.
156 156 """
157 157 self._recorded_ports = {
158 158 'xrep_port' : xrep_port,
159 159 'pub_port' : pub_port,
160 160 'req_port' : req_port,
161 161 'hb_port' : hb_port
162 162 }
163 163
164 164 #---------------------------------------------------------------------------
165 165 # Kernel request handlers
166 166 #---------------------------------------------------------------------------
167 167
168 168 def _publish_pyin(self, code, parent):
169 169 """Publish the code request on the pyin stream."""
170 170
171 171 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
172 172 self.pub_socket.send_json(pyin_msg)
173 173
174 174 def execute_request(self, ident, parent):
175 175
176 176 status_msg = self.session.msg(
177 177 u'status',
178 178 {u'execution_state':u'busy'},
179 179 parent=parent
180 180 )
181 181 self.pub_socket.send_json(status_msg)
182 182
183 183 try:
184 184 content = parent[u'content']
185 185 code = content[u'code']
186 186 silent = content[u'silent']
187 187 except:
188 188 io.raw_print_err("Got bad msg: ")
189 189 io.raw_print_err(Message(parent))
190 190 return
191 191
192 192 shell = self.shell # we'll need this a lot here
193 193
194 194 # Replace raw_input. Note that is not sufficient to replace
195 195 # raw_input in the user namespace.
196 196 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
197 197 __builtin__.raw_input = raw_input
198 198
199 199 # Set the parent message of the display hook and out streams.
200 200 shell.displayhook.set_parent(parent)
201 201 sys.stdout.set_parent(parent)
202 202 sys.stderr.set_parent(parent)
203 203
204 204 # Re-broadcast our input for the benefit of listening clients, and
205 205 # start computing output
206 206 if not silent:
207 207 self._publish_pyin(code, parent)
208 208
209 209 reply_content = {}
210 210 try:
211 211 if silent:
212 212 # runcode uses 'exec' mode, so no displayhook will fire, and it
213 213 # doesn't call logging or history manipulations. Print
214 214 # statements in that code will obviously still execute.
215 215 shell.runcode(code)
216 216 else:
217 217 # FIXME: runlines calls the exception handler itself.
218 218 shell._reply_content = None
219 219
220 220 # For now leave this here until we're sure we can stop using it
221 221 #shell.runlines(code)
222 222
223 223 # Experimental: cell mode! Test more before turning into
224 224 # default and removing the hacks around runlines.
225 225 shell.run_cell(code)
226 226 except:
227 227 status = u'error'
228 228 # FIXME: this code right now isn't being used yet by default,
229 229 # because the runlines() call above directly fires off exception
230 230 # reporting. This code, therefore, is only active in the scenario
231 231 # where runlines itself has an unhandled exception. We need to
232 232 # uniformize this, for all exception construction to come from a
233 233 # single location in the codbase.
234 234 etype, evalue, tb = sys.exc_info()
235 235 tb_list = traceback.format_exception(etype, evalue, tb)
236 236 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
237 237 else:
238 238 status = u'ok'
239 239
240 240 reply_content[u'status'] = status
241 241 # Compute the execution counter so clients can display prompts
242 242 reply_content['execution_count'] = shell.displayhook.prompt_count
243 243
244 244 # FIXME - fish exception info out of shell, possibly left there by
245 245 # runlines. We'll need to clean up this logic later.
246 246 if shell._reply_content is not None:
247 247 reply_content.update(shell._reply_content)
248 248
249 249 # At this point, we can tell whether the main code execution succeeded
250 250 # or not. If it did, we proceed to evaluate user_variables/expressions
251 251 if reply_content['status'] == 'ok':
252 252 reply_content[u'user_variables'] = \
253 253 shell.user_variables(content[u'user_variables'])
254 254 reply_content[u'user_expressions'] = \
255 255 shell.user_expressions(content[u'user_expressions'])
256 256 else:
257 257 # If there was an error, don't even try to compute variables or
258 258 # expressions
259 259 reply_content[u'user_variables'] = {}
260 260 reply_content[u'user_expressions'] = {}
261 261
262 262 # Payloads should be retrieved regardless of outcome, so we can both
263 263 # recover partial output (that could have been generated early in a
264 264 # block, before an error) and clear the payload system always.
265 265 reply_content[u'payload'] = shell.payload_manager.read_payload()
266 266 # Be agressive about clearing the payload because we don't want
267 267 # it to sit in memory until the next execute_request comes in.
268 268 shell.payload_manager.clear_payload()
269 269
270 270 # Send the reply.
271 271 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
272 272 io.raw_print(reply_msg)
273 273
274 274 # Flush output before sending the reply.
275 275 sys.stdout.flush()
276 276 sys.stderr.flush()
277 277 # FIXME: on rare occasions, the flush doesn't seem to make it to the
278 278 # clients... This seems to mitigate the problem, but we definitely need
279 279 # to better understand what's going on.
280 280 if self._execute_sleep:
281 281 time.sleep(self._execute_sleep)
282 282
283 283 self.reply_socket.send(ident, zmq.SNDMORE)
284 284 self.reply_socket.send_json(reply_msg)
285 285 if reply_msg['content']['status'] == u'error':
286 286 self._abort_queue()
287 287
288 288 status_msg = self.session.msg(
289 289 u'status',
290 290 {u'execution_state':u'idle'},
291 291 parent=parent
292 292 )
293 293 self.pub_socket.send_json(status_msg)
294 294
295 295 def complete_request(self, ident, parent):
296 296 txt, matches = self._complete(parent)
297 297 matches = {'matches' : matches,
298 298 'matched_text' : txt,
299 299 'status' : 'ok'}
300 300 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
301 301 matches, parent, ident)
302 302 io.raw_print(completion_msg)
303 303
304 304 def object_info_request(self, ident, parent):
305 305 object_info = self.shell.object_inspect(parent['content']['oname'])
306 306 # Before we send this object over, we scrub it for JSON usage
307 307 oinfo = json_clean(object_info)
308 308 msg = self.session.send(self.reply_socket, 'object_info_reply',
309 309 oinfo, parent, ident)
310 310 io.raw_print(msg)
311 311
312 312 def history_request(self, ident, parent):
313 313 output = parent['content']['output']
314 314 index = parent['content']['index']
315 315 raw = parent['content']['raw']
316 316 hist = self.shell.get_history(index=index, raw=raw, output=output)
317 317 content = {'history' : hist}
318 318 msg = self.session.send(self.reply_socket, 'history_reply',
319 319 content, parent, ident)
320 320 io.raw_print(msg)
321 321
322 322 def connect_request(self, ident, parent):
323 323 if self._recorded_ports is not None:
324 324 content = self._recorded_ports.copy()
325 325 else:
326 326 content = {}
327 327 msg = self.session.send(self.reply_socket, 'connect_reply',
328 328 content, parent, ident)
329 329 io.raw_print(msg)
330 330
331 331 def shutdown_request(self, ident, parent):
332 332 self.shell.exit_now = True
333 self._shutdown_message = self.session.msg(u'shutdown_reply', {}, parent)
333 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
334 334 sys.exit(0)
335 335
336 336 #---------------------------------------------------------------------------
337 337 # Protected interface
338 338 #---------------------------------------------------------------------------
339 339
340 340 def _abort_queue(self):
341 341 while True:
342 342 try:
343 343 ident = self.reply_socket.recv(zmq.NOBLOCK)
344 344 except zmq.ZMQError, e:
345 345 if e.errno == zmq.EAGAIN:
346 346 break
347 347 else:
348 348 assert self.reply_socket.rcvmore(), \
349 349 "Unexpected missing message part."
350 350 msg = self.reply_socket.recv_json()
351 351 io.raw_print("Aborting:\n", Message(msg))
352 352 msg_type = msg['msg_type']
353 353 reply_type = msg_type.split('_')[0] + '_reply'
354 354 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
355 355 io.raw_print(reply_msg)
356 356 self.reply_socket.send(ident,zmq.SNDMORE)
357 357 self.reply_socket.send_json(reply_msg)
358 358 # We need to wait a bit for requests to come in. This can probably
359 359 # be set shorter for true asynchronous clients.
360 360 time.sleep(0.1)
361 361
362 362 def _raw_input(self, prompt, ident, parent):
363 363 # Flush output before making the request.
364 364 sys.stderr.flush()
365 365 sys.stdout.flush()
366 366
367 367 # Send the input request.
368 368 content = dict(prompt=prompt)
369 369 msg = self.session.msg(u'input_request', content, parent)
370 370 self.req_socket.send_json(msg)
371 371
372 372 # Await a response.
373 373 reply = self.req_socket.recv_json()
374 374 try:
375 375 value = reply['content']['value']
376 376 except:
377 377 io.raw_print_err("Got bad raw_input reply: ")
378 378 io.raw_print_err(Message(parent))
379 379 value = ''
380 380 return value
381 381
382 382 def _complete(self, msg):
383 383 c = msg['content']
384 384 try:
385 385 cpos = int(c['cursor_pos'])
386 386 except:
387 387 # If we don't get something that we can convert to an integer, at
388 388 # least attempt the completion guessing the cursor is at the end of
389 389 # the text, if there's any, and otherwise of the line
390 390 cpos = len(c['text'])
391 391 if cpos==0:
392 392 cpos = len(c['line'])
393 393 return self.shell.complete(c['text'], c['line'], cpos)
394 394
395 395 def _object_info(self, context):
396 396 symbol, leftover = self._symbol_from_context(context)
397 397 if symbol is not None and not leftover:
398 398 doc = getattr(symbol, '__doc__', '')
399 399 else:
400 400 doc = ''
401 401 object_info = dict(docstring = doc)
402 402 return object_info
403 403
404 404 def _symbol_from_context(self, context):
405 405 if not context:
406 406 return None, context
407 407
408 408 base_symbol_string = context[0]
409 409 symbol = self.shell.user_ns.get(base_symbol_string, None)
410 410 if symbol is None:
411 411 symbol = __builtin__.__dict__.get(base_symbol_string, None)
412 412 if symbol is None:
413 413 return None, context
414 414
415 415 context = context[1:]
416 416 for i, name in enumerate(context):
417 417 new_symbol = getattr(symbol, name, None)
418 418 if new_symbol is None:
419 419 return symbol, context[i:]
420 420 else:
421 421 symbol = new_symbol
422 422
423 423 return symbol, []
424 424
425 425 def _at_shutdown(self):
426 426 """Actions taken at shutdown by the kernel, called by python's atexit.
427 427 """
428 428 # io.rprint("Kernel at_shutdown") # dbg
429 429 if self._shutdown_message is not None:
430 430 self.reply_socket.send_json(self._shutdown_message)
431 self.pub_socket.send_json(self._shutdown_message)
431 432 io.raw_print(self._shutdown_message)
432 433 # A very short sleep to give zmq time to flush its message buffers
433 434 # before Python truly shuts down.
434 435 time.sleep(0.01)
435 436
436 437
437 438 class QtKernel(Kernel):
438 439 """A Kernel subclass with Qt support."""
439 440
440 441 def start(self):
441 442 """Start a kernel with QtPy4 event loop integration."""
442 443
443 444 from PyQt4 import QtCore
444 445 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
445 446
446 447 self.app = get_app_qt4([" "])
447 448 self.app.setQuitOnLastWindowClosed(False)
448 449 self.timer = QtCore.QTimer()
449 450 self.timer.timeout.connect(self.do_one_iteration)
450 451 # Units for the timer are in milliseconds
451 452 self.timer.start(1000*self._poll_interval)
452 453 start_event_loop_qt4(self.app)
453 454
454 455
455 456 class WxKernel(Kernel):
456 457 """A Kernel subclass with Wx support."""
457 458
458 459 def start(self):
459 460 """Start a kernel with wx event loop support."""
460 461
461 462 import wx
462 463 from IPython.lib.guisupport import start_event_loop_wx
463 464
464 465 doi = self.do_one_iteration
465 466 # Wx uses milliseconds
466 467 poll_interval = int(1000*self._poll_interval)
467 468
468 469 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
469 470 # We make the Frame hidden when we create it in the main app below.
470 471 class TimerFrame(wx.Frame):
471 472 def __init__(self, func):
472 473 wx.Frame.__init__(self, None, -1)
473 474 self.timer = wx.Timer(self)
474 475 # Units for the timer are in milliseconds
475 476 self.timer.Start(poll_interval)
476 477 self.Bind(wx.EVT_TIMER, self.on_timer)
477 478 self.func = func
478 479
479 480 def on_timer(self, event):
480 481 self.func()
481 482
482 483 # We need a custom wx.App to create our Frame subclass that has the
483 484 # wx.Timer to drive the ZMQ event loop.
484 485 class IPWxApp(wx.App):
485 486 def OnInit(self):
486 487 self.frame = TimerFrame(doi)
487 488 self.frame.Show(False)
488 489 return True
489 490
490 491 # The redirect=False here makes sure that wx doesn't replace
491 492 # sys.stdout/stderr with its own classes.
492 493 self.app = IPWxApp(redirect=False)
493 494 start_event_loop_wx(self.app)
494 495
495 496
496 497 class TkKernel(Kernel):
497 498 """A Kernel subclass with Tk support."""
498 499
499 500 def start(self):
500 501 """Start a Tk enabled event loop."""
501 502
502 503 import Tkinter
503 504 doi = self.do_one_iteration
504 505 # Tk uses milliseconds
505 506 poll_interval = int(1000*self._poll_interval)
506 507 # For Tkinter, we create a Tk object and call its withdraw method.
507 508 class Timer(object):
508 509 def __init__(self, func):
509 510 self.app = Tkinter.Tk()
510 511 self.app.withdraw()
511 512 self.func = func
512 513
513 514 def on_timer(self):
514 515 self.func()
515 516 self.app.after(poll_interval, self.on_timer)
516 517
517 518 def start(self):
518 519 self.on_timer() # Call it once to get things going.
519 520 self.app.mainloop()
520 521
521 522 self.timer = Timer(doi)
522 523 self.timer.start()
523 524
524 525
525 526 class GTKKernel(Kernel):
526 527 """A Kernel subclass with GTK support."""
527 528
528 529 def start(self):
529 530 """Start the kernel, coordinating with the GTK event loop"""
530 531 from .gui.gtkembed import GTKEmbed
531 532
532 533 gtk_kernel = GTKEmbed(self)
533 534 gtk_kernel.start()
534 535
535 536
536 537 #-----------------------------------------------------------------------------
537 538 # Kernel main and launch functions
538 539 #-----------------------------------------------------------------------------
539 540
540 541 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
541 542 independent=False, pylab=False):
542 543 """Launches a localhost kernel, binding to the specified ports.
543 544
544 545 Parameters
545 546 ----------
546 547 xrep_port : int, optional
547 548 The port to use for XREP channel.
548 549
549 550 pub_port : int, optional
550 551 The port to use for the SUB channel.
551 552
552 553 req_port : int, optional
553 554 The port to use for the REQ (raw input) channel.
554 555
555 556 hb_port : int, optional
556 557 The port to use for the hearbeat REP channel.
557 558
558 559 independent : bool, optional (default False)
559 560 If set, the kernel process is guaranteed to survive if this process
560 561 dies. If not set, an effort is made to ensure that the kernel is killed
561 562 when this process dies. Note that in this case it is still good practice
562 563 to kill kernels manually before exiting.
563 564
564 565 pylab : bool or string, optional (default False)
565 566 If not False, the kernel will be launched with pylab enabled. If a
566 567 string is passed, matplotlib will use the specified backend. Otherwise,
567 568 matplotlib's default backend will be used.
568 569
569 570 Returns
570 571 -------
571 572 A tuple of form:
572 573 (kernel_process, xrep_port, pub_port, req_port)
573 574 where kernel_process is a Popen object and the ports are integers.
574 575 """
575 576 extra_arguments = []
576 577 if pylab:
577 578 extra_arguments.append('--pylab')
578 579 if isinstance(pylab, basestring):
579 580 extra_arguments.append(pylab)
580 581 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
581 582 xrep_port, pub_port, req_port, hb_port,
582 583 independent, extra_arguments)
583 584
584 585
585 586 def main():
586 587 """ The IPython kernel main entry point.
587 588 """
588 589 parser = make_argument_parser()
589 590 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
590 591 const='auto', help = \
591 592 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
592 593 given, the GUI backend is matplotlib's, otherwise use one of: \
593 594 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
594 595 namespace = parser.parse_args()
595 596
596 597 kernel_class = Kernel
597 598
598 599 kernel_classes = {
599 600 'qt' : QtKernel,
600 601 'qt4': QtKernel,
601 602 'inline': Kernel,
602 603 'wx' : WxKernel,
603 604 'tk' : TkKernel,
604 605 'gtk': GTKKernel,
605 606 }
606 607 if namespace.pylab:
607 608 if namespace.pylab == 'auto':
608 609 gui, backend = pylabtools.find_gui_and_backend()
609 610 else:
610 611 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
611 612 kernel_class = kernel_classes.get(gui)
612 613 if kernel_class is None:
613 614 raise ValueError('GUI is not supported: %r' % gui)
614 615 pylabtools.activate_matplotlib(backend)
615 616
616 617 kernel = make_kernel(namespace, kernel_class, OutStream)
617 618
618 619 if namespace.pylab:
619 620 pylabtools.import_pylab(kernel.shell.user_ns, backend,
620 621 shell=kernel.shell)
621 622
622 623 start_kernel(namespace, kernel)
623 624
624 625
625 626 if __name__ == '__main__':
626 627 main()
@@ -1,905 +1,905
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26
27 27 # System library imports.
28 28 import zmq
29 29 from zmq import POLLIN, POLLOUT, POLLERR
30 30 from zmq.eventloop import ioloop
31 31
32 32 # Local imports.
33 33 from IPython.utils import io
34 34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 35 from session import Session
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Constants and exceptions
39 39 #-----------------------------------------------------------------------------
40 40
41 41 LOCALHOST = '127.0.0.1'
42 42
43 43 class InvalidPortNumber(Exception):
44 44 pass
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Utility functions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # some utilities to validate message structure, these might get moved elsewhere
51 51 # if they prove to have more generic utility
52 52
53 53 def validate_string_list(lst):
54 54 """Validate that the input is a list of strings.
55 55
56 56 Raises ValueError if not."""
57 57 if not isinstance(lst, list):
58 58 raise ValueError('input %r must be a list' % lst)
59 59 for x in lst:
60 60 if not isinstance(x, basestring):
61 61 raise ValueError('element %r in list must be a string' % x)
62 62
63 63
64 64 def validate_string_dict(dct):
65 65 """Validate that the input is a dict with string keys and values.
66 66
67 67 Raises ValueError if not."""
68 68 for k,v in dct.iteritems():
69 69 if not isinstance(k, basestring):
70 70 raise ValueError('key %r in dict must be a string' % k)
71 71 if not isinstance(v, basestring):
72 72 raise ValueError('value %r in dict must be a string' % v)
73 73
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # ZMQ Socket Channel classes
77 77 #-----------------------------------------------------------------------------
78 78
79 79 class ZmqSocketChannel(Thread):
80 80 """The base class for the channels that use ZMQ sockets.
81 81 """
82 82 context = None
83 83 session = None
84 84 socket = None
85 85 ioloop = None
86 86 iostate = None
87 87 _address = None
88 88
89 89 def __init__(self, context, session, address):
90 90 """Create a channel
91 91
92 92 Parameters
93 93 ----------
94 94 context : :class:`zmq.Context`
95 95 The ZMQ context to use.
96 96 session : :class:`session.Session`
97 97 The session to use.
98 98 address : tuple
99 99 Standard (ip, port) tuple that the kernel is listening on.
100 100 """
101 101 super(ZmqSocketChannel, self).__init__()
102 102 self.daemon = True
103 103
104 104 self.context = context
105 105 self.session = session
106 106 if address[1] == 0:
107 107 message = 'The port number for a channel cannot be 0.'
108 108 raise InvalidPortNumber(message)
109 109 self._address = address
110 110
111 111 def stop(self):
112 112 """Stop the channel's activity.
113 113
114 114 This calls :method:`Thread.join` and returns when the thread
115 115 terminates. :class:`RuntimeError` will be raised if
116 116 :method:`self.start` is called again.
117 117 """
118 118 self.join()
119 119
120 120 @property
121 121 def address(self):
122 122 """Get the channel's address as an (ip, port) tuple.
123 123
124 124 By the default, the address is (localhost, 0), where 0 means a random
125 125 port.
126 126 """
127 127 return self._address
128 128
129 129 def add_io_state(self, state):
130 130 """Add IO state to the eventloop.
131 131
132 132 Parameters
133 133 ----------
134 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 135 The IO state flag to set.
136 136
137 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 138 """
139 139 def add_io_state_callback():
140 140 if not self.iostate & state:
141 141 self.iostate = self.iostate | state
142 142 self.ioloop.update_handler(self.socket, self.iostate)
143 143 self.ioloop.add_callback(add_io_state_callback)
144 144
145 145 def drop_io_state(self, state):
146 146 """Drop IO state from the eventloop.
147 147
148 148 Parameters
149 149 ----------
150 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 151 The IO state flag to set.
152 152
153 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 154 """
155 155 def drop_io_state_callback():
156 156 if self.iostate & state:
157 157 self.iostate = self.iostate & (~state)
158 158 self.ioloop.update_handler(self.socket, self.iostate)
159 159 self.ioloop.add_callback(drop_io_state_callback)
160 160
161 161
162 162 class XReqSocketChannel(ZmqSocketChannel):
163 163 """The XREQ channel for issues request/replies to the kernel.
164 164 """
165 165
166 166 command_queue = None
167 167
168 168 def __init__(self, context, session, address):
169 169 super(XReqSocketChannel, self).__init__(context, session, address)
170 170 self.command_queue = Queue()
171 171 self.ioloop = ioloop.IOLoop()
172 172
173 173 def run(self):
174 174 """The thread's main activity. Call start() instead."""
175 175 self.socket = self.context.socket(zmq.XREQ)
176 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 177 self.socket.connect('tcp://%s:%i' % self.address)
178 178 self.iostate = POLLERR|POLLIN
179 179 self.ioloop.add_handler(self.socket, self._handle_events,
180 180 self.iostate)
181 181 self.ioloop.start()
182 182
183 183 def stop(self):
184 184 self.ioloop.stop()
185 185 super(XReqSocketChannel, self).stop()
186 186
187 187 def call_handlers(self, msg):
188 188 """This method is called in the ioloop thread when a message arrives.
189 189
190 190 Subclasses should override this method to handle incoming messages.
191 191 It is important to remember that this method is called in the thread
192 192 so that some logic must be done to ensure that the application leve
193 193 handlers are called in the application thread.
194 194 """
195 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 196
197 197 def execute(self, code, silent=False,
198 198 user_variables=None, user_expressions=None):
199 199 """Execute code in the kernel.
200 200
201 201 Parameters
202 202 ----------
203 203 code : str
204 204 A string of Python code.
205 205
206 206 silent : bool, optional (default False)
207 207 If set, the kernel will execute the code as quietly possible.
208 208
209 209 user_variables : list, optional
210 210 A list of variable names to pull from the user's namespace. They
211 211 will come back as a dict with these names as keys and their
212 212 :func:`repr` as values.
213 213
214 214 user_expressions : dict, optional
215 215 A dict with string keys and to pull from the user's
216 216 namespace. They will come back as a dict with these names as keys
217 217 and their :func:`repr` as values.
218 218
219 219 Returns
220 220 -------
221 221 The msg_id of the message sent.
222 222 """
223 223 if user_variables is None:
224 224 user_variables = []
225 225 if user_expressions is None:
226 226 user_expressions = {}
227 227
228 228 # Don't waste network traffic if inputs are invalid
229 229 if not isinstance(code, basestring):
230 230 raise ValueError('code %r must be a string' % code)
231 231 validate_string_list(user_variables)
232 232 validate_string_dict(user_expressions)
233 233
234 234 # Create class for content/msg creation. Related to, but possibly
235 235 # not in Session.
236 236 content = dict(code=code, silent=silent,
237 237 user_variables=user_variables,
238 238 user_expressions=user_expressions)
239 239 msg = self.session.msg('execute_request', content)
240 240 self._queue_request(msg)
241 241 return msg['header']['msg_id']
242 242
243 243 def complete(self, text, line, cursor_pos, block=None):
244 244 """Tab complete text in the kernel's namespace.
245 245
246 246 Parameters
247 247 ----------
248 248 text : str
249 249 The text to complete.
250 250 line : str
251 251 The full line of text that is the surrounding context for the
252 252 text to complete.
253 253 cursor_pos : int
254 254 The position of the cursor in the line where the completion was
255 255 requested.
256 256 block : str, optional
257 257 The full block of code in which the completion is being requested.
258 258
259 259 Returns
260 260 -------
261 261 The msg_id of the message sent.
262 262 """
263 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 264 msg = self.session.msg('complete_request', content)
265 265 self._queue_request(msg)
266 266 return msg['header']['msg_id']
267 267
268 268 def object_info(self, oname):
269 269 """Get metadata information about an object.
270 270
271 271 Parameters
272 272 ----------
273 273 oname : str
274 274 A string specifying the object name.
275 275
276 276 Returns
277 277 -------
278 278 The msg_id of the message sent.
279 279 """
280 280 content = dict(oname=oname)
281 281 msg = self.session.msg('object_info_request', content)
282 282 self._queue_request(msg)
283 283 return msg['header']['msg_id']
284 284
285 285 def history(self, index=None, raw=False, output=True):
286 286 """Get the history list.
287 287
288 288 Parameters
289 289 ----------
290 290 index : n or (n1, n2) or None
291 291 If n, then the last entries. If a tuple, then all in
292 292 range(n1, n2). If None, then all entries. Raises IndexError if
293 293 the format of index is incorrect.
294 294 raw : bool
295 295 If True, return the raw input.
296 296 output : bool
297 297 If True, then return the output as well.
298 298
299 299 Returns
300 300 -------
301 301 The msg_id of the message sent.
302 302 """
303 303 content = dict(index=index, raw=raw, output=output)
304 304 msg = self.session.msg('history_request', content)
305 305 self._queue_request(msg)
306 306 return msg['header']['msg_id']
307 307
308 def shutdown(self):
308 def shutdown(self, restart=False):
309 309 """Request an immediate kernel shutdown.
310 310
311 311 Upon receipt of the (empty) reply, client code can safely assume that
312 312 the kernel has shut down and it's safe to forcefully terminate it if
313 313 it's still alive.
314 314
315 315 The kernel will send the reply via a function registered with Python's
316 316 atexit module, ensuring it's truly done as the kernel is done with all
317 317 normal operation.
318 318 """
319 319 # Send quit message to kernel. Once we implement kernel-side setattr,
320 320 # this should probably be done that way, but for now this will do.
321 msg = self.session.msg('shutdown_request', {})
321 msg = self.session.msg('shutdown_request', {'restart':restart})
322 322 self._queue_request(msg)
323 323 return msg['header']['msg_id']
324 324
325 325 def _handle_events(self, socket, events):
326 326 if events & POLLERR:
327 327 self._handle_err()
328 328 if events & POLLOUT:
329 329 self._handle_send()
330 330 if events & POLLIN:
331 331 self._handle_recv()
332 332
333 333 def _handle_recv(self):
334 334 msg = self.socket.recv_json()
335 335 self.call_handlers(msg)
336 336
337 337 def _handle_send(self):
338 338 try:
339 339 msg = self.command_queue.get(False)
340 340 except Empty:
341 341 pass
342 342 else:
343 343 self.socket.send_json(msg)
344 344 if self.command_queue.empty():
345 345 self.drop_io_state(POLLOUT)
346 346
347 347 def _handle_err(self):
348 348 # We don't want to let this go silently, so eventually we should log.
349 349 raise zmq.ZMQError()
350 350
351 351 def _queue_request(self, msg):
352 352 self.command_queue.put(msg)
353 353 self.add_io_state(POLLOUT)
354 354
355 355
356 356 class SubSocketChannel(ZmqSocketChannel):
357 357 """The SUB channel which listens for messages that the kernel publishes.
358 358 """
359 359
360 360 def __init__(self, context, session, address):
361 361 super(SubSocketChannel, self).__init__(context, session, address)
362 362 self.ioloop = ioloop.IOLoop()
363 363
364 364 def run(self):
365 365 """The thread's main activity. Call start() instead."""
366 366 self.socket = self.context.socket(zmq.SUB)
367 367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 369 self.socket.connect('tcp://%s:%i' % self.address)
370 370 self.iostate = POLLIN|POLLERR
371 371 self.ioloop.add_handler(self.socket, self._handle_events,
372 372 self.iostate)
373 373 self.ioloop.start()
374 374
375 375 def stop(self):
376 376 self.ioloop.stop()
377 377 super(SubSocketChannel, self).stop()
378 378
379 379 def call_handlers(self, msg):
380 380 """This method is called in the ioloop thread when a message arrives.
381 381
382 382 Subclasses should override this method to handle incoming messages.
383 383 It is important to remember that this method is called in the thread
384 384 so that some logic must be done to ensure that the application leve
385 385 handlers are called in the application thread.
386 386 """
387 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
388 388
389 389 def flush(self, timeout=1.0):
390 390 """Immediately processes all pending messages on the SUB channel.
391 391
392 392 Callers should use this method to ensure that :method:`call_handlers`
393 393 has been called for all messages that have been received on the
394 394 0MQ SUB socket of this channel.
395 395
396 396 This method is thread safe.
397 397
398 398 Parameters
399 399 ----------
400 400 timeout : float, optional
401 401 The maximum amount of time to spend flushing, in seconds. The
402 402 default is one second.
403 403 """
404 404 # We do the IOLoop callback process twice to ensure that the IOLoop
405 405 # gets to perform at least one full poll.
406 406 stop_time = time.time() + timeout
407 407 for i in xrange(2):
408 408 self._flushed = False
409 409 self.ioloop.add_callback(self._flush)
410 410 while not self._flushed and time.time() < stop_time:
411 411 time.sleep(0.01)
412 412
413 413 def _handle_events(self, socket, events):
414 414 # Turn on and off POLLOUT depending on if we have made a request
415 415 if events & POLLERR:
416 416 self._handle_err()
417 417 if events & POLLIN:
418 418 self._handle_recv()
419 419
420 420 def _handle_err(self):
421 421 # We don't want to let this go silently, so eventually we should log.
422 422 raise zmq.ZMQError()
423 423
424 424 def _handle_recv(self):
425 425 # Get all of the messages we can
426 426 while True:
427 427 try:
428 428 msg = self.socket.recv_json(zmq.NOBLOCK)
429 429 except zmq.ZMQError:
430 430 # Check the errno?
431 431 # Will this trigger POLLERR?
432 432 break
433 433 else:
434 434 self.call_handlers(msg)
435 435
436 436 def _flush(self):
437 437 """Callback for :method:`self.flush`."""
438 438 self._flushed = True
439 439
440 440
441 441 class RepSocketChannel(ZmqSocketChannel):
442 442 """A reply channel to handle raw_input requests that the kernel makes."""
443 443
444 444 msg_queue = None
445 445
446 446 def __init__(self, context, session, address):
447 447 super(RepSocketChannel, self).__init__(context, session, address)
448 448 self.ioloop = ioloop.IOLoop()
449 449 self.msg_queue = Queue()
450 450
451 451 def run(self):
452 452 """The thread's main activity. Call start() instead."""
453 453 self.socket = self.context.socket(zmq.XREQ)
454 454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 455 self.socket.connect('tcp://%s:%i' % self.address)
456 456 self.iostate = POLLERR|POLLIN
457 457 self.ioloop.add_handler(self.socket, self._handle_events,
458 458 self.iostate)
459 459 self.ioloop.start()
460 460
461 461 def stop(self):
462 462 self.ioloop.stop()
463 463 super(RepSocketChannel, self).stop()
464 464
465 465 def call_handlers(self, msg):
466 466 """This method is called in the ioloop thread when a message arrives.
467 467
468 468 Subclasses should override this method to handle incoming messages.
469 469 It is important to remember that this method is called in the thread
470 470 so that some logic must be done to ensure that the application leve
471 471 handlers are called in the application thread.
472 472 """
473 473 raise NotImplementedError('call_handlers must be defined in a subclass.')
474 474
475 475 def input(self, string):
476 476 """Send a string of raw input to the kernel."""
477 477 content = dict(value=string)
478 478 msg = self.session.msg('input_reply', content)
479 479 self._queue_reply(msg)
480 480
481 481 def _handle_events(self, socket, events):
482 482 if events & POLLERR:
483 483 self._handle_err()
484 484 if events & POLLOUT:
485 485 self._handle_send()
486 486 if events & POLLIN:
487 487 self._handle_recv()
488 488
489 489 def _handle_recv(self):
490 490 msg = self.socket.recv_json()
491 491 self.call_handlers(msg)
492 492
493 493 def _handle_send(self):
494 494 try:
495 495 msg = self.msg_queue.get(False)
496 496 except Empty:
497 497 pass
498 498 else:
499 499 self.socket.send_json(msg)
500 500 if self.msg_queue.empty():
501 501 self.drop_io_state(POLLOUT)
502 502
503 503 def _handle_err(self):
504 504 # We don't want to let this go silently, so eventually we should log.
505 505 raise zmq.ZMQError()
506 506
507 507 def _queue_reply(self, msg):
508 508 self.msg_queue.put(msg)
509 509 self.add_io_state(POLLOUT)
510 510
511 511
512 512 class HBSocketChannel(ZmqSocketChannel):
513 513 """The heartbeat channel which monitors the kernel heartbeat.
514 514
515 515 Note that the heartbeat channel is paused by default. As long as you start
516 516 this channel, the kernel manager will ensure that it is paused and un-paused
517 517 as appropriate.
518 518 """
519 519
520 520 time_to_dead = 3.0
521 521 socket = None
522 522 poller = None
523 523 _running = None
524 524 _pause = None
525 525
526 526 def __init__(self, context, session, address):
527 527 super(HBSocketChannel, self).__init__(context, session, address)
528 528 self._running = False
529 529 self._pause = True
530 530
531 531 def _create_socket(self):
532 532 self.socket = self.context.socket(zmq.REQ)
533 533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
534 534 self.socket.connect('tcp://%s:%i' % self.address)
535 535 self.poller = zmq.Poller()
536 536 self.poller.register(self.socket, zmq.POLLIN)
537 537
538 538 def run(self):
539 539 """The thread's main activity. Call start() instead."""
540 540 self._create_socket()
541 541 self._running = True
542 542 while self._running:
543 543 if self._pause:
544 544 time.sleep(self.time_to_dead)
545 545 else:
546 546 since_last_heartbeat = 0.0
547 547 request_time = time.time()
548 548 try:
549 549 #io.rprint('Ping from HB channel') # dbg
550 550 self.socket.send_json('ping')
551 551 except zmq.ZMQError, e:
552 552 #io.rprint('*** HB Error:', e) # dbg
553 553 if e.errno == zmq.EFSM:
554 554 #io.rprint('sleep...', self.time_to_dead) # dbg
555 555 time.sleep(self.time_to_dead)
556 556 self._create_socket()
557 557 else:
558 558 raise
559 559 else:
560 560 while True:
561 561 try:
562 562 self.socket.recv_json(zmq.NOBLOCK)
563 563 except zmq.ZMQError, e:
564 564 #io.rprint('*** HB Error 2:', e) # dbg
565 565 if e.errno == zmq.EAGAIN:
566 566 before_poll = time.time()
567 567 until_dead = self.time_to_dead - (before_poll -
568 568 request_time)
569 569
570 570 # When the return value of poll() is an empty
571 571 # list, that is when things have gone wrong
572 572 # (zeromq bug). As long as it is not an empty
573 573 # list, poll is working correctly even if it
574 574 # returns quickly. Note: poll timeout is in
575 575 # milliseconds.
576 576 self.poller.poll(1000*until_dead)
577 577
578 578 since_last_heartbeat = time.time()-request_time
579 579 if since_last_heartbeat > self.time_to_dead:
580 580 self.call_handlers(since_last_heartbeat)
581 581 break
582 582 else:
583 583 # FIXME: We should probably log this instead.
584 584 raise
585 585 else:
586 586 until_dead = self.time_to_dead - (time.time() -
587 587 request_time)
588 588 if until_dead > 0.0:
589 589 #io.rprint('sleep...', self.time_to_dead) # dbg
590 590 time.sleep(until_dead)
591 591 break
592 592
593 593 def pause(self):
594 594 """Pause the heartbeat."""
595 595 self._pause = True
596 596
597 597 def unpause(self):
598 598 """Unpause the heartbeat."""
599 599 self._pause = False
600 600
601 601 def is_beating(self):
602 602 """Is the heartbeat running and not paused."""
603 603 if self.is_alive() and not self._pause:
604 604 return True
605 605 else:
606 606 return False
607 607
608 608 def stop(self):
609 609 self._running = False
610 610 super(HBSocketChannel, self).stop()
611 611
612 612 def call_handlers(self, since_last_heartbeat):
613 613 """This method is called in the ioloop thread when a message arrives.
614 614
615 615 Subclasses should override this method to handle incoming messages.
616 616 It is important to remember that this method is called in the thread
617 617 so that some logic must be done to ensure that the application leve
618 618 handlers are called in the application thread.
619 619 """
620 620 raise NotImplementedError('call_handlers must be defined in a subclass.')
621 621
622 622
623 623 #-----------------------------------------------------------------------------
624 624 # Main kernel manager class
625 625 #-----------------------------------------------------------------------------
626 626
627 627 class KernelManager(HasTraits):
628 628 """ Manages a kernel for a frontend.
629 629
630 630 The SUB channel is for the frontend to receive messages published by the
631 631 kernel.
632 632
633 633 The REQ channel is for the frontend to make requests of the kernel.
634 634
635 635 The REP channel is for the kernel to request stdin (raw_input) from the
636 636 frontend.
637 637 """
638 638 # The PyZMQ Context to use for communication with the kernel.
639 639 context = Instance(zmq.Context,(),{})
640 640
641 641 # The Session to use for communication with the kernel.
642 642 session = Instance(Session,(),{})
643 643
644 644 # The kernel process with which the KernelManager is communicating.
645 645 kernel = Instance(Popen)
646 646
647 647 # The addresses for the communication channels.
648 648 xreq_address = TCPAddress((LOCALHOST, 0))
649 649 sub_address = TCPAddress((LOCALHOST, 0))
650 650 rep_address = TCPAddress((LOCALHOST, 0))
651 651 hb_address = TCPAddress((LOCALHOST, 0))
652 652
653 653 # The classes to use for the various channels.
654 654 xreq_channel_class = Type(XReqSocketChannel)
655 655 sub_channel_class = Type(SubSocketChannel)
656 656 rep_channel_class = Type(RepSocketChannel)
657 657 hb_channel_class = Type(HBSocketChannel)
658 658
659 659 # Protected traits.
660 660 _launch_args = Any
661 661 _xreq_channel = Any
662 662 _sub_channel = Any
663 663 _rep_channel = Any
664 664 _hb_channel = Any
665 665
666 666 def __init__(self, **kwargs):
667 667 super(KernelManager, self).__init__(**kwargs)
668 668 # Uncomment this to try closing the context.
669 669 # atexit.register(self.context.close)
670 670
671 671 #--------------------------------------------------------------------------
672 672 # Channel management methods:
673 673 #--------------------------------------------------------------------------
674 674
675 675 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
676 676 """Starts the channels for this kernel.
677 677
678 678 This will create the channels if they do not exist and then start
679 679 them. If port numbers of 0 are being used (random ports) then you
680 680 must first call :method:`start_kernel`. If the channels have been
681 681 stopped and you call this, :class:`RuntimeError` will be raised.
682 682 """
683 683 if xreq:
684 684 self.xreq_channel.start()
685 685 if sub:
686 686 self.sub_channel.start()
687 687 if rep:
688 688 self.rep_channel.start()
689 689 if hb:
690 690 self.hb_channel.start()
691 691
692 692 def stop_channels(self):
693 693 """Stops all the running channels for this kernel.
694 694 """
695 695 if self.xreq_channel.is_alive():
696 696 self.xreq_channel.stop()
697 697 if self.sub_channel.is_alive():
698 698 self.sub_channel.stop()
699 699 if self.rep_channel.is_alive():
700 700 self.rep_channel.stop()
701 701 if self.hb_channel.is_alive():
702 702 self.hb_channel.stop()
703 703
704 704 @property
705 705 def channels_running(self):
706 706 """Are any of the channels created and running?"""
707 707 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
708 708 self.rep_channel.is_alive() or self.hb_channel.is_alive())
709 709
710 710 #--------------------------------------------------------------------------
711 711 # Kernel process management methods:
712 712 #--------------------------------------------------------------------------
713 713
714 714 def start_kernel(self, **kw):
715 715 """Starts a kernel process and configures the manager to use it.
716 716
717 717 If random ports (port=0) are being used, this method must be called
718 718 before the channels are created.
719 719
720 720 Parameters:
721 721 -----------
722 722 ipython : bool, optional (default True)
723 723 Whether to use an IPython kernel instead of a plain Python kernel.
724 724 """
725 725 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
726 726 self.rep_address, self.hb_address
727 727 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
728 728 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
729 729 raise RuntimeError("Can only launch a kernel on localhost."
730 730 "Make sure that the '*_address' attributes are "
731 731 "configured properly.")
732 732
733 733 self._launch_args = kw.copy()
734 734 if kw.pop('ipython', True):
735 735 from ipkernel import launch_kernel
736 736 else:
737 737 from pykernel import launch_kernel
738 738 self.kernel, xrep, pub, req, hb = launch_kernel(
739 739 xrep_port=xreq[1], pub_port=sub[1],
740 740 req_port=rep[1], hb_port=hb[1], **kw)
741 741 self.xreq_address = (LOCALHOST, xrep)
742 742 self.sub_address = (LOCALHOST, pub)
743 743 self.rep_address = (LOCALHOST, req)
744 744 self.hb_address = (LOCALHOST, hb)
745 745
746 def shutdown_kernel(self):
746 def shutdown_kernel(self, restart=False):
747 747 """ Attempts to the stop the kernel process cleanly. If the kernel
748 748 cannot be stopped, it is killed, if possible.
749 749 """
750 750 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
751 751 if sys.platform == 'win32':
752 752 self.kill_kernel()
753 753 return
754 754
755 755 # Pause the heart beat channel if it exists.
756 756 if self._hb_channel is not None:
757 757 self._hb_channel.pause()
758 758
759 759 # Don't send any additional kernel kill messages immediately, to give
760 760 # the kernel a chance to properly execute shutdown actions. Wait for at
761 761 # most 1s, checking every 0.1s.
762 self.xreq_channel.shutdown()
762 self.xreq_channel.shutdown(restart=restart)
763 763 for i in range(10):
764 764 if self.is_alive:
765 765 time.sleep(0.1)
766 766 else:
767 767 break
768 768 else:
769 769 # OK, we've waited long enough.
770 770 if self.has_kernel:
771 771 self.kill_kernel()
772 772
773 773 def restart_kernel(self, now=False):
774 774 """Restarts a kernel with the same arguments that were used to launch
775 775 it. If the old kernel was launched with random ports, the same ports
776 776 will be used for the new kernel.
777 777
778 778 Parameters
779 779 ----------
780 780 now : bool, optional
781 781 If True, the kernel is forcefully restarted *immediately*, without
782 782 having a chance to do any cleanup action. Otherwise the kernel is
783 783 given 1s to clean up before a forceful restart is issued.
784 784
785 785 In all cases the kernel is restarted, the only difference is whether
786 786 it is given a chance to perform a clean shutdown or not.
787 787 """
788 788 if self._launch_args is None:
789 789 raise RuntimeError("Cannot restart the kernel. "
790 790 "No previous call to 'start_kernel'.")
791 791 else:
792 792 if self.has_kernel:
793 793 if now:
794 794 self.kill_kernel()
795 795 else:
796 self.shutdown_kernel()
796 self.shutdown_kernel(restart=True)
797 797 self.start_kernel(**self._launch_args)
798 798
799 799 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
800 800 # unless there is some delay here.
801 801 if sys.platform == 'win32':
802 802 time.sleep(0.2)
803 803
804 804 @property
805 805 def has_kernel(self):
806 806 """Returns whether a kernel process has been specified for the kernel
807 807 manager.
808 808 """
809 809 return self.kernel is not None
810 810
811 811 def kill_kernel(self):
812 812 """ Kill the running kernel. """
813 813 if self.has_kernel:
814 814 # Pause the heart beat channel if it exists.
815 815 if self._hb_channel is not None:
816 816 self._hb_channel.pause()
817 817
818 818 # Attempt to kill the kernel.
819 819 try:
820 820 self.kernel.kill()
821 821 except OSError, e:
822 822 # In Windows, we will get an Access Denied error if the process
823 823 # has already terminated. Ignore it.
824 824 if not (sys.platform == 'win32' and e.winerror == 5):
825 825 raise
826 826 self.kernel = None
827 827 else:
828 828 raise RuntimeError("Cannot kill kernel. No kernel is running!")
829 829
830 830 def interrupt_kernel(self):
831 831 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
832 832 well supported on all platforms.
833 833 """
834 834 if self.has_kernel:
835 835 if sys.platform == 'win32':
836 836 from parentpoller import ParentPollerWindows as Poller
837 837 Poller.send_interrupt(self.kernel.win32_interrupt_event)
838 838 else:
839 839 self.kernel.send_signal(signal.SIGINT)
840 840 else:
841 841 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
842 842
843 843 def signal_kernel(self, signum):
844 844 """ Sends a signal to the kernel. Note that since only SIGTERM is
845 845 supported on Windows, this function is only useful on Unix systems.
846 846 """
847 847 if self.has_kernel:
848 848 self.kernel.send_signal(signum)
849 849 else:
850 850 raise RuntimeError("Cannot signal kernel. No kernel is running!")
851 851
852 852 @property
853 853 def is_alive(self):
854 854 """Is the kernel process still running?"""
855 855 # FIXME: not using a heartbeat means this method is broken for any
856 856 # remote kernel, it's only capable of handling local kernels.
857 857 if self.has_kernel:
858 858 if self.kernel.poll() is None:
859 859 return True
860 860 else:
861 861 return False
862 862 else:
863 863 # We didn't start the kernel with this KernelManager so we don't
864 864 # know if it is running. We should use a heartbeat for this case.
865 865 return True
866 866
867 867 #--------------------------------------------------------------------------
868 868 # Channels used for communication with the kernel:
869 869 #--------------------------------------------------------------------------
870 870
871 871 @property
872 872 def xreq_channel(self):
873 873 """Get the REQ socket channel object to make requests of the kernel."""
874 874 if self._xreq_channel is None:
875 875 self._xreq_channel = self.xreq_channel_class(self.context,
876 876 self.session,
877 877 self.xreq_address)
878 878 return self._xreq_channel
879 879
880 880 @property
881 881 def sub_channel(self):
882 882 """Get the SUB socket channel object."""
883 883 if self._sub_channel is None:
884 884 self._sub_channel = self.sub_channel_class(self.context,
885 885 self.session,
886 886 self.sub_address)
887 887 return self._sub_channel
888 888
889 889 @property
890 890 def rep_channel(self):
891 891 """Get the REP socket channel object to handle stdin (raw_input)."""
892 892 if self._rep_channel is None:
893 893 self._rep_channel = self.rep_channel_class(self.context,
894 894 self.session,
895 895 self.rep_address)
896 896 return self._rep_channel
897 897
898 898 @property
899 899 def hb_channel(self):
900 900 """Get the REP socket channel object to handle stdin (raw_input)."""
901 901 if self._hb_channel is None:
902 902 self._hb_channel = self.hb_channel_class(self.context,
903 903 self.session,
904 904 self.hb_address)
905 905 return self._hb_channel
@@ -1,294 +1,296
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 # Standard library imports.
18 18 import __builtin__
19 19 from code import CommandCompiler
20 20 import sys
21 21 import time
22 22 import traceback
23 23
24 24 # System library imports.
25 25 import zmq
26 26
27 27 # Local imports.
28 28 from IPython.utils.traitlets import HasTraits, Instance
29 29 from completer import KernelCompleter
30 30 from entry_point import base_launch_kernel, make_default_main
31 31 from session import Session, Message
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Main kernel class
35 35 #-----------------------------------------------------------------------------
36 36
37 37 class Kernel(HasTraits):
38 38
39 39 # Private interface
40 40
41 41 # This is a dict of port number that the kernel is listening on. It is set
42 42 # by record_ports and used by connect_request.
43 43 _recorded_ports = None
44 44
45 45 #---------------------------------------------------------------------------
46 46 # Kernel interface
47 47 #---------------------------------------------------------------------------
48 48
49 49 session = Instance(Session)
50 50 reply_socket = Instance('zmq.Socket')
51 51 pub_socket = Instance('zmq.Socket')
52 52 req_socket = Instance('zmq.Socket')
53 53
54 54 def __init__(self, **kwargs):
55 55 super(Kernel, self).__init__(**kwargs)
56 56 self.user_ns = {}
57 57 self.history = []
58 58 self.compiler = CommandCompiler()
59 59 self.completer = KernelCompleter(self.user_ns)
60 60
61 61 # Build dict of handlers for message types
62 62 msg_types = [ 'execute_request', 'complete_request',
63 63 'object_info_request', 'shutdown_request' ]
64 64 self.handlers = {}
65 65 for msg_type in msg_types:
66 66 self.handlers[msg_type] = getattr(self, msg_type)
67 67
68 68 def start(self):
69 69 """ Start the kernel main loop.
70 70 """
71 71 while True:
72 72 ident = self.reply_socket.recv()
73 73 assert self.reply_socket.rcvmore(), "Missing message part."
74 74 msg = self.reply_socket.recv_json()
75 75 omsg = Message(msg)
76 76 print>>sys.__stdout__
77 77 print>>sys.__stdout__, omsg
78 78 handler = self.handlers.get(omsg.msg_type, None)
79 79 if handler is None:
80 80 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
81 81 else:
82 82 handler(ident, omsg)
83 83
84 84 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
85 85 """Record the ports that this kernel is using.
86 86
87 87 The creator of the Kernel instance must call this methods if they
88 88 want the :meth:`connect_request` method to return the port numbers.
89 89 """
90 90 self._recorded_ports = {
91 91 'xrep_port' : xrep_port,
92 92 'pub_port' : pub_port,
93 93 'req_port' : req_port,
94 94 'hb_port' : hb_port
95 95 }
96 96
97 97 #---------------------------------------------------------------------------
98 98 # Kernel request handlers
99 99 #---------------------------------------------------------------------------
100 100
101 101 def execute_request(self, ident, parent):
102 102 try:
103 103 code = parent[u'content'][u'code']
104 104 except:
105 105 print>>sys.__stderr__, "Got bad msg: "
106 106 print>>sys.__stderr__, Message(parent)
107 107 return
108 108 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
109 109 self.pub_socket.send_json(pyin_msg)
110 110
111 111 try:
112 112 comp_code = self.compiler(code, '<zmq-kernel>')
113 113
114 114 # Replace raw_input. Note that is not sufficient to replace
115 115 # raw_input in the user namespace.
116 116 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
117 117 __builtin__.raw_input = raw_input
118 118
119 119 # Set the parent message of the display hook and out streams.
120 120 sys.displayhook.set_parent(parent)
121 121 sys.stdout.set_parent(parent)
122 122 sys.stderr.set_parent(parent)
123 123
124 124 exec comp_code in self.user_ns, self.user_ns
125 125 except:
126 126 etype, evalue, tb = sys.exc_info()
127 127 tb = traceback.format_exception(etype, evalue, tb)
128 128 exc_content = {
129 129 u'status' : u'error',
130 130 u'traceback' : tb,
131 131 u'ename' : unicode(etype.__name__),
132 132 u'evalue' : unicode(evalue)
133 133 }
134 134 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
135 135 self.pub_socket.send_json(exc_msg)
136 136 reply_content = exc_content
137 137 else:
138 138 reply_content = { 'status' : 'ok', 'payload' : {} }
139 139
140 140 # Flush output before sending the reply.
141 141 sys.stderr.flush()
142 142 sys.stdout.flush()
143 143
144 144 # Send the reply.
145 145 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
146 146 print>>sys.__stdout__, Message(reply_msg)
147 147 self.reply_socket.send(ident, zmq.SNDMORE)
148 148 self.reply_socket.send_json(reply_msg)
149 149 if reply_msg['content']['status'] == u'error':
150 150 self._abort_queue()
151 151
152 152 def complete_request(self, ident, parent):
153 153 matches = {'matches' : self._complete(parent),
154 154 'status' : 'ok'}
155 155 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
156 156 matches, parent, ident)
157 157 print >> sys.__stdout__, completion_msg
158 158
159 159 def object_info_request(self, ident, parent):
160 160 context = parent['content']['oname'].split('.')
161 161 object_info = self._object_info(context)
162 162 msg = self.session.send(self.reply_socket, 'object_info_reply',
163 163 object_info, parent, ident)
164 164 print >> sys.__stdout__, msg
165 165
166 166 def shutdown_request(self, ident, parent):
167 167 content = dict(parent['content'])
168 168 msg = self.session.send(self.reply_socket, 'shutdown_reply',
169 169 content, parent, ident)
170 msg = self.session.send(self.pub_socket, 'shutdown_reply',
171 content, parent, ident)
170 172 print >> sys.__stdout__, msg
171 173 time.sleep(0.1)
172 174 sys.exit(0)
173 175
174 176 #---------------------------------------------------------------------------
175 177 # Protected interface
176 178 #---------------------------------------------------------------------------
177 179
178 180 def _abort_queue(self):
179 181 while True:
180 182 try:
181 183 ident = self.reply_socket.recv(zmq.NOBLOCK)
182 184 except zmq.ZMQError, e:
183 185 if e.errno == zmq.EAGAIN:
184 186 break
185 187 else:
186 188 assert self.reply_socket.rcvmore(), "Missing message part."
187 189 msg = self.reply_socket.recv_json()
188 190 print>>sys.__stdout__, "Aborting:"
189 191 print>>sys.__stdout__, Message(msg)
190 192 msg_type = msg['msg_type']
191 193 reply_type = msg_type.split('_')[0] + '_reply'
192 194 reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg)
193 195 print>>sys.__stdout__, Message(reply_msg)
194 196 self.reply_socket.send(ident,zmq.SNDMORE)
195 197 self.reply_socket.send_json(reply_msg)
196 198 # We need to wait a bit for requests to come in. This can probably
197 199 # be set shorter for true asynchronous clients.
198 200 time.sleep(0.1)
199 201
200 202 def _raw_input(self, prompt, ident, parent):
201 203 # Flush output before making the request.
202 204 sys.stderr.flush()
203 205 sys.stdout.flush()
204 206
205 207 # Send the input request.
206 208 content = dict(prompt=prompt)
207 209 msg = self.session.msg(u'input_request', content, parent)
208 210 self.req_socket.send_json(msg)
209 211
210 212 # Await a response.
211 213 reply = self.req_socket.recv_json()
212 214 try:
213 215 value = reply['content']['value']
214 216 except:
215 217 print>>sys.__stderr__, "Got bad raw_input reply: "
216 218 print>>sys.__stderr__, Message(parent)
217 219 value = ''
218 220 return value
219 221
220 222 def _complete(self, msg):
221 223 return self.completer.complete(msg.content.line, msg.content.text)
222 224
223 225 def _object_info(self, context):
224 226 symbol, leftover = self._symbol_from_context(context)
225 227 if symbol is not None and not leftover:
226 228 doc = getattr(symbol, '__doc__', '')
227 229 else:
228 230 doc = ''
229 231 object_info = dict(docstring = doc)
230 232 return object_info
231 233
232 234 def _symbol_from_context(self, context):
233 235 if not context:
234 236 return None, context
235 237
236 238 base_symbol_string = context[0]
237 239 symbol = self.user_ns.get(base_symbol_string, None)
238 240 if symbol is None:
239 241 symbol = __builtin__.__dict__.get(base_symbol_string, None)
240 242 if symbol is None:
241 243 return None, context
242 244
243 245 context = context[1:]
244 246 for i, name in enumerate(context):
245 247 new_symbol = getattr(symbol, name, None)
246 248 if new_symbol is None:
247 249 return symbol, context[i:]
248 250 else:
249 251 symbol = new_symbol
250 252
251 253 return symbol, []
252 254
253 255 #-----------------------------------------------------------------------------
254 256 # Kernel main and launch functions
255 257 #-----------------------------------------------------------------------------
256 258
257 259 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
258 260 independent=False):
259 261 """ Launches a localhost kernel, binding to the specified ports.
260 262
261 263 Parameters
262 264 ----------
263 265 xrep_port : int, optional
264 266 The port to use for XREP channel.
265 267
266 268 pub_port : int, optional
267 269 The port to use for the SUB channel.
268 270
269 271 req_port : int, optional
270 272 The port to use for the REQ (raw input) channel.
271 273
272 274 hb_port : int, optional
273 275 The port to use for the hearbeat REP channel.
274 276
275 277 independent : bool, optional (default False)
276 278 If set, the kernel process is guaranteed to survive if this process
277 279 dies. If not set, an effort is made to ensure that the kernel is killed
278 280 when this process dies. Note that in this case it is still good practice
279 281 to kill kernels manually before exiting.
280 282
281 283 Returns
282 284 -------
283 285 A tuple of form:
284 286 (kernel_process, xrep_port, pub_port, req_port)
285 287 where kernel_process is a Popen object and the ports are integers.
286 288 """
287 289 return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
288 290 xrep_port, pub_port, req_port, hb_port,
289 291 independent)
290 292
291 293 main = make_default_main(Kernel)
292 294
293 295 if __name__ == '__main__':
294 296 main()
General Comments 0
You need to be logged in to leave comments. Login now