move apply serialization into zmq.serialize
MinRK
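
With this change the apply-message serialization helpers live in IPython.zmq.serialize and are only re-imported by the parallel utilities module, so callers can pick them up from the new location. A minimal, illustrative import (not part of the diff itself):

    from IPython.zmq.serialize import (
        serialize_object, unserialize_object,
        pack_apply_message, unpack_apply_message,
    )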
@@ -0,0 +1,179 @@
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54         The size threshold above which an object's data is extracted into a
55         separate buffer rather than being pickled inline with the metadata.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114     will not have its data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
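
For reference, a small round trip through the two entry points defined above; this is an illustrative sketch (small objects only, so no separate data buffers are expected to be extracted), not part of the commit:

    from IPython.zmq.serialize import serialize_object, unserialize_object

    obj = {'x': range(10), 'y': 'hello'}
    # pickled metadata plus a (possibly empty) list of extracted data buffers
    pmd, bufs = serialize_object(obj)
    restored, leftover = unserialize_object([pmd] + bufs)
    assert restored == obj and leftover == []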
@@ -1,495 +1,364 @@
1 1 """some generic utilities for dealing with classes, urls, and serialization
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23 import socket
24 24 import sys
25 25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 26 try:
27 27 from signal import SIGKILL
28 28 except ImportError:
29 29 SIGKILL=None
30 30
31 31 try:
32 32 import cPickle
33 33 pickle = cPickle
34 34 except:
35 35 cPickle = None
36 36 import pickle
37 37
38 38 # System library imports
39 39 import zmq
40 40 from zmq.log import handlers
41 41
42 42 from IPython.external.decorator import decorator
43 43
44 44 # IPython imports
45 45 from IPython.config.application import Application
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 48 from IPython.utils.newserialized import serialize, unserialize
49 49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50 53
51 54 if py3compat.PY3:
52 55 buffer = memoryview
53 56
54 57 #-----------------------------------------------------------------------------
55 58 # Classes
56 59 #-----------------------------------------------------------------------------
57 60
58 61 class Namespace(dict):
59 62 """Subclass of dict for attribute access to keys."""
60 63
61 64 def __getattr__(self, key):
62 65 """getattr aliased to getitem"""
63 66 if key in self.iterkeys():
64 67 return self[key]
65 68 else:
66 69 raise NameError(key)
67 70
68 71 def __setattr__(self, key, value):
69 72 """setattr aliased to setitem, with strict"""
70 73 if hasattr(dict, key):
71 74 raise KeyError("Cannot override dict keys %r"%key)
72 75 self[key] = value
73 76
74 77
75 78 class ReverseDict(dict):
76 79 """simple double-keyed subset of dict methods."""
77 80
78 81 def __init__(self, *args, **kwargs):
79 82 dict.__init__(self, *args, **kwargs)
80 83 self._reverse = dict()
81 84 for key, value in self.iteritems():
82 85 self._reverse[value] = key
83 86
84 87 def __getitem__(self, key):
85 88 try:
86 89 return dict.__getitem__(self, key)
87 90 except KeyError:
88 91 return self._reverse[key]
89 92
90 93 def __setitem__(self, key, value):
91 94 if key in self._reverse:
92 95 raise KeyError("Can't have key %r on both sides!"%key)
93 96 dict.__setitem__(self, key, value)
94 97 self._reverse[value] = key
95 98
96 99 def pop(self, key):
97 100 value = dict.pop(self, key)
98 101 self._reverse.pop(value)
99 102 return value
100 103
101 104 def get(self, key, default=None):
102 105 try:
103 106 return self[key]
104 107 except KeyError:
105 108 return default
106 109
107 110 #-----------------------------------------------------------------------------
108 111 # Functions
109 112 #-----------------------------------------------------------------------------
110 113
111 114 @decorator
112 115 def log_errors(f, self, *args, **kwargs):
113 116 """decorator to log unhandled exceptions raised in a method.
114 117
115 118 For use wrapping on_recv callbacks, so that exceptions
116 119 do not cause the stream to be closed.
117 120 """
118 121 try:
119 122 return f(self, *args, **kwargs)
120 123 except Exception:
121 124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
122 125
123 126
124 127 def asbytes(s):
125 128 """ensure that an object is ascii bytes"""
126 129 if isinstance(s, unicode):
127 130 s = s.encode('ascii')
128 131 return s
129 132
130 133 def is_url(url):
131 134 """boolean check for whether a string is a zmq url"""
132 135 if '://' not in url:
133 136 return False
134 137 proto, addr = url.split('://', 1)
135 138 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
136 139 return False
137 140 return True
138 141
139 142 def validate_url(url):
140 143 """validate a url for zeromq"""
141 144 if not isinstance(url, basestring):
142 145 raise TypeError("url must be a string, not %r"%type(url))
143 146 url = url.lower()
144 147
145 148 proto_addr = url.split('://')
146 149 assert len(proto_addr) == 2, 'Invalid url: %r'%url
147 150 proto, addr = proto_addr
148 151 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
149 152
150 153 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
151 154 # author: Remi Sabourin
152 155 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
153 156
154 157 if proto == 'tcp':
155 158 lis = addr.split(':')
156 159 assert len(lis) == 2, 'Invalid url: %r'%url
157 160 addr,s_port = lis
158 161 try:
159 162 port = int(s_port)
160 163 except ValueError:
161 164             raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
162 165
163 166 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
164 167
165 168 else:
166 169 # only validate tcp urls currently
167 170 pass
168 171
169 172 return True
170 173
171 174
172 175 def validate_url_container(container):
173 176 """validate a potentially nested collection of urls."""
174 177 if isinstance(container, basestring):
175 178 url = container
176 179 return validate_url(url)
177 180 elif isinstance(container, dict):
178 181 container = container.itervalues()
179 182
180 183 for element in container:
181 184 validate_url_container(element)
182 185
183 186
184 187 def split_url(url):
185 188 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
186 189 proto_addr = url.split('://')
187 190 assert len(proto_addr) == 2, 'Invalid url: %r'%url
188 191 proto, addr = proto_addr
189 192 lis = addr.split(':')
190 193 assert len(lis) == 2, 'Invalid url: %r'%url
191 194 addr,s_port = lis
192 195 return proto,addr,s_port
193 196
194 197 def disambiguate_ip_address(ip, location=None):
195 198 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
196 199 ones, based on the location (default interpretation of location is localhost)."""
197 200 if ip in ('0.0.0.0', '*'):
198 201 try:
199 202 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
200 203 except (socket.gaierror, IndexError):
201 204 # couldn't identify this machine, assume localhost
202 205 external_ips = []
203 206 if location is None or location in external_ips or not external_ips:
204 207 # If location is unspecified or cannot be determined, assume local
205 208 ip='127.0.0.1'
206 209 elif location:
207 210 return location
208 211 return ip
209 212
210 213 def disambiguate_url(url, location=None):
211 214 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
212 215 ones, based on the location (default interpretation is localhost).
213 216
214 217 This is for zeromq urls, such as tcp://*:10101."""
215 218 try:
216 219 proto,ip,port = split_url(url)
217 220 except AssertionError:
218 221 # probably not tcp url; could be ipc, etc.
219 222 return url
220 223
221 224 ip = disambiguate_ip_address(ip,location)
222 225
223 226 return "%s://%s:%s"%(proto,ip,port)
224 227
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359 228
360 229 #--------------------------------------------------------------------------
361 230 # helpers for implementing old MEC API via view.apply
362 231 #--------------------------------------------------------------------------
363 232
364 233 def interactive(f):
365 234 """decorator for making functions appear as interactively defined.
366 235 This results in the function being linked to the user_ns as globals()
367 236 instead of the module globals().
368 237 """
369 238 f.__module__ = '__main__'
370 239 return f
371 240
372 241 @interactive
373 242 def _push(**ns):
374 243 """helper method for implementing `client.push` via `client.apply`"""
375 244 globals().update(ns)
376 245
377 246 @interactive
378 247 def _pull(keys):
379 248 """helper method for implementing `client.pull` via `client.apply`"""
380 249 user_ns = globals()
381 250 if isinstance(keys, (list,tuple, set)):
382 251 for key in keys:
383 252 if not user_ns.has_key(key):
384 253 raise NameError("name '%s' is not defined"%key)
385 254 return map(user_ns.get, keys)
386 255 else:
387 256 if not user_ns.has_key(keys):
388 257 raise NameError("name '%s' is not defined"%keys)
389 258 return user_ns.get(keys)
390 259
391 260 @interactive
392 261 def _execute(code):
393 262 """helper method for implementing `client.execute` via `client.apply`"""
394 263 exec code in globals()
395 264
396 265 #--------------------------------------------------------------------------
397 266 # extra process management utilities
398 267 #--------------------------------------------------------------------------
399 268
400 269 _random_ports = set()
401 270
402 271 def select_random_ports(n):
403 272 """Selects and return n random ports that are available."""
404 273 ports = []
405 274 for i in xrange(n):
406 275 sock = socket.socket()
407 276 sock.bind(('', 0))
408 277 while sock.getsockname()[1] in _random_ports:
409 278 sock.close()
410 279 sock = socket.socket()
411 280 sock.bind(('', 0))
412 281 ports.append(sock)
413 282 for i, sock in enumerate(ports):
414 283 port = sock.getsockname()[1]
415 284 sock.close()
416 285 ports[i] = port
417 286 _random_ports.add(port)
418 287 return ports
419 288
420 289 def signal_children(children):
421 290     """Relay interrupt/term signals to children, for more solid process cleanup."""
422 291 def terminate_children(sig, frame):
423 292 log = Application.instance().log
424 293 log.critical("Got signal %i, terminating children..."%sig)
425 294 for child in children:
426 295 child.terminate()
427 296
428 297 sys.exit(sig != SIGINT)
429 298 # sys.exit(sig)
430 299 for sig in (SIGINT, SIGABRT, SIGTERM):
431 300 signal(sig, terminate_children)
432 301
433 302 def generate_exec_key(keyfile):
434 303 import uuid
435 304 newkey = str(uuid.uuid4())
436 305 with open(keyfile, 'w') as f:
437 306 # f.write('ipython-key ')
438 307 f.write(newkey+'\n')
439 308 # set user-only RW permissions (0600)
440 309 # this will have no effect on Windows
441 310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
442 311
443 312
444 313 def integer_loglevel(loglevel):
445 314 try:
446 315 loglevel = int(loglevel)
447 316 except ValueError:
448 317 if isinstance(loglevel, str):
449 318 loglevel = getattr(logging, loglevel)
450 319 return loglevel
451 320
452 321 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
453 322 logger = logging.getLogger(logname)
454 323 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
455 324 # don't add a second PUBHandler
456 325 return
457 326 loglevel = integer_loglevel(loglevel)
458 327 lsock = context.socket(zmq.PUB)
459 328 lsock.connect(iface)
460 329 handler = handlers.PUBHandler(lsock)
461 330 handler.setLevel(loglevel)
462 331 handler.root_topic = root
463 332 logger.addHandler(handler)
464 333 logger.setLevel(loglevel)
465 334
466 335 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
467 336 logger = logging.getLogger()
468 337 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
469 338 # don't add a second PUBHandler
470 339 return
471 340 loglevel = integer_loglevel(loglevel)
472 341 lsock = context.socket(zmq.PUB)
473 342 lsock.connect(iface)
474 343 handler = EnginePUBHandler(engine, lsock)
475 344 handler.setLevel(loglevel)
476 345 logger.addHandler(handler)
477 346 logger.setLevel(loglevel)
478 347 return logger
479 348
480 349 def local_logger(logname, loglevel=logging.DEBUG):
481 350 loglevel = integer_loglevel(loglevel)
482 351 logger = logging.getLogger(logname)
483 352 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
484 353 # don't add a second StreamHandler
485 354 return
486 355 handler = logging.StreamHandler()
487 356 handler.setLevel(loglevel)
488 357 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
489 358 datefmt="%Y-%m-%d %H:%M:%S")
490 359 handler.setFormatter(formatter)
491 360
492 361 logger.addHandler(handler)
493 362 logger.setLevel(loglevel)
494 363 return logger
495 364
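
Because the hunk above re-imports the four helpers from IPython.zmq.serialize, code that still imports them from this utilities module keeps working. A minimal sketch of that assumption, taking the module shown here to be IPython.parallel.util (its path is not displayed in this view):

    from IPython.parallel import util        # assumed path of the module above
    from IPython.zmq import serialize

    # after the re-import, both modules should expose the same function objects
    assert util.pack_apply_message is serialize.pack_apply_message
    assert util.serialize_object is serialize.serialize_object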
@@ -1,864 +1,868 @@
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 import uuid
26
27 from datetime import datetime
25 28 from signal import (
26 29 signal, default_int_handler, SIGINT, SIG_IGN
27 30 )
28 31 # System library imports.
29 32 import zmq
30 33
31 34 # Local imports.
32 35 from IPython.core import pylabtools
33 36 from IPython.config.configurable import Configurable
34 37 from IPython.config.application import boolean_flag, catch_config_error
35 38 from IPython.core.application import ProfileDir
36 39 from IPython.core.error import StdinNotImplementedError
37 40 from IPython.core.shellapp import (
38 41 InteractiveShellApp, shell_flags, shell_aliases
39 42 )
40 43 from IPython.utils import io
41 44 from IPython.utils import py3compat
42 45 from IPython.utils.frame import extract_module_locals
43 46 from IPython.utils.jsonutil import json_clean
44 47 from IPython.utils.traitlets import (
45 48 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
46 49 )
47 50
48 51 from entry_point import base_launch_kernel
49 52 from kernelapp import KernelApp, kernel_flags, kernel_aliases
53 from serialize import serialize_object, unpack_apply_message
50 54 from session import Session, Message
51 55 from zmqshell import ZMQInteractiveShell
52 56
53 57
54 58 #-----------------------------------------------------------------------------
55 59 # Main kernel class
56 60 #-----------------------------------------------------------------------------
57 61
58 62 class Kernel(Configurable):
59 63
60 64 #---------------------------------------------------------------------------
61 65 # Kernel interface
62 66 #---------------------------------------------------------------------------
63 67
64 68 # attribute to override with a GUI
65 69 eventloop = Any(None)
66 70
67 71 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
68 72 session = Instance(Session)
69 73 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 74 shell_sockets = List()
71 75 control_socket = Instance('zmq.Socket')
72 76 iopub_socket = Instance('zmq.Socket')
73 77 log = Instance(logging.Logger)
74 78
75 79 user_module = Instance('types.ModuleType')
76 80 def _user_module_changed(self, name, old, new):
77 81 if self.shell is not None:
78 82 self.shell.user_module = new
79 83
80 84 user_ns = Dict(default_value=None)
81 85 def _user_ns_changed(self, name, old, new):
82 86 if self.shell is not None:
83 87 self.shell.user_ns = new
84 88 self.shell.init_user_ns()
85 89
86 90 # Private interface
87 91
88 92 # Time to sleep after flushing the stdout/err buffers in each execute
89 93 # cycle. While this introduces a hard limit on the minimal latency of the
90 94 # execute cycle, it helps prevent output synchronization problems for
91 95 # clients.
92 96 # Units are in seconds. The minimum zmq latency on local host is probably
93 97 # ~150 microseconds, set this to 500us for now. We may need to increase it
94 98 # a little if it's not enough after more interactive testing.
95 99 _execute_sleep = Float(0.0005, config=True)
96 100
97 101 # Frequency of the kernel's event loop.
98 102 # Units are in seconds, kernel subclasses for GUI toolkits may need to
99 103 # adapt to milliseconds.
100 104 _poll_interval = Float(0.05, config=True)
101 105
102 106 # If the shutdown was requested over the network, we leave here the
103 107 # necessary reply message so it can be sent by our registered atexit
104 108 # handler. This ensures that the reply is only sent to clients truly at
105 109 # the end of our shutdown process (which happens after the underlying
106 110 # IPython shell's own shutdown).
107 111 _shutdown_message = None
108 112
109 113 # This is a dict of port number that the kernel is listening on. It is set
110 114 # by record_ports and used by connect_request.
111 115 _recorded_ports = Dict()
112 116
113 117 # set of aborted msg_ids
114 118 aborted = Set()
115 119
116 120
117 121
118 122 def __init__(self, **kwargs):
119 123 super(Kernel, self).__init__(**kwargs)
120 124
121 125 # Before we even start up the shell, register *first* our exit handlers
122 126 # so they come before the shell's
123 127 atexit.register(self._at_shutdown)
124 128
125 129 # Initialize the InteractiveShell subclass
126 130 self.shell = ZMQInteractiveShell.instance(config=self.config,
127 131 profile_dir = self.profile_dir,
128 132 user_module = self.user_module,
129 133 user_ns = self.user_ns,
130 134 )
131 135 self.shell.displayhook.session = self.session
132 136 self.shell.displayhook.pub_socket = self.iopub_socket
133 137 self.shell.display_pub.session = self.session
134 138 self.shell.display_pub.pub_socket = self.iopub_socket
135 139
136 140 # TMP - hack while developing
137 141 self.shell._reply_content = None
138 142
139 143 # Build dict of handlers for message types
140 144 msg_types = [ 'execute_request', 'complete_request',
141 145 'object_info_request', 'history_request',
142 146 'connect_request', 'shutdown_request',
143 147 'apply_request',
144 148 ]
145 149 self.handlers = {}
146 150 for msg_type in msg_types:
147 151 self.handlers[msg_type] = getattr(self, msg_type)
148 152
149 153 control_msg_types = [ 'clear_request', 'abort_request' ]
150 154 self.control_handlers = {}
151 155 for msg_type in control_msg_types:
152 156 self.control_handlers[msg_type] = getattr(self, msg_type)
153 157
154 158 def do_one_iteration(self):
155 159 """Do one iteration of the kernel's evaluation loop.
156 160 """
157 161
158 162 # always flush control socket first
159 163 while True:
160 164 if self.control_socket is None:
161 165 break
162 166 try:
163 167 idents,msg = self.session.recv(self.control_socket, zmq.NOBLOCK)
164 168 except Exception:
165 169 self.log.warn("Invalid Control Message:", exc_info=True)
166 170 continue
167 171 if msg is None:
168 172 break
169 173 self.dispatch_message(self.control_socket, idents, msg, self.control_handlers)
170 174
171 175 for socket in self.shell_sockets:
172 176 try:
173 177 idents,msg = self.session.recv(socket, zmq.NOBLOCK, copy=False)
174 178 except Exception:
175 179 self.log.warn("Invalid Message:", exc_info=True)
176 180 continue
177 181
178 182 if msg is None:
179 183 continue
180 184
181 185 self.dispatch_message(socket, idents, msg, self.handlers)
182 186
183 187 def dispatch_message(self, socket, idents, msg, handlers):
184 188 msg_type = msg['header']['msg_type']
185 189 msg_id = msg['header']['msg_id']
186 190
187 191 # This assert will raise in versions of zeromq 2.0.7 and lesser.
188 192 # We now require 2.0.8 or above, so we can uncomment for safety.
189 193 # print(ident,msg, file=sys.__stdout__)
190 194 assert idents is not None, "Missing message part."
191 195
192 196 # Print some info about this message and leave a '--->' marker, so it's
193 197 # easier to trace visually the message chain when debugging. Each
194 198 # handler prints its message at the end.
195 199 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
196 200 self.log.debug(' Content: %s\n --->\n ', msg['content'])
197 201
198 202 # check if request has been aborted
199 203 if msg_id in self.aborted:
200 204 self.aborted.remove(msg_id)
201 205 # is it safe to assume a msg_id will not be resubmitted?
202 206 reply_type = msg_type.split('_')[0] + '_reply'
203 207 status = {'status' : 'aborted'}
204 208 reply_msg = self.session.send(socket, reply_type, subheader=status,
205 209 content=status, parent=msg, ident=idents)
206 210 return
207 211
208 212
209 213 # Find and call actual handler for message
210 214 handler = handlers.get(msg_type, None)
211 215 if handler is None:
212 216 self.log.error("UNKNOWN MESSAGE TYPE: %s", msg)
213 217 else:
214 218 handler(socket, idents, msg)
215 219
216 220 # Check whether we should exit, in case the incoming message set the
217 221 # exit flag on
218 222 if self.shell.exit_now:
219 223 self.log.debug('\nExiting IPython kernel...')
220 224 # We do a normal, clean exit, which allows any actions registered
221 225 # via atexit (such as history saving) to take place.
222 226 sys.exit(0)
223 227
224 228
225 229 def start(self):
226 230 """ Start the kernel main loop.
227 231 """
228 232 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
229 233 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
230 234 signal(SIGINT,SIG_IGN)
231 235 poller = zmq.Poller()
232 236 for socket in self.shell_sockets:
233 237 poller.register(socket, zmq.POLLIN)
234 238 if self.control_socket:
235 239 poller.register(self.control_socket, zmq.POLLIN)
236 240
237 241 # loop while self.eventloop has not been overridden
238 242 while self.eventloop is None:
239 243 try:
240 244 # scale by extra factor of 10, because there is no
241 245 # reason for this to be anything less than ~ 0.1s
242 246 # since it is a real poller and will respond
243 247 # to events immediately
244 248
245 249 # double nested try/except, to properly catch KeyboardInterrupt
246 250 # due to pyzmq Issue #130
247 251 try:
248 252 poller.poll(10*1000*self._poll_interval)
249 253 # restore raising of KeyboardInterrupt
250 254 signal(SIGINT, default_int_handler)
251 255 self.do_one_iteration()
252 256 except:
253 257 raise
254 258 finally:
255 259 # prevent raising of KeyboardInterrupt
256 260 signal(SIGINT,SIG_IGN)
257 261 except KeyboardInterrupt:
258 262 # Ctrl-C shouldn't crash the kernel
259 263 io.raw_print("KeyboardInterrupt caught in kernel")
260 264 # stop ignoring sigint, now that we are out of our own loop,
261 265 # we don't want to prevent future code from handling it
262 266 signal(SIGINT, default_int_handler)
263 267 while self.eventloop is not None:
264 268 try:
265 269 self.eventloop(self)
266 270 except KeyboardInterrupt:
267 271 # Ctrl-C shouldn't crash the kernel
268 272 io.raw_print("KeyboardInterrupt caught in kernel")
269 273 continue
270 274 else:
271 275 # eventloop exited cleanly, this means we should stop (right?)
272 276 self.eventloop = None
273 277 break
274 278
275 279
276 280 def record_ports(self, ports):
277 281 """Record the ports that this kernel is using.
278 282
279 283 The creator of the Kernel instance must call this methods if they
280 284 want the :meth:`connect_request` method to return the port numbers.
281 285 """
282 286 self._recorded_ports = ports
283 287
284 288 #---------------------------------------------------------------------------
285 289 # Kernel request handlers
286 290 #---------------------------------------------------------------------------
287 291
288 292 def _publish_pyin(self, code, parent, execution_count):
289 293 """Publish the code request on the pyin stream."""
290 294
291 295 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
292 296 u'execution_count': execution_count}, parent=parent)
293 297
294 298 def execute_request(self, socket, ident, parent):
295 299
296 300 self.session.send(self.iopub_socket,
297 301 u'status',
298 302 {u'execution_state':u'busy'},
299 303 parent=parent )
300 304
301 305 try:
302 306 content = parent[u'content']
303 307 code = content[u'code']
304 308 silent = content[u'silent']
305 309 except:
306 310 self.log.error("Got bad msg: ")
307 311 self.log.error("%s", parent)
308 312 return
309 313
310 314 shell = self.shell # we'll need this a lot here
311 315
312 316         # Replace raw_input. Note that it is not sufficient to replace
313 317 # raw_input in the user namespace.
314 318 if content.get('allow_stdin', False):
315 319 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
316 320 else:
317 321 raw_input = lambda prompt='' : self._no_raw_input()
318 322
319 323 if py3compat.PY3:
320 324 __builtin__.input = raw_input
321 325 else:
322 326 __builtin__.raw_input = raw_input
323 327
324 328 # Set the parent message of the display hook and out streams.
325 329 shell.displayhook.set_parent(parent)
326 330 shell.display_pub.set_parent(parent)
327 331 sys.stdout.set_parent(parent)
328 332 sys.stderr.set_parent(parent)
329 333
330 334 # Re-broadcast our input for the benefit of listening clients, and
331 335 # start computing output
332 336 if not silent:
333 337 self._publish_pyin(code, parent, shell.execution_count)
334 338
335 339 reply_content = {}
336 340 try:
337 341 if silent:
338 342 # run_code uses 'exec' mode, so no displayhook will fire, and it
339 343 # doesn't call logging or history manipulations. Print
340 344 # statements in that code will obviously still execute.
341 345 shell.run_code(code)
342 346 else:
343 347 # FIXME: the shell calls the exception handler itself.
344 348 shell.run_cell(code, store_history=True)
345 349 except:
346 350 status = u'error'
347 351 # FIXME: this code right now isn't being used yet by default,
348 352 # because the run_cell() call above directly fires off exception
349 353 # reporting. This code, therefore, is only active in the scenario
350 354 # where runlines itself has an unhandled exception. We need to
351 355 # uniformize this, for all exception construction to come from a
352 356             # single location in the codebase.
353 357 etype, evalue, tb = sys.exc_info()
354 358 tb_list = traceback.format_exception(etype, evalue, tb)
355 359 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
356 360 else:
357 361 status = u'ok'
358 362
359 363 reply_content[u'status'] = status
360 364
361 365 # Return the execution counter so clients can display prompts
362 366 reply_content['execution_count'] = shell.execution_count -1
363 367
364 368 # FIXME - fish exception info out of shell, possibly left there by
365 369 # runlines. We'll need to clean up this logic later.
366 370 if shell._reply_content is not None:
367 371 reply_content.update(shell._reply_content)
368 372 # reset after use
369 373 shell._reply_content = None
370 374
371 375 # At this point, we can tell whether the main code execution succeeded
372 376 # or not. If it did, we proceed to evaluate user_variables/expressions
373 377 if reply_content['status'] == 'ok':
374 378 reply_content[u'user_variables'] = \
375 379 shell.user_variables(content[u'user_variables'])
376 380 reply_content[u'user_expressions'] = \
377 381 shell.user_expressions(content[u'user_expressions'])
378 382 else:
379 383 # If there was an error, don't even try to compute variables or
380 384 # expressions
381 385 reply_content[u'user_variables'] = {}
382 386 reply_content[u'user_expressions'] = {}
383 387
384 388 # Payloads should be retrieved regardless of outcome, so we can both
385 389 # recover partial output (that could have been generated early in a
386 390 # block, before an error) and clear the payload system always.
387 391 reply_content[u'payload'] = shell.payload_manager.read_payload()
388 392         # Be aggressive about clearing the payload because we don't want
389 393 # it to sit in memory until the next execute_request comes in.
390 394 shell.payload_manager.clear_payload()
391 395
392 396 # Flush output before sending the reply.
393 397 sys.stdout.flush()
394 398 sys.stderr.flush()
395 399 # FIXME: on rare occasions, the flush doesn't seem to make it to the
396 400 # clients... This seems to mitigate the problem, but we definitely need
397 401 # to better understand what's going on.
398 402 if self._execute_sleep:
399 403 time.sleep(self._execute_sleep)
400 404
401 405 # Send the reply.
402 406 reply_content = json_clean(reply_content)
403 407 reply_msg = self.session.send(socket, u'execute_reply',
404 408 reply_content, parent, ident=ident)
405 409 self.log.debug("%s", reply_msg)
406 410
407 411 if reply_msg['content']['status'] == u'error':
408 412 self._abort_queues()
409 413
410 414 self.session.send(self.iopub_socket,
411 415 u'status',
412 416 {u'execution_state':u'idle'},
413 417 parent=parent )
414 418
415 419 def complete_request(self, socket, ident, parent):
416 420 txt, matches = self._complete(parent)
417 421 matches = {'matches' : matches,
418 422 'matched_text' : txt,
419 423 'status' : 'ok'}
420 424 matches = json_clean(matches)
421 425 completion_msg = self.session.send(socket, 'complete_reply',
422 426 matches, parent, ident)
423 427 self.log.debug("%s", completion_msg)
424 428
425 429 def object_info_request(self, socket, ident, parent):
426 430 content = parent['content']
427 431 object_info = self.shell.object_inspect(content['oname'],
428 432 detail_level = content.get('detail_level', 0)
429 433 )
430 434 # Before we send this object over, we scrub it for JSON usage
431 435 oinfo = json_clean(object_info)
432 436 msg = self.session.send(socket, 'object_info_reply',
433 437 oinfo, parent, ident)
434 438 self.log.debug("%s", msg)
435 439
436 440 def history_request(self, socket, ident, parent):
437 441 # We need to pull these out, as passing **kwargs doesn't work with
438 442 # unicode keys before Python 2.6.5.
439 443 hist_access_type = parent['content']['hist_access_type']
440 444 raw = parent['content']['raw']
441 445 output = parent['content']['output']
442 446 if hist_access_type == 'tail':
443 447 n = parent['content']['n']
444 448 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
445 449 include_latest=True)
446 450
447 451 elif hist_access_type == 'range':
448 452 session = parent['content']['session']
449 453 start = parent['content']['start']
450 454 stop = parent['content']['stop']
451 455 hist = self.shell.history_manager.get_range(session, start, stop,
452 456 raw=raw, output=output)
453 457
454 458 elif hist_access_type == 'search':
455 459 pattern = parent['content']['pattern']
456 460 hist = self.shell.history_manager.search(pattern, raw=raw,
457 461 output=output)
458 462
459 463 else:
460 464 hist = []
461 465 hist = list(hist)
462 466 content = {'history' : hist}
463 467 content = json_clean(content)
464 468 msg = self.session.send(socket, 'history_reply',
465 469 content, parent, ident)
466 470 self.log.debug("Sending history reply with %i entries", len(hist))
467 471
468 472 def connect_request(self, socket, ident, parent):
469 473 if self._recorded_ports is not None:
470 474 content = self._recorded_ports.copy()
471 475 else:
472 476 content = {}
473 477 msg = self.session.send(socket, 'connect_reply',
474 478 content, parent, ident)
475 479 self.log.debug("%s", msg)
476 480
477 481 def shutdown_request(self, ident, parent):
478 482 self.shell.exit_now = True
479 483 self._shutdown_message = self.session.msg(u'shutdown_reply',
480 484 parent['content'], parent)
481 485 sys.exit(0)
482 486
483 487 #---------------------------------------------------------------------------
484 488 # Engine methods
485 489 #---------------------------------------------------------------------------
486 490
487 491 def apply_request(self, socket, ident, parent):
488 492 try:
489 493 content = parent[u'content']
490 494 bufs = parent[u'buffers']
491 495 msg_id = parent['header']['msg_id']
492 496 # bound = parent['header'].get('bound', False)
493 497 except:
494 498 self.log.error("Got bad msg: %s", parent, exc_info=True)
495 499 return
496 500 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
497 501 # self.iopub_stream.send(pyin_msg)
498 502 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
499 503 sub = {'dependencies_met' : True, 'engine' : self.ident,
500 504 'started': datetime.now()}
501 505 try:
502 506 # allow for not overriding displayhook
503 507 if hasattr(sys.displayhook, 'set_parent'):
504 508 sys.displayhook.set_parent(parent)
505 509 sys.stdout.set_parent(parent)
506 510 sys.stderr.set_parent(parent)
507 511 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
508 512 working = self.user_ns
509 513 # suffix =
510 514 prefix = "_"+str(msg_id).replace("-","")+"_"
511 515
512 516 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
513 517 # if bound:
514 518 # bound_ns = Namespace(working)
515 519 # args = [bound_ns]+list(args)
516 520
517 521 fname = getattr(f, '__name__', 'f')
518 522
519 523 fname = prefix+"f"
520 524 argname = prefix+"args"
521 525 kwargname = prefix+"kwargs"
522 526 resultname = prefix+"result"
523 527
524 528 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
525 529 # print ns
526 530 working.update(ns)
527 531 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
528 532 try:
529 533 exec code in working,working
530 534 result = working.get(resultname)
531 535 finally:
532 536 for key in ns.iterkeys():
533 537 working.pop(key)
534 538 # if bound:
535 539 # working.update(bound_ns)
536 540
537 541 packed_result,buf = serialize_object(result)
538 542 result_buf = [packed_result]+buf
539 543 except:
540 544 exc_content = self._wrap_exception('apply')
541 545 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
542 546 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
543 ident=asbytes('%s.pyerr'%self.prefix))
547 ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
544 548 reply_content = exc_content
545 549 result_buf = []
546 550
547 551 if exc_content['ename'] == 'UnmetDependency':
548 552 sub['dependencies_met'] = False
549 553 else:
550 554 reply_content = {'status' : 'ok'}
551 555
552 556 # put 'ok'/'error' status in header, for scheduler introspection:
553 557 sub['status'] = reply_content['status']
554 558
555 559 # flush i/o
556 560 sys.stdout.flush()
557 561 sys.stderr.flush()
558 562
559 563 reply_msg = self.session.send(socket, u'apply_reply', reply_content,
560 564 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
561 565
562 566 #---------------------------------------------------------------------------
563 567 # Control messages
564 568 #---------------------------------------------------------------------------
565 569
566 570 def abort_request(self, socket, ident, parent):
567 571         """abort a specific msg by id"""
568 572 msg_ids = parent['content'].get('msg_ids', None)
569 573 if isinstance(msg_ids, basestring):
570 574 msg_ids = [msg_ids]
571 575 if not msg_ids:
572 576             self._abort_queues()
573 577 for mid in msg_ids:
574 578 self.aborted.add(str(mid))
575 579
576 580 content = dict(status='ok')
577 581 reply_msg = self.session.send(socket, 'abort_reply', content=content,
578 582 parent=parent, ident=ident)
579 583 self.log.debug("%s", reply_msg)
580 584
581 585 def clear_request(self, socket, idents, parent):
582 586 """Clear our namespace."""
583 587 self.user_ns = {}
584 588 msg = self.session.send(socket, 'clear_reply', ident=idents, parent=parent,
585 589 content = dict(status='ok'))
586 590 self._initial_exec_lines()
587 591
588 592
589 593 #---------------------------------------------------------------------------
590 594 # Protected interface
591 595 #---------------------------------------------------------------------------
592 596
593 597 def _abort_queues(self):
594 598 for socket in self.shell_sockets:
595 599 if socket:
596 600 self._abort_queue(socket)
597 601
598 602 def _abort_queue(self, socket):
599 603 while True:
600 604 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
601 605 if msg is None:
602 606 return
603 607
604 608 self.log.info("Aborting:")
605 609 self.log.info("%s", msg)
606 610 msg_type = msg['header']['msg_type']
607 611 reply_type = msg_type.split('_')[0] + '_reply'
608 612 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
609 613 # self.reply_socket.send(ident,zmq.SNDMORE)
610 614 # self.reply_socket.send_json(reply_msg)
611 615 reply_msg = self.session.send(socket, reply_type,
612 616 content={'status' : 'aborted'}, parent=msg, ident=idents)
613 617 self.log.debug("%s", reply_msg)
614 618 # We need to wait a bit for requests to come in. This can probably
615 619 # be set shorter for true asynchronous clients.
616 620 time.sleep(0.05)
617 621
618 622
619 623 def _no_raw_input(self):
620 624         """Raise StdinNotImplementedError if active frontend doesn't support
621 625 stdin."""
622 626 raise StdinNotImplementedError("raw_input was called, but this "
623 627 "frontend does not support stdin.")
624 628
625 629 def _raw_input(self, prompt, ident, parent):
626 630 # Flush output before making the request.
627 631 sys.stderr.flush()
628 632 sys.stdout.flush()
629 633
630 634 # Send the input request.
631 635 content = json_clean(dict(prompt=prompt))
632 636 self.session.send(self.stdin_socket, u'input_request', content, parent,
633 637 ident=ident)
634 638
635 639 # Await a response.
636 640 while True:
637 641 try:
638 642 ident, reply = self.session.recv(self.stdin_socket, 0)
639 643 except Exception:
640 644 self.log.warn("Invalid Message:", exc_info=True)
641 645 else:
642 646 break
643 647 try:
644 648 value = reply['content']['value']
645 649 except:
646 650 self.log.error("Got bad raw_input reply: ")
647 651 self.log.error("%s", parent)
648 652 value = ''
649 653 if value == '\x04':
650 654 # EOF
651 655 raise EOFError
652 656 return value
653 657
654 658 def _complete(self, msg):
655 659 c = msg['content']
656 660 try:
657 661 cpos = int(c['cursor_pos'])
658 662 except:
659 663 # If we don't get something that we can convert to an integer, at
660 664 # least attempt the completion guessing the cursor is at the end of
661 665 # the text, if there's any, and otherwise of the line
662 666 cpos = len(c['text'])
663 667 if cpos==0:
664 668 cpos = len(c['line'])
665 669 return self.shell.complete(c['text'], c['line'], cpos)
666 670
667 671 def _object_info(self, context):
668 672 symbol, leftover = self._symbol_from_context(context)
669 673 if symbol is not None and not leftover:
670 674 doc = getattr(symbol, '__doc__', '')
671 675 else:
672 676 doc = ''
673 677 object_info = dict(docstring = doc)
674 678 return object_info
675 679
676 680 def _symbol_from_context(self, context):
677 681 if not context:
678 682 return None, context
679 683
680 684 base_symbol_string = context[0]
681 685 symbol = self.shell.user_ns.get(base_symbol_string, None)
682 686 if symbol is None:
683 687 symbol = __builtin__.__dict__.get(base_symbol_string, None)
684 688 if symbol is None:
685 689 return None, context
686 690
687 691 context = context[1:]
688 692 for i, name in enumerate(context):
689 693 new_symbol = getattr(symbol, name, None)
690 694 if new_symbol is None:
691 695 return symbol, context[i:]
692 696 else:
693 697 symbol = new_symbol
694 698
695 699 return symbol, []
696 700
697 701 def _at_shutdown(self):
698 702 """Actions taken at shutdown by the kernel, called by python's atexit.
699 703 """
700 704 # io.rprint("Kernel at_shutdown") # dbg
701 705 if self._shutdown_message is not None:
702 706 self.session.send(self.shell_socket, self._shutdown_message)
703 707 self.session.send(self.iopub_socket, self._shutdown_message)
704 708 self.log.debug("%s", self._shutdown_message)
705 709 # A very short sleep to give zmq time to flush its message buffers
706 710 # before Python truly shuts down.
707 711 time.sleep(0.01)
708 712
709 713 #-----------------------------------------------------------------------------
710 714 # Aliases and Flags for the IPKernelApp
711 715 #-----------------------------------------------------------------------------
712 716
713 717 flags = dict(kernel_flags)
714 718 flags.update(shell_flags)
715 719
716 720 addflag = lambda *args: flags.update(boolean_flag(*args))
717 721
718 722 flags['pylab'] = (
719 723 {'IPKernelApp' : {'pylab' : 'auto'}},
720 724 """Pre-load matplotlib and numpy for interactive use with
721 725 the default matplotlib backend."""
722 726 )
723 727
724 728 aliases = dict(kernel_aliases)
725 729 aliases.update(shell_aliases)
726 730
727 731 # it's possible we don't want short aliases for *all* of these:
728 732 aliases.update(dict(
729 733 pylab='IPKernelApp.pylab',
730 734 ))
731 735
732 736 #-----------------------------------------------------------------------------
733 737 # The IPKernelApp class
734 738 #-----------------------------------------------------------------------------
735 739
736 740 class IPKernelApp(KernelApp, InteractiveShellApp):
737 741 name = 'ipkernel'
738 742
739 743 aliases = Dict(aliases)
740 744 flags = Dict(flags)
741 745 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
742 746
743 747 # configurables
744 748 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
745 749 config=True,
746 750 help="""Pre-load matplotlib and numpy for interactive use,
747 751 selecting a particular matplotlib backend and loop integration.
748 752 """
749 753 )
750 754
751 755 @catch_config_error
752 756 def initialize(self, argv=None):
753 757 super(IPKernelApp, self).initialize(argv)
754 758 self.init_path()
755 759 self.init_shell()
756 760 self.init_extensions()
757 761 self.init_code()
758 762
759 763 def init_kernel(self):
760 764
761 765 kernel = Kernel(config=self.config, session=self.session,
762 766 shell_sockets=[self.shell_socket],
763 767 iopub_socket=self.iopub_socket,
764 768 stdin_socket=self.stdin_socket,
765 769 log=self.log,
766 770 profile_dir=self.profile_dir,
767 771 )
768 772 self.kernel = kernel
769 773 kernel.record_ports(self.ports)
770 774 shell = kernel.shell
771 775 if self.pylab:
772 776 try:
773 777 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
774 778 shell.enable_pylab(gui, import_all=self.pylab_import_all)
775 779 except Exception:
776 780 self.log.error("Pylab initialization failed", exc_info=True)
777 781 # print exception straight to stdout, because normally
778 782 # _showtraceback associates the reply with an execution,
779 783 # which means frontends will never draw it, as this exception
780 784 # is not associated with any execute request.
781 785
782 786 # replace pyerr-sending traceback with stdout
783 787 _showtraceback = shell._showtraceback
784 788 def print_tb(etype, evalue, stb):
785 789 print ("Error initializing pylab, pylab mode will not "
786 790 "be active", file=io.stderr)
787 791 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
788 792 shell._showtraceback = print_tb
789 793
790 794 # send the traceback over stdout
791 795 shell.showtraceback(tb_offset=0)
792 796
793 797 # restore proper _showtraceback method
794 798 shell._showtraceback = _showtraceback
795 799
796 800
797 801 def init_shell(self):
798 802 self.shell = self.kernel.shell
799 803 self.shell.configurables.append(self)
800 804
801 805
802 806 #-----------------------------------------------------------------------------
803 807 # Kernel main and launch functions
804 808 #-----------------------------------------------------------------------------
805 809
806 810 def launch_kernel(*args, **kwargs):
807 811 """Launches a localhost IPython kernel, binding to the specified ports.
808 812
809 813 This function simply calls entry_point.base_launch_kernel with the right
810 814 first command to start an ipkernel. See base_launch_kernel for arguments.
811 815
812 816 Returns
813 817 -------
814 818 A tuple of form:
815 819 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
816 820 where kernel_process is a Popen object and the ports are integers.
817 821 """
818 822 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
819 823 *args, **kwargs)
820 824
821 825
822 826 def embed_kernel(module=None, local_ns=None, **kwargs):
823 827 """Embed and start an IPython kernel in a given scope.
824 828
825 829 Parameters
826 830 ----------
827 831 module : ModuleType, optional
828 832 The module to load into IPython globals (default: caller)
829 833 local_ns : dict, optional
830 834 The namespace to load into IPython user namespace (default: caller)
831 835
832 836 kwargs : various, optional
833 837 Further keyword args are relayed to the KernelApp constructor,
834 838 allowing configuration of the Kernel. Will only have an effect
835 839 on the first embed_kernel call for a given process.
836 840
837 841 """
838 842 # get the app if it exists, or set it up if it doesn't
839 843 if IPKernelApp.initialized():
840 844 app = IPKernelApp.instance()
841 845 else:
842 846 app = IPKernelApp.instance(**kwargs)
843 847 app.initialize([])
844 848
845 849 # load the calling scope if not given
846 850 (caller_module, caller_locals) = extract_module_locals(1)
847 851 if module is None:
848 852 module = caller_module
849 853 if local_ns is None:
850 854 local_ns = caller_locals
851 855
852 856 app.kernel.user_module = module
853 857 app.kernel.user_ns = local_ns
854 858 app.start()
855 859
856 860 def main():
857 861 """Run an IPKernel as an application"""
858 862 app = IPKernelApp.instance()
859 863 app.initialize()
860 864 app.start()
861 865
862 866
863 867 if __name__ == '__main__':
864 868 main()
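
The apply_request handler above unpacks a packed call with unpack_apply_message, executes it in the user namespace, and re-serializes the result with serialize_object. A condensed, socket-free sketch of that round trip (illustrative only; it assumes the helpers behave as documented in the first hunk):

    from IPython.zmq.serialize import (
        pack_apply_message, unpack_apply_message,
        serialize_object, unserialize_object,
    )

    def mul(a, b=2):
        return a * b

    # client side: pack f, args, kwargs into a list of sendable buffers
    bufs = pack_apply_message(mul, (3,), {'b': 4})

    # engine side (as in apply_request): unpack, call, serialize the result
    f, args, kwargs = unpack_apply_message(bufs)
    result = f(*args, **kwargs)
    packed, result_bufs = serialize_object(result)

    # client side: rebuild the result from the reply buffers
    value, remaining = unserialize_object([packed] + result_bufs)
    assert value == 12 and remaining == []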