##// END OF EJS Templates
allow aborting tasks
MinRK -
Show More
@@ -1,871 +1,864 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 from signal import (
26 26 signal, default_int_handler, SIGINT, SIG_IGN
27 27 )
28 28 # System library imports.
29 29 import zmq
30 30
31 31 # Local imports.
32 32 from IPython.core import pylabtools
33 33 from IPython.config.configurable import Configurable
34 34 from IPython.config.application import boolean_flag, catch_config_error
35 35 from IPython.core.application import ProfileDir
36 36 from IPython.core.error import StdinNotImplementedError
37 37 from IPython.core.shellapp import (
38 38 InteractiveShellApp, shell_flags, shell_aliases
39 39 )
40 40 from IPython.utils import io
41 41 from IPython.utils import py3compat
42 42 from IPython.utils.frame import extract_module_locals
43 43 from IPython.utils.jsonutil import json_clean
44 44 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum, List
45 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
46 46 )
47 47
48 48 from entry_point import base_launch_kernel
49 49 from kernelapp import KernelApp, kernel_flags, kernel_aliases
50 50 from session import Session, Message
51 51 from zmqshell import ZMQInteractiveShell
52 52
53 53
54 54 #-----------------------------------------------------------------------------
55 55 # Main kernel class
56 56 #-----------------------------------------------------------------------------
57 57
58 58 class Kernel(Configurable):
59 59
60 60 #---------------------------------------------------------------------------
61 61 # Kernel interface
62 62 #---------------------------------------------------------------------------
63 63
64 64 # attribute to override with a GUI
65 65 eventloop = Any(None)
66 66
67 67 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
68 68 session = Instance(Session)
69 69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 70 shell_sockets = List()
71 71 control_socket = Instance('zmq.Socket')
72 72 iopub_socket = Instance('zmq.Socket')
73 73 log = Instance(logging.Logger)
74 74
75 75 user_module = Instance('types.ModuleType')
76 76 def _user_module_changed(self, name, old, new):
77 77 if self.shell is not None:
78 78 self.shell.user_module = new
79 79
80 80 user_ns = Dict(default_value=None)
81 81 def _user_ns_changed(self, name, old, new):
82 82 if self.shell is not None:
83 83 self.shell.user_ns = new
84 84 self.shell.init_user_ns()
85 85
86 86 # Private interface
87
87
88 88 # Time to sleep after flushing the stdout/err buffers in each execute
89 89 # cycle. While this introduces a hard limit on the minimal latency of the
90 90 # execute cycle, it helps prevent output synchronization problems for
91 91 # clients.
92 92 # Units are in seconds. The minimum zmq latency on local host is probably
93 93 # ~150 microseconds, set this to 500us for now. We may need to increase it
94 94 # a little if it's not enough after more interactive testing.
95 95 _execute_sleep = Float(0.0005, config=True)
96 96
97 97 # Frequency of the kernel's event loop.
98 98 # Units are in seconds, kernel subclasses for GUI toolkits may need to
99 99 # adapt to milliseconds.
100 100 _poll_interval = Float(0.05, config=True)
101 101
102 102 # If the shutdown was requested over the network, we leave here the
103 103 # necessary reply message so it can be sent by our registered atexit
104 104 # handler. This ensures that the reply is only sent to clients truly at
105 105 # the end of our shutdown process (which happens after the underlying
106 106 # IPython shell's own shutdown).
107 107 _shutdown_message = None
108 108
109 109 # This is a dict of port number that the kernel is listening on. It is set
110 110 # by record_ports and used by connect_request.
111 111 _recorded_ports = Dict()
112
113 # set of aborted msg_ids
114 aborted = Set()
112 115
113 116
114 117
115 118 def __init__(self, **kwargs):
116 119 super(Kernel, self).__init__(**kwargs)
117 120
118 121 # Before we even start up the shell, register *first* our exit handlers
119 122 # so they come before the shell's
120 123 atexit.register(self._at_shutdown)
121 124
122 125 # Initialize the InteractiveShell subclass
123 126 self.shell = ZMQInteractiveShell.instance(config=self.config,
124 127 profile_dir = self.profile_dir,
125 128 user_module = self.user_module,
126 129 user_ns = self.user_ns,
127 130 )
128 131 self.shell.displayhook.session = self.session
129 132 self.shell.displayhook.pub_socket = self.iopub_socket
130 133 self.shell.display_pub.session = self.session
131 134 self.shell.display_pub.pub_socket = self.iopub_socket
132 135
133 136 # TMP - hack while developing
134 137 self.shell._reply_content = None
135 138
136 139 # Build dict of handlers for message types
137 140 msg_types = [ 'execute_request', 'complete_request',
138 141 'object_info_request', 'history_request',
139 142 'connect_request', 'shutdown_request',
140 143 'apply_request',
141 144 ]
142 145 self.handlers = {}
143 146 for msg_type in msg_types:
144 147 self.handlers[msg_type] = getattr(self, msg_type)
145 148
146 149 control_msg_types = [ 'clear_request', 'abort_request' ]
147 150 self.control_handlers = {}
148 151 for msg_type in control_msg_types:
149 152 self.control_handlers[msg_type] = getattr(self, msg_type)
150 153
151 154 def do_one_iteration(self):
152 155 """Do one iteration of the kernel's evaluation loop.
153 156 """
154 157
155 158 # always flush control socket first
156 159 while True:
157 160 if self.control_socket is None:
158 161 break
159 162 try:
160 163 idents,msg = self.session.recv(self.control_socket, zmq.NOBLOCK)
161 164 except Exception:
162 165 self.log.warn("Invalid Control Message:", exc_info=True)
163 166 continue
164 167 if msg is None:
165 168 break
166 169 self.dispatch_message(self.control_socket, idents, msg, self.control_handlers)
167 170
168 171 for socket in self.shell_sockets:
169 172 try:
170 173 idents,msg = self.session.recv(socket, zmq.NOBLOCK)
171 174 except Exception:
172 175 self.log.warn("Invalid Message:", exc_info=True)
173 176 continue
174 177
175 178 if msg is None:
176 179 continue
177 180
178 181 self.dispatch_message(socket, idents, msg, self.handlers)
179 182
180 183 def dispatch_message(self, socket, idents, msg, handlers):
181 184 msg_type = msg['header']['msg_type']
185 msg_id = msg['header']['msg_id']
182 186
183 187 # This assert will raise in versions of zeromq 2.0.7 and lesser.
184 188 # We now require 2.0.8 or above, so we can uncomment for safety.
185 189 # print(ident,msg, file=sys.__stdout__)
186 190 assert idents is not None, "Missing message part."
187 191
188 192 # Print some info about this message and leave a '--->' marker, so it's
189 193 # easier to trace visually the message chain when debugging. Each
190 194 # handler prints its message at the end.
191 195 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
192 196 self.log.debug(' Content: %s\n --->\n ', msg['content'])
193 197
198 # check if request has been aborted
199 if msg_id in self.aborted:
200 self.aborted.remove(msg_id)
201 # is it safe to assume a msg_id will not be resubmitted?
202 reply_type = msg_type.split('_')[0] + '_reply'
203 status = {'status' : 'aborted'}
204 reply_msg = self.session.send(socket, reply_type, subheader=status,
205 content=status, parent=msg, ident=idents)
206 return
207
208
194 209 # Find and call actual handler for message
195 210 handler = handlers.get(msg_type, None)
196 211 if handler is None:
197 212 self.log.error("UNKNOWN MESSAGE TYPE: %s", msg)
198 213 else:
199 214 handler(socket, idents, msg)
200 215
201 216 # Check whether we should exit, in case the incoming message set the
202 217 # exit flag on
203 218 if self.shell.exit_now:
204 219 self.log.debug('\nExiting IPython kernel...')
205 220 # We do a normal, clean exit, which allows any actions registered
206 221 # via atexit (such as history saving) to take place.
207 222 sys.exit(0)
208 223
209 224
210 225 def start(self):
211 226 """ Start the kernel main loop.
212 227 """
213 228 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
214 229 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
215 230 signal(SIGINT,SIG_IGN)
216 231 poller = zmq.Poller()
217 232 for socket in self.shell_sockets:
218 233 poller.register(socket, zmq.POLLIN)
219 234 if self.control_socket:
220 235 poller.register(self.control_socket, zmq.POLLIN)
221 236
222 237 # loop while self.eventloop has not been overridden
223 238 while self.eventloop is None:
224 239 try:
225 240 # scale by extra factor of 10, because there is no
226 241 # reason for this to be anything less than ~ 0.1s
227 242 # since it is a real poller and will respond
228 243 # to events immediately
229 244
230 245 # double nested try/except, to properly catch KeyboardInterrupt
231 246 # due to pyzmq Issue #130
232 247 try:
233 248 poller.poll(10*1000*self._poll_interval)
234 249 # restore raising of KeyboardInterrupt
235 250 signal(SIGINT, default_int_handler)
236 251 self.do_one_iteration()
237 252 except:
238 253 raise
239 254 finally:
240 255 # prevent raising of KeyboardInterrupt
241 256 signal(SIGINT,SIG_IGN)
242 257 except KeyboardInterrupt:
243 258 # Ctrl-C shouldn't crash the kernel
244 259 io.raw_print("KeyboardInterrupt caught in kernel")
245 260 # stop ignoring sigint, now that we are out of our own loop,
246 261 # we don't want to prevent future code from handling it
247 262 signal(SIGINT, default_int_handler)
248 263 while self.eventloop is not None:
249 264 try:
250 265 self.eventloop(self)
251 266 except KeyboardInterrupt:
252 267 # Ctrl-C shouldn't crash the kernel
253 268 io.raw_print("KeyboardInterrupt caught in kernel")
254 269 continue
255 270 else:
256 271 # eventloop exited cleanly, this means we should stop (right?)
257 272 self.eventloop = None
258 273 break
259 274
260 275
261 276 def record_ports(self, ports):
262 277 """Record the ports that this kernel is using.
263 278
264 279 The creator of the Kernel instance must call this methods if they
265 280 want the :meth:`connect_request` method to return the port numbers.
266 281 """
267 282 self._recorded_ports = ports
268 283
269 284 #---------------------------------------------------------------------------
270 285 # Kernel request handlers
271 286 #---------------------------------------------------------------------------
272 287
273 288 def _publish_pyin(self, code, parent, execution_count):
274 289 """Publish the code request on the pyin stream."""
275 290
276 291 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
277 292 u'execution_count': execution_count}, parent=parent)
278 293
279 294 def execute_request(self, socket, ident, parent):
280 295
281 296 self.session.send(self.iopub_socket,
282 297 u'status',
283 298 {u'execution_state':u'busy'},
284 299 parent=parent )
285 300
286 301 try:
287 302 content = parent[u'content']
288 303 code = content[u'code']
289 304 silent = content[u'silent']
290 305 except:
291 306 self.log.error("Got bad msg: ")
292 307 self.log.error("%s", parent)
293 308 return
294 309
295 310 shell = self.shell # we'll need this a lot here
296 311
297 312 # Replace raw_input. Note that is not sufficient to replace
298 313 # raw_input in the user namespace.
299 314 if content.get('allow_stdin', False):
300 315 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
301 316 else:
302 317 raw_input = lambda prompt='' : self._no_raw_input()
303 318
304 319 if py3compat.PY3:
305 320 __builtin__.input = raw_input
306 321 else:
307 322 __builtin__.raw_input = raw_input
308 323
309 324 # Set the parent message of the display hook and out streams.
310 325 shell.displayhook.set_parent(parent)
311 326 shell.display_pub.set_parent(parent)
312 327 sys.stdout.set_parent(parent)
313 328 sys.stderr.set_parent(parent)
314 329
315 330 # Re-broadcast our input for the benefit of listening clients, and
316 331 # start computing output
317 332 if not silent:
318 333 self._publish_pyin(code, parent, shell.execution_count)
319 334
320 335 reply_content = {}
321 336 try:
322 337 if silent:
323 338 # run_code uses 'exec' mode, so no displayhook will fire, and it
324 339 # doesn't call logging or history manipulations. Print
325 340 # statements in that code will obviously still execute.
326 341 shell.run_code(code)
327 342 else:
328 343 # FIXME: the shell calls the exception handler itself.
329 344 shell.run_cell(code, store_history=True)
330 345 except:
331 346 status = u'error'
332 347 # FIXME: this code right now isn't being used yet by default,
333 348 # because the run_cell() call above directly fires off exception
334 349 # reporting. This code, therefore, is only active in the scenario
335 350 # where runlines itself has an unhandled exception. We need to
336 351 # uniformize this, for all exception construction to come from a
337 352 # single location in the codbase.
338 353 etype, evalue, tb = sys.exc_info()
339 354 tb_list = traceback.format_exception(etype, evalue, tb)
340 355 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
341 356 else:
342 357 status = u'ok'
343 358
344 359 reply_content[u'status'] = status
345 360
346 361 # Return the execution counter so clients can display prompts
347 362 reply_content['execution_count'] = shell.execution_count -1
348 363
349 364 # FIXME - fish exception info out of shell, possibly left there by
350 365 # runlines. We'll need to clean up this logic later.
351 366 if shell._reply_content is not None:
352 367 reply_content.update(shell._reply_content)
353 368 # reset after use
354 369 shell._reply_content = None
355 370
356 371 # At this point, we can tell whether the main code execution succeeded
357 372 # or not. If it did, we proceed to evaluate user_variables/expressions
358 373 if reply_content['status'] == 'ok':
359 374 reply_content[u'user_variables'] = \
360 375 shell.user_variables(content[u'user_variables'])
361 376 reply_content[u'user_expressions'] = \
362 377 shell.user_expressions(content[u'user_expressions'])
363 378 else:
364 379 # If there was an error, don't even try to compute variables or
365 380 # expressions
366 381 reply_content[u'user_variables'] = {}
367 382 reply_content[u'user_expressions'] = {}
368 383
369 384 # Payloads should be retrieved regardless of outcome, so we can both
370 385 # recover partial output (that could have been generated early in a
371 386 # block, before an error) and clear the payload system always.
372 387 reply_content[u'payload'] = shell.payload_manager.read_payload()
373 388 # Be agressive about clearing the payload because we don't want
374 389 # it to sit in memory until the next execute_request comes in.
375 390 shell.payload_manager.clear_payload()
376 391
377 392 # Flush output before sending the reply.
378 393 sys.stdout.flush()
379 394 sys.stderr.flush()
380 395 # FIXME: on rare occasions, the flush doesn't seem to make it to the
381 396 # clients... This seems to mitigate the problem, but we definitely need
382 397 # to better understand what's going on.
383 398 if self._execute_sleep:
384 399 time.sleep(self._execute_sleep)
385 400
386 401 # Send the reply.
387 402 reply_content = json_clean(reply_content)
388 403 reply_msg = self.session.send(socket, u'execute_reply',
389 404 reply_content, parent, ident=ident)
390 405 self.log.debug("%s", reply_msg)
391 406
392 407 if reply_msg['content']['status'] == u'error':
393 self._abort_queue()
408 self._abort_queues()
394 409
395 410 self.session.send(self.iopub_socket,
396 411 u'status',
397 412 {u'execution_state':u'idle'},
398 413 parent=parent )
399 414
400 415 def complete_request(self, socket, ident, parent):
401 416 txt, matches = self._complete(parent)
402 417 matches = {'matches' : matches,
403 418 'matched_text' : txt,
404 419 'status' : 'ok'}
405 420 matches = json_clean(matches)
406 421 completion_msg = self.session.send(socket, 'complete_reply',
407 422 matches, parent, ident)
408 423 self.log.debug("%s", completion_msg)
409 424
410 425 def object_info_request(self, socket, ident, parent):
411 426 content = parent['content']
412 427 object_info = self.shell.object_inspect(content['oname'],
413 428 detail_level = content.get('detail_level', 0)
414 429 )
415 430 # Before we send this object over, we scrub it for JSON usage
416 431 oinfo = json_clean(object_info)
417 432 msg = self.session.send(socket, 'object_info_reply',
418 433 oinfo, parent, ident)
419 434 self.log.debug("%s", msg)
420 435
421 436 def history_request(self, socket, ident, parent):
422 437 # We need to pull these out, as passing **kwargs doesn't work with
423 438 # unicode keys before Python 2.6.5.
424 439 hist_access_type = parent['content']['hist_access_type']
425 440 raw = parent['content']['raw']
426 441 output = parent['content']['output']
427 442 if hist_access_type == 'tail':
428 443 n = parent['content']['n']
429 444 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
430 445 include_latest=True)
431 446
432 447 elif hist_access_type == 'range':
433 448 session = parent['content']['session']
434 449 start = parent['content']['start']
435 450 stop = parent['content']['stop']
436 451 hist = self.shell.history_manager.get_range(session, start, stop,
437 452 raw=raw, output=output)
438 453
439 454 elif hist_access_type == 'search':
440 455 pattern = parent['content']['pattern']
441 456 hist = self.shell.history_manager.search(pattern, raw=raw,
442 457 output=output)
443 458
444 459 else:
445 460 hist = []
446 461 hist = list(hist)
447 462 content = {'history' : hist}
448 463 content = json_clean(content)
449 464 msg = self.session.send(socket, 'history_reply',
450 465 content, parent, ident)
451 466 self.log.debug("Sending history reply with %i entries", len(hist))
452 467
453 468 def connect_request(self, socket, ident, parent):
454 469 if self._recorded_ports is not None:
455 470 content = self._recorded_ports.copy()
456 471 else:
457 472 content = {}
458 473 msg = self.session.send(socket, 'connect_reply',
459 474 content, parent, ident)
460 475 self.log.debug("%s", msg)
461 476
462 477 def shutdown_request(self, ident, parent):
463 478 self.shell.exit_now = True
464 479 self._shutdown_message = self.session.msg(u'shutdown_reply',
465 480 parent['content'], parent)
466 481 sys.exit(0)
467 482
468 483 #---------------------------------------------------------------------------
469 484 # Engine methods
470 485 #---------------------------------------------------------------------------
471 486
472 487 def apply_request(self, socket, ident, parent):
473 488 try:
474 489 content = parent[u'content']
475 490 bufs = parent[u'buffers']
476 491 msg_id = parent['header']['msg_id']
477 492 # bound = parent['header'].get('bound', False)
478 493 except:
479 494 self.log.error("Got bad msg: %s", parent, exc_info=True)
480 495 return
481 496 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
482 497 # self.iopub_stream.send(pyin_msg)
483 498 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
484 499 sub = {'dependencies_met' : True, 'engine' : self.ident,
485 500 'started': datetime.now()}
486 501 try:
487 502 # allow for not overriding displayhook
488 503 if hasattr(sys.displayhook, 'set_parent'):
489 504 sys.displayhook.set_parent(parent)
490 505 sys.stdout.set_parent(parent)
491 506 sys.stderr.set_parent(parent)
492 507 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
493 508 working = self.user_ns
494 509 # suffix =
495 510 prefix = "_"+str(msg_id).replace("-","")+"_"
496 511
497 512 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
498 513 # if bound:
499 514 # bound_ns = Namespace(working)
500 515 # args = [bound_ns]+list(args)
501 516
502 517 fname = getattr(f, '__name__', 'f')
503 518
504 519 fname = prefix+"f"
505 520 argname = prefix+"args"
506 521 kwargname = prefix+"kwargs"
507 522 resultname = prefix+"result"
508 523
509 524 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
510 525 # print ns
511 526 working.update(ns)
512 527 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
513 528 try:
514 529 exec code in working,working
515 530 result = working.get(resultname)
516 531 finally:
517 532 for key in ns.iterkeys():
518 533 working.pop(key)
519 534 # if bound:
520 535 # working.update(bound_ns)
521 536
522 537 packed_result,buf = serialize_object(result)
523 538 result_buf = [packed_result]+buf
524 539 except:
525 540 exc_content = self._wrap_exception('apply')
526 541 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
527 542 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
528 543 ident=asbytes('%s.pyerr'%self.prefix))
529 544 reply_content = exc_content
530 545 result_buf = []
531 546
532 547 if exc_content['ename'] == 'UnmetDependency':
533 548 sub['dependencies_met'] = False
534 549 else:
535 550 reply_content = {'status' : 'ok'}
536 551
537 552 # put 'ok'/'error' status in header, for scheduler introspection:
538 553 sub['status'] = reply_content['status']
539 554
540 555 # flush i/o
541 556 sys.stdout.flush()
542 557 sys.stderr.flush()
543 558
544 559 reply_msg = self.session.send(socket, u'apply_reply', reply_content,
545 560 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
546 561
547 562 #---------------------------------------------------------------------------
548 563 # Control messages
549 564 #---------------------------------------------------------------------------
550 565
551 def abort_queues(self):
552 for socket in self.shell_sockets:
553 if socket:
554 self.abort_queue(socket)
555
556 def abort_queue(self, socket):
557 while True:
558 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
559 if msg is None:
560 return
561
562 self.log.info("Aborting:")
563 self.log.info("%s", msg)
564 msg_type = msg['header']['msg_type']
565 reply_type = msg_type.split('_')[0] + '_reply'
566 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
567 # self.reply_socket.send(ident,zmq.SNDMORE)
568 # self.reply_socket.send_json(reply_msg)
569 reply_msg = self.session.send(socket, reply_type,
570 content={'status' : 'aborted'}, parent=msg, ident=idents)
571 self.log.debug("%s", reply_msg)
572 # We need to wait a bit for requests to come in. This can probably
573 # be set shorter for true asynchronous clients.
574 time.sleep(0.05)
575
576 566 def abort_request(self, socket, ident, parent):
577 567 """abort a specifig msg by id"""
578 568 msg_ids = parent['content'].get('msg_ids', None)
579 569 if isinstance(msg_ids, basestring):
580 570 msg_ids = [msg_ids]
581 571 if not msg_ids:
582 572 self.abort_queues()
583 573 for mid in msg_ids:
584 574 self.aborted.add(str(mid))
585 575
586 576 content = dict(status='ok')
587 577 reply_msg = self.session.send(socket, 'abort_reply', content=content,
588 578 parent=parent, ident=ident)
589 579 self.log.debug("%s", reply_msg)
590 580
591 581 def clear_request(self, socket, idents, parent):
592 582 """Clear our namespace."""
593 583 self.user_ns = {}
594 584 msg = self.session.send(socket, 'clear_reply', ident=idents, parent=parent,
595 585 content = dict(status='ok'))
596 586 self._initial_exec_lines()
597 587
598 588
599 589 #---------------------------------------------------------------------------
600 590 # Protected interface
601 591 #---------------------------------------------------------------------------
602 592
603 def _abort_queue(self):
593 def _abort_queues(self):
594 for socket in self.shell_sockets:
595 if socket:
596 self._abort_queue(socket)
597
598 def _abort_queue(self, socket):
604 599 while True:
605 try:
606 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
607 except Exception:
608 self.log.warn("Invalid Message:", exc_info=True)
609 continue
600 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
610 601 if msg is None:
611 break
612 else:
613 assert ident is not None, \
614 "Unexpected missing message part."
602 return
615 603
616 self.log.debug("Aborting:\n%s", msg)
604 self.log.info("Aborting:")
605 self.log.info("%s", msg)
617 606 msg_type = msg['header']['msg_type']
618 607 reply_type = msg_type.split('_')[0] + '_reply'
619 reply_msg = self.session.send(self.shell_socket, reply_type,
620 {'status' : 'aborted'}, msg, ident=ident)
608 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
609 # self.reply_socket.send(ident,zmq.SNDMORE)
610 # self.reply_socket.send_json(reply_msg)
611 reply_msg = self.session.send(socket, reply_type,
612 content={'status' : 'aborted'}, parent=msg, ident=idents)
621 613 self.log.debug("%s", reply_msg)
622 614 # We need to wait a bit for requests to come in. This can probably
623 615 # be set shorter for true asynchronous clients.
624 time.sleep(0.1)
616 time.sleep(0.05)
617
625 618
626 619 def _no_raw_input(self):
627 620 """Raise StdinNotImplentedError if active frontend doesn't support
628 621 stdin."""
629 622 raise StdinNotImplementedError("raw_input was called, but this "
630 623 "frontend does not support stdin.")
631 624
632 625 def _raw_input(self, prompt, ident, parent):
633 626 # Flush output before making the request.
634 627 sys.stderr.flush()
635 628 sys.stdout.flush()
636 629
637 630 # Send the input request.
638 631 content = json_clean(dict(prompt=prompt))
639 632 self.session.send(self.stdin_socket, u'input_request', content, parent,
640 633 ident=ident)
641 634
642 635 # Await a response.
643 636 while True:
644 637 try:
645 638 ident, reply = self.session.recv(self.stdin_socket, 0)
646 639 except Exception:
647 640 self.log.warn("Invalid Message:", exc_info=True)
648 641 else:
649 642 break
650 643 try:
651 644 value = reply['content']['value']
652 645 except:
653 646 self.log.error("Got bad raw_input reply: ")
654 647 self.log.error("%s", parent)
655 648 value = ''
656 649 if value == '\x04':
657 650 # EOF
658 651 raise EOFError
659 652 return value
660 653
661 654 def _complete(self, msg):
662 655 c = msg['content']
663 656 try:
664 657 cpos = int(c['cursor_pos'])
665 658 except:
666 659 # If we don't get something that we can convert to an integer, at
667 660 # least attempt the completion guessing the cursor is at the end of
668 661 # the text, if there's any, and otherwise of the line
669 662 cpos = len(c['text'])
670 663 if cpos==0:
671 664 cpos = len(c['line'])
672 665 return self.shell.complete(c['text'], c['line'], cpos)
673 666
674 667 def _object_info(self, context):
675 668 symbol, leftover = self._symbol_from_context(context)
676 669 if symbol is not None and not leftover:
677 670 doc = getattr(symbol, '__doc__', '')
678 671 else:
679 672 doc = ''
680 673 object_info = dict(docstring = doc)
681 674 return object_info
682 675
683 676 def _symbol_from_context(self, context):
684 677 if not context:
685 678 return None, context
686 679
687 680 base_symbol_string = context[0]
688 681 symbol = self.shell.user_ns.get(base_symbol_string, None)
689 682 if symbol is None:
690 683 symbol = __builtin__.__dict__.get(base_symbol_string, None)
691 684 if symbol is None:
692 685 return None, context
693 686
694 687 context = context[1:]
695 688 for i, name in enumerate(context):
696 689 new_symbol = getattr(symbol, name, None)
697 690 if new_symbol is None:
698 691 return symbol, context[i:]
699 692 else:
700 693 symbol = new_symbol
701 694
702 695 return symbol, []
703 696
704 697 def _at_shutdown(self):
705 698 """Actions taken at shutdown by the kernel, called by python's atexit.
706 699 """
707 700 # io.rprint("Kernel at_shutdown") # dbg
708 701 if self._shutdown_message is not None:
709 702 self.session.send(self.shell_socket, self._shutdown_message)
710 703 self.session.send(self.iopub_socket, self._shutdown_message)
711 704 self.log.debug("%s", self._shutdown_message)
712 705 # A very short sleep to give zmq time to flush its message buffers
713 706 # before Python truly shuts down.
714 707 time.sleep(0.01)
715 708
716 709 #-----------------------------------------------------------------------------
717 710 # Aliases and Flags for the IPKernelApp
718 711 #-----------------------------------------------------------------------------
719 712
720 713 flags = dict(kernel_flags)
721 714 flags.update(shell_flags)
722 715
723 716 addflag = lambda *args: flags.update(boolean_flag(*args))
724 717
725 718 flags['pylab'] = (
726 719 {'IPKernelApp' : {'pylab' : 'auto'}},
727 720 """Pre-load matplotlib and numpy for interactive use with
728 721 the default matplotlib backend."""
729 722 )
730 723
731 724 aliases = dict(kernel_aliases)
732 725 aliases.update(shell_aliases)
733 726
734 727 # it's possible we don't want short aliases for *all* of these:
735 728 aliases.update(dict(
736 729 pylab='IPKernelApp.pylab',
737 730 ))
738 731
739 732 #-----------------------------------------------------------------------------
740 733 # The IPKernelApp class
741 734 #-----------------------------------------------------------------------------
742 735
743 736 class IPKernelApp(KernelApp, InteractiveShellApp):
744 737 name = 'ipkernel'
745 738
746 739 aliases = Dict(aliases)
747 740 flags = Dict(flags)
748 741 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
749 742
750 743 # configurables
751 744 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
752 745 config=True,
753 746 help="""Pre-load matplotlib and numpy for interactive use,
754 747 selecting a particular matplotlib backend and loop integration.
755 748 """
756 749 )
757 750
758 751 @catch_config_error
759 752 def initialize(self, argv=None):
760 753 super(IPKernelApp, self).initialize(argv)
761 754 self.init_path()
762 755 self.init_shell()
763 756 self.init_extensions()
764 757 self.init_code()
765 758
766 759 def init_kernel(self):
767 760
768 761 kernel = Kernel(config=self.config, session=self.session,
769 762 shell_sockets=[self.shell_socket],
770 763 iopub_socket=self.iopub_socket,
771 764 stdin_socket=self.stdin_socket,
772 765 log=self.log,
773 766 profile_dir=self.profile_dir,
774 767 )
775 768 self.kernel = kernel
776 769 kernel.record_ports(self.ports)
777 770 shell = kernel.shell
778 771 if self.pylab:
779 772 try:
780 773 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
781 774 shell.enable_pylab(gui, import_all=self.pylab_import_all)
782 775 except Exception:
783 776 self.log.error("Pylab initialization failed", exc_info=True)
784 777 # print exception straight to stdout, because normally
785 778 # _showtraceback associates the reply with an execution,
786 779 # which means frontends will never draw it, as this exception
787 780 # is not associated with any execute request.
788 781
789 782 # replace pyerr-sending traceback with stdout
790 783 _showtraceback = shell._showtraceback
791 784 def print_tb(etype, evalue, stb):
792 785 print ("Error initializing pylab, pylab mode will not "
793 786 "be active", file=io.stderr)
794 787 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
795 788 shell._showtraceback = print_tb
796 789
797 790 # send the traceback over stdout
798 791 shell.showtraceback(tb_offset=0)
799 792
800 793 # restore proper _showtraceback method
801 794 shell._showtraceback = _showtraceback
802 795
803 796
804 797 def init_shell(self):
805 798 self.shell = self.kernel.shell
806 799 self.shell.configurables.append(self)
807 800
808 801
809 802 #-----------------------------------------------------------------------------
810 803 # Kernel main and launch functions
811 804 #-----------------------------------------------------------------------------
812 805
813 806 def launch_kernel(*args, **kwargs):
814 807 """Launches a localhost IPython kernel, binding to the specified ports.
815 808
816 809 This function simply calls entry_point.base_launch_kernel with the right
817 810 first command to start an ipkernel. See base_launch_kernel for arguments.
818 811
819 812 Returns
820 813 -------
821 814 A tuple of form:
822 815 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
823 816 where kernel_process is a Popen object and the ports are integers.
824 817 """
825 818 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
826 819 *args, **kwargs)
827 820
828 821
829 822 def embed_kernel(module=None, local_ns=None, **kwargs):
830 823 """Embed and start an IPython kernel in a given scope.
831 824
832 825 Parameters
833 826 ----------
834 827 module : ModuleType, optional
835 828 The module to load into IPython globals (default: caller)
836 829 local_ns : dict, optional
837 830 The namespace to load into IPython user namespace (default: caller)
838 831
839 832 kwargs : various, optional
840 833 Further keyword args are relayed to the KernelApp constructor,
841 834 allowing configuration of the Kernel. Will only have an effect
842 835 on the first embed_kernel call for a given process.
843 836
844 837 """
845 838 # get the app if it exists, or set it up if it doesn't
846 839 if IPKernelApp.initialized():
847 840 app = IPKernelApp.instance()
848 841 else:
849 842 app = IPKernelApp.instance(**kwargs)
850 843 app.initialize([])
851 844
852 845 # load the calling scope if not given
853 846 (caller_module, caller_locals) = extract_module_locals(1)
854 847 if module is None:
855 848 module = caller_module
856 849 if local_ns is None:
857 850 local_ns = caller_locals
858 851
859 852 app.kernel.user_module = module
860 853 app.kernel.user_ns = local_ns
861 854 app.start()
862 855
863 856 def main():
864 857 """Run an IPKernel as an application"""
865 858 app = IPKernelApp.instance()
866 859 app.initialize()
867 860 app.start()
868 861
869 862
870 863 if __name__ == '__main__':
871 864 main()
General Comments 0
You need to be logged in to leave comments. Login now