move apply serialization into zmq.serialize
MinRK
@@ -0,0 +1,179 @@
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54 The threshold for not double-pickling the content.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
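A quick round-trip of the two helpers above, as a hedged sketch: it assumes the new module lands at IPython.zmq.serialize (per the commit message) and that values this small stay inline rather than being split out into separate data buffers.

    from IPython.zmq.serialize import serialize_object, unserialize_object

    pmd, data_bufs = serialize_object({'a': [1, 2, 3], 'b': 'hello'})
    obj, leftover = unserialize_object([pmd] + data_bufs)
    assert obj == {'a': [1, 2, 3], 'b': 'hello'}
    assert leftover == []   # nothing buffered out for values this small
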
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have its data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
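The pack/unpack pair above is what carries apply() calls over the wire. A minimal, hedged usage sketch follows; it uses operator.add because plain importable callables pickle by reference (an interactively defined function would additionally rely on IPython's function canning), and it passes g=globals() where the kernel would pass its user namespace.

    import operator
    from IPython.zmq.serialize import pack_apply_message, unpack_apply_message

    bufs = pack_apply_message(operator.add, (1, 2), {})
    f, args, kwargs = unpack_apply_message(bufs, g=globals())
    assert f(*args, **kwargs) == 3
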
@@ -1,495 +1,364 @@
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43
43
44 # IPython imports
44 # IPython imports
45 from IPython.config.application import Application
45 from IPython.config.application import Application
46 from IPython.utils import py3compat
46 from IPython.utils import py3compat
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 from IPython.utils.newserialized import serialize, unserialize
48 from IPython.utils.newserialized import serialize, unserialize
49 from IPython.zmq.log import EnginePUBHandler
49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50
53
51 if py3compat.PY3:
54 if py3compat.PY3:
52 buffer = memoryview
55 buffer = memoryview
53
56
54 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
55 # Classes
58 # Classes
56 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
57
60
58 class Namespace(dict):
61 class Namespace(dict):
59 """Subclass of dict for attribute access to keys."""
62 """Subclass of dict for attribute access to keys."""
60
63
61 def __getattr__(self, key):
64 def __getattr__(self, key):
62 """getattr aliased to getitem"""
65 """getattr aliased to getitem"""
63 if key in self.iterkeys():
66 if key in self.iterkeys():
64 return self[key]
67 return self[key]
65 else:
68 else:
66 raise NameError(key)
69 raise NameError(key)
67
70
68 def __setattr__(self, key, value):
71 def __setattr__(self, key, value):
69 """setattr aliased to setitem, with strict"""
72 """setattr aliased to setitem, with strict"""
70 if hasattr(dict, key):
73 if hasattr(dict, key):
71 raise KeyError("Cannot override dict keys %r"%key)
74 raise KeyError("Cannot override dict keys %r"%key)
72 self[key] = value
75 self[key] = value
73
76
74
77
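Namespace simply aliases attribute access to item access; a tiny illustration (not part of the diff):

    ns = Namespace(a=1)
    ns.b = 2                     # setattr aliased to setitem
    assert ns['b'] == 2 and ns.a == 1
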
75 class ReverseDict(dict):
78 class ReverseDict(dict):
76 """simple double-keyed subset of dict methods."""
79 """simple double-keyed subset of dict methods."""
77
80
78 def __init__(self, *args, **kwargs):
81 def __init__(self, *args, **kwargs):
79 dict.__init__(self, *args, **kwargs)
82 dict.__init__(self, *args, **kwargs)
80 self._reverse = dict()
83 self._reverse = dict()
81 for key, value in self.iteritems():
84 for key, value in self.iteritems():
82 self._reverse[value] = key
85 self._reverse[value] = key
83
86
84 def __getitem__(self, key):
87 def __getitem__(self, key):
85 try:
88 try:
86 return dict.__getitem__(self, key)
89 return dict.__getitem__(self, key)
87 except KeyError:
90 except KeyError:
88 return self._reverse[key]
91 return self._reverse[key]
89
92
90 def __setitem__(self, key, value):
93 def __setitem__(self, key, value):
91 if key in self._reverse:
94 if key in self._reverse:
92 raise KeyError("Can't have key %r on both sides!"%key)
95 raise KeyError("Can't have key %r on both sides!"%key)
93 dict.__setitem__(self, key, value)
96 dict.__setitem__(self, key, value)
94 self._reverse[value] = key
97 self._reverse[value] = key
95
98
96 def pop(self, key):
99 def pop(self, key):
97 value = dict.pop(self, key)
100 value = dict.pop(self, key)
98 self._reverse.pop(value)
101 self._reverse.pop(value)
99 return value
102 return value
100
103
101 def get(self, key, default=None):
104 def get(self, key, default=None):
102 try:
105 try:
103 return self[key]
106 return self[key]
104 except KeyError:
107 except KeyError:
105 return default
108 return default
106
109
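ReverseDict keeps a value-to-key map alongside the normal one, so lookups work in either direction; a small sketch with illustrative values:

    rd = ReverseDict()
    rd['engine0'] = 'tcp://127.0.0.1:5555'
    assert rd['tcp://127.0.0.1:5555'] == 'engine0'   # reverse lookup
    assert rd.get('missing', 'default') == 'default'
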
107 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
108 # Functions
111 # Functions
109 #-----------------------------------------------------------------------------
112 #-----------------------------------------------------------------------------
110
113
111 @decorator
114 @decorator
112 def log_errors(f, self, *args, **kwargs):
115 def log_errors(f, self, *args, **kwargs):
113 """decorator to log unhandled exceptions raised in a method.
116 """decorator to log unhandled exceptions raised in a method.
114
117
115 For use wrapping on_recv callbacks, so that exceptions
118 For use wrapping on_recv callbacks, so that exceptions
116 do not cause the stream to be closed.
119 do not cause the stream to be closed.
117 """
120 """
118 try:
121 try:
119 return f(self, *args, **kwargs)
122 return f(self, *args, **kwargs)
120 except Exception:
123 except Exception:
121 self.log.error("Uncaught exception in %r" % f, exc_info=True)
124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
122
125
123
126
124 def asbytes(s):
127 def asbytes(s):
125 """ensure that an object is ascii bytes"""
128 """ensure that an object is ascii bytes"""
126 if isinstance(s, unicode):
129 if isinstance(s, unicode):
127 s = s.encode('ascii')
130 s = s.encode('ascii')
128 return s
131 return s
129
132
130 def is_url(url):
133 def is_url(url):
131 """boolean check for whether a string is a zmq url"""
134 """boolean check for whether a string is a zmq url"""
132 if '://' not in url:
135 if '://' not in url:
133 return False
136 return False
134 proto, addr = url.split('://', 1)
137 proto, addr = url.split('://', 1)
135 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
138 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
136 return False
139 return False
137 return True
140 return True
138
141
139 def validate_url(url):
142 def validate_url(url):
140 """validate a url for zeromq"""
143 """validate a url for zeromq"""
141 if not isinstance(url, basestring):
144 if not isinstance(url, basestring):
142 raise TypeError("url must be a string, not %r"%type(url))
145 raise TypeError("url must be a string, not %r"%type(url))
143 url = url.lower()
146 url = url.lower()
144
147
145 proto_addr = url.split('://')
148 proto_addr = url.split('://')
146 assert len(proto_addr) == 2, 'Invalid url: %r'%url
149 assert len(proto_addr) == 2, 'Invalid url: %r'%url
147 proto, addr = proto_addr
150 proto, addr = proto_addr
148 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
151 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
149
152
150 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
153 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
151 # author: Remi Sabourin
154 # author: Remi Sabourin
152 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
155 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
153
156
154 if proto == 'tcp':
157 if proto == 'tcp':
155 lis = addr.split(':')
158 lis = addr.split(':')
156 assert len(lis) == 2, 'Invalid url: %r'%url
159 assert len(lis) == 2, 'Invalid url: %r'%url
157 addr,s_port = lis
160 addr,s_port = lis
158 try:
161 try:
159 port = int(s_port)
162 port = int(s_port)
160 except ValueError:
163 except ValueError:
161 raise AssertionError("Invalid port %r in url: %r"%(port, url))
164 raise AssertionError("Invalid port %r in url: %r"%(port, url))
162
165
163 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
166 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
164
167
165 else:
168 else:
166 # only validate tcp urls currently
169 # only validate tcp urls currently
167 pass
170 pass
168
171
169 return True
172 return True
170
173
171
174
172 def validate_url_container(container):
175 def validate_url_container(container):
173 """validate a potentially nested collection of urls."""
176 """validate a potentially nested collection of urls."""
174 if isinstance(container, basestring):
177 if isinstance(container, basestring):
175 url = container
178 url = container
176 return validate_url(url)
179 return validate_url(url)
177 elif isinstance(container, dict):
180 elif isinstance(container, dict):
178 container = container.itervalues()
181 container = container.itervalues()
179
182
180 for element in container:
183 for element in container:
181 validate_url_container(element)
184 validate_url_container(element)
182
185
183
186
184 def split_url(url):
187 def split_url(url):
185 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
188 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
186 proto_addr = url.split('://')
189 proto_addr = url.split('://')
187 assert len(proto_addr) == 2, 'Invalid url: %r'%url
190 assert len(proto_addr) == 2, 'Invalid url: %r'%url
188 proto, addr = proto_addr
191 proto, addr = proto_addr
189 lis = addr.split(':')
192 lis = addr.split(':')
190 assert len(lis) == 2, 'Invalid url: %r'%url
193 assert len(lis) == 2, 'Invalid url: %r'%url
191 addr,s_port = lis
194 addr,s_port = lis
192 return proto,addr,s_port
195 return proto,addr,s_port
193
196
194 def disambiguate_ip_address(ip, location=None):
197 def disambiguate_ip_address(ip, location=None):
195 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
198 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
196 ones, based on the location (default interpretation of location is localhost)."""
199 ones, based on the location (default interpretation of location is localhost)."""
197 if ip in ('0.0.0.0', '*'):
200 if ip in ('0.0.0.0', '*'):
198 try:
201 try:
199 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
202 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
200 except (socket.gaierror, IndexError):
203 except (socket.gaierror, IndexError):
201 # couldn't identify this machine, assume localhost
204 # couldn't identify this machine, assume localhost
202 external_ips = []
205 external_ips = []
203 if location is None or location in external_ips or not external_ips:
206 if location is None or location in external_ips or not external_ips:
204 # If location is unspecified or cannot be determined, assume local
207 # If location is unspecified or cannot be determined, assume local
205 ip='127.0.0.1'
208 ip='127.0.0.1'
206 elif location:
209 elif location:
207 return location
210 return location
208 return ip
211 return ip
209
212
210 def disambiguate_url(url, location=None):
213 def disambiguate_url(url, location=None):
211 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
214 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
212 ones, based on the location (default interpretation is localhost).
215 ones, based on the location (default interpretation is localhost).
213
216
214 This is for zeromq urls, such as tcp://*:10101."""
217 This is for zeromq urls, such as tcp://*:10101."""
215 try:
218 try:
216 proto,ip,port = split_url(url)
219 proto,ip,port = split_url(url)
217 except AssertionError:
220 except AssertionError:
218 # probably not tcp url; could be ipc, etc.
221 # probably not tcp url; could be ipc, etc.
219 return url
222 return url
220
223
221 ip = disambiguate_ip_address(ip,location)
224 ip = disambiguate_ip_address(ip,location)
222
225
223 return "%s://%s:%s"%(proto,ip,port)
226 return "%s://%s:%s"%(proto,ip,port)
224
227
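A few sanity checks for the URL helpers above, under the default interpretation that an unspecified location means localhost:

    assert is_url('tcp://127.0.0.1:10101')
    assert not is_url('127.0.0.1:10101')     # no zmq transport prefix
    assert split_url('tcp://*:10101') == ('tcp', '*', '10101')
    assert disambiguate_url('tcp://*:10101') == 'tcp://127.0.0.1:10101'
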
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359
228
360 #--------------------------------------------------------------------------
229 #--------------------------------------------------------------------------
361 # helpers for implementing old MEC API via view.apply
230 # helpers for implementing old MEC API via view.apply
362 #--------------------------------------------------------------------------
231 #--------------------------------------------------------------------------
363
232
364 def interactive(f):
233 def interactive(f):
365 """decorator for making functions appear as interactively defined.
234 """decorator for making functions appear as interactively defined.
366 This results in the function being linked to the user_ns as globals()
235 This results in the function being linked to the user_ns as globals()
367 instead of the module globals().
236 instead of the module globals().
368 """
237 """
369 f.__module__ = '__main__'
238 f.__module__ = '__main__'
370 return f
239 return f
371
240
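The decorator only rewrites __module__, which is enough for canned functions to resolve against the engine's user namespace; for instance:

    @interactive
    def remote_add(a, b):
        return a + b

    assert remote_add.__module__ == '__main__'
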
372 @interactive
241 @interactive
373 def _push(**ns):
242 def _push(**ns):
374 """helper method for implementing `client.push` via `client.apply`"""
243 """helper method for implementing `client.push` via `client.apply`"""
375 globals().update(ns)
244 globals().update(ns)
376
245
377 @interactive
246 @interactive
378 def _pull(keys):
247 def _pull(keys):
379 """helper method for implementing `client.pull` via `client.apply`"""
248 """helper method for implementing `client.pull` via `client.apply`"""
380 user_ns = globals()
249 user_ns = globals()
381 if isinstance(keys, (list,tuple, set)):
250 if isinstance(keys, (list,tuple, set)):
382 for key in keys:
251 for key in keys:
383 if not user_ns.has_key(key):
252 if not user_ns.has_key(key):
384 raise NameError("name '%s' is not defined"%key)
253 raise NameError("name '%s' is not defined"%key)
385 return map(user_ns.get, keys)
254 return map(user_ns.get, keys)
386 else:
255 else:
387 if not user_ns.has_key(keys):
256 if not user_ns.has_key(keys):
388 raise NameError("name '%s' is not defined"%keys)
257 raise NameError("name '%s' is not defined"%keys)
389 return user_ns.get(keys)
258 return user_ns.get(keys)
390
259
391 @interactive
260 @interactive
392 def _execute(code):
261 def _execute(code):
393 """helper method for implementing `client.execute` via `client.apply`"""
262 """helper method for implementing `client.execute` via `client.apply`"""
394 exec code in globals()
263 exec code in globals()
395
264
396 #--------------------------------------------------------------------------
265 #--------------------------------------------------------------------------
397 # extra process management utilities
266 # extra process management utilities
398 #--------------------------------------------------------------------------
267 #--------------------------------------------------------------------------
399
268
400 _random_ports = set()
269 _random_ports = set()
401
270
402 def select_random_ports(n):
271 def select_random_ports(n):
403 """Selects and return n random ports that are available."""
272 """Selects and return n random ports that are available."""
404 ports = []
273 ports = []
405 for i in xrange(n):
274 for i in xrange(n):
406 sock = socket.socket()
275 sock = socket.socket()
407 sock.bind(('', 0))
276 sock.bind(('', 0))
408 while sock.getsockname()[1] in _random_ports:
277 while sock.getsockname()[1] in _random_ports:
409 sock.close()
278 sock.close()
410 sock = socket.socket()
279 sock = socket.socket()
411 sock.bind(('', 0))
280 sock.bind(('', 0))
412 ports.append(sock)
281 ports.append(sock)
413 for i, sock in enumerate(ports):
282 for i, sock in enumerate(ports):
414 port = sock.getsockname()[1]
283 port = sock.getsockname()[1]
415 sock.close()
284 sock.close()
416 ports[i] = port
285 ports[i] = port
417 _random_ports.add(port)
286 _random_ports.add(port)
418 return ports
287 return ports
419
288
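Usage is straightforward; the module-level _random_ports set keeps repeated calls from handing out the same port twice:

    ports = select_random_ports(3)
    assert len(ports) == 3 and len(set(ports)) == 3   # three distinct free ports
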
420 def signal_children(children):
289 def signal_children(children):
421 """Relay interupt/term signals to children, for more solid process cleanup."""
290 """Relay interupt/term signals to children, for more solid process cleanup."""
422 def terminate_children(sig, frame):
291 def terminate_children(sig, frame):
423 log = Application.instance().log
292 log = Application.instance().log
424 log.critical("Got signal %i, terminating children..."%sig)
293 log.critical("Got signal %i, terminating children..."%sig)
425 for child in children:
294 for child in children:
426 child.terminate()
295 child.terminate()
427
296
428 sys.exit(sig != SIGINT)
297 sys.exit(sig != SIGINT)
429 # sys.exit(sig)
298 # sys.exit(sig)
430 for sig in (SIGINT, SIGABRT, SIGTERM):
299 for sig in (SIGINT, SIGABRT, SIGTERM):
431 signal(sig, terminate_children)
300 signal(sig, terminate_children)
432
301
433 def generate_exec_key(keyfile):
302 def generate_exec_key(keyfile):
434 import uuid
303 import uuid
435 newkey = str(uuid.uuid4())
304 newkey = str(uuid.uuid4())
436 with open(keyfile, 'w') as f:
305 with open(keyfile, 'w') as f:
437 # f.write('ipython-key ')
306 # f.write('ipython-key ')
438 f.write(newkey+'\n')
307 f.write(newkey+'\n')
439 # set user-only RW permissions (0600)
308 # set user-only RW permissions (0600)
440 # this will have no effect on Windows
309 # this will have no effect on Windows
441 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
442
311
443
312
444 def integer_loglevel(loglevel):
313 def integer_loglevel(loglevel):
445 try:
314 try:
446 loglevel = int(loglevel)
315 loglevel = int(loglevel)
447 except ValueError:
316 except ValueError:
448 if isinstance(loglevel, str):
317 if isinstance(loglevel, str):
449 loglevel = getattr(logging, loglevel)
318 loglevel = getattr(logging, loglevel)
450 return loglevel
319 return loglevel
451
320
452 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
321 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
453 logger = logging.getLogger(logname)
322 logger = logging.getLogger(logname)
454 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
323 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
455 # don't add a second PUBHandler
324 # don't add a second PUBHandler
456 return
325 return
457 loglevel = integer_loglevel(loglevel)
326 loglevel = integer_loglevel(loglevel)
458 lsock = context.socket(zmq.PUB)
327 lsock = context.socket(zmq.PUB)
459 lsock.connect(iface)
328 lsock.connect(iface)
460 handler = handlers.PUBHandler(lsock)
329 handler = handlers.PUBHandler(lsock)
461 handler.setLevel(loglevel)
330 handler.setLevel(loglevel)
462 handler.root_topic = root
331 handler.root_topic = root
463 logger.addHandler(handler)
332 logger.addHandler(handler)
464 logger.setLevel(loglevel)
333 logger.setLevel(loglevel)
465
334
466 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
335 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
467 logger = logging.getLogger()
336 logger = logging.getLogger()
468 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
337 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
469 # don't add a second PUBHandler
338 # don't add a second PUBHandler
470 return
339 return
471 loglevel = integer_loglevel(loglevel)
340 loglevel = integer_loglevel(loglevel)
472 lsock = context.socket(zmq.PUB)
341 lsock = context.socket(zmq.PUB)
473 lsock.connect(iface)
342 lsock.connect(iface)
474 handler = EnginePUBHandler(engine, lsock)
343 handler = EnginePUBHandler(engine, lsock)
475 handler.setLevel(loglevel)
344 handler.setLevel(loglevel)
476 logger.addHandler(handler)
345 logger.addHandler(handler)
477 logger.setLevel(loglevel)
346 logger.setLevel(loglevel)
478 return logger
347 return logger
479
348
480 def local_logger(logname, loglevel=logging.DEBUG):
349 def local_logger(logname, loglevel=logging.DEBUG):
481 loglevel = integer_loglevel(loglevel)
350 loglevel = integer_loglevel(loglevel)
482 logger = logging.getLogger(logname)
351 logger = logging.getLogger(logname)
483 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
352 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
484 # don't add a second StreamHandler
353 # don't add a second StreamHandler
485 return
354 return
486 handler = logging.StreamHandler()
355 handler = logging.StreamHandler()
487 handler.setLevel(loglevel)
356 handler.setLevel(loglevel)
488 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
357 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
489 datefmt="%Y-%m-%d %H:%M:%S")
358 datefmt="%Y-%m-%d %H:%M:%S")
490 handler.setFormatter(formatter)
359 handler.setFormatter(formatter)
491
360
492 logger.addHandler(handler)
361 logger.addHandler(handler)
493 logger.setLevel(loglevel)
362 logger.setLevel(loglevel)
494 return logger
363 return logger
495
364
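A hedged example of the logging helpers (first call only; later calls return None once a StreamHandler is already attached, and the logger name here is illustrative):

    log = local_logger('ipcluster', 'INFO')
    log.info('cluster starting')
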
@@ -1,864 +1,868 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import __builtin__
19 import __builtin__
20 import atexit
20 import atexit
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24 import logging
24 import logging
25 import uuid
26
27 from datetime import datetime
25 from signal import (
28 from signal import (
26 signal, default_int_handler, SIGINT, SIG_IGN
29 signal, default_int_handler, SIGINT, SIG_IGN
27 )
30 )
28 # System library imports.
31 # System library imports.
29 import zmq
32 import zmq
30
33
31 # Local imports.
34 # Local imports.
32 from IPython.core import pylabtools
35 from IPython.core import pylabtools
33 from IPython.config.configurable import Configurable
36 from IPython.config.configurable import Configurable
34 from IPython.config.application import boolean_flag, catch_config_error
37 from IPython.config.application import boolean_flag, catch_config_error
35 from IPython.core.application import ProfileDir
38 from IPython.core.application import ProfileDir
36 from IPython.core.error import StdinNotImplementedError
39 from IPython.core.error import StdinNotImplementedError
37 from IPython.core.shellapp import (
40 from IPython.core.shellapp import (
38 InteractiveShellApp, shell_flags, shell_aliases
41 InteractiveShellApp, shell_flags, shell_aliases
39 )
42 )
40 from IPython.utils import io
43 from IPython.utils import io
41 from IPython.utils import py3compat
44 from IPython.utils import py3compat
42 from IPython.utils.frame import extract_module_locals
45 from IPython.utils.frame import extract_module_locals
43 from IPython.utils.jsonutil import json_clean
46 from IPython.utils.jsonutil import json_clean
44 from IPython.utils.traitlets import (
47 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
48 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
46 )
49 )
47
50
48 from entry_point import base_launch_kernel
51 from entry_point import base_launch_kernel
49 from kernelapp import KernelApp, kernel_flags, kernel_aliases
52 from kernelapp import KernelApp, kernel_flags, kernel_aliases
53 from serialize import serialize_object, unpack_apply_message
50 from session import Session, Message
54 from session import Session, Message
51 from zmqshell import ZMQInteractiveShell
55 from zmqshell import ZMQInteractiveShell
52
56
53
57
54 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
55 # Main kernel class
59 # Main kernel class
56 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
57
61
58 class Kernel(Configurable):
62 class Kernel(Configurable):
59
63
60 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
61 # Kernel interface
65 # Kernel interface
62 #---------------------------------------------------------------------------
66 #---------------------------------------------------------------------------
63
67
64 # attribute to override with a GUI
68 # attribute to override with a GUI
65 eventloop = Any(None)
69 eventloop = Any(None)
66
70
67 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
71 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
68 session = Instance(Session)
72 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
73 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_sockets = List()
74 shell_sockets = List()
71 control_socket = Instance('zmq.Socket')
75 control_socket = Instance('zmq.Socket')
72 iopub_socket = Instance('zmq.Socket')
76 iopub_socket = Instance('zmq.Socket')
73 log = Instance(logging.Logger)
77 log = Instance(logging.Logger)
74
78
75 user_module = Instance('types.ModuleType')
79 user_module = Instance('types.ModuleType')
76 def _user_module_changed(self, name, old, new):
80 def _user_module_changed(self, name, old, new):
77 if self.shell is not None:
81 if self.shell is not None:
78 self.shell.user_module = new
82 self.shell.user_module = new
79
83
80 user_ns = Dict(default_value=None)
84 user_ns = Dict(default_value=None)
81 def _user_ns_changed(self, name, old, new):
85 def _user_ns_changed(self, name, old, new):
82 if self.shell is not None:
86 if self.shell is not None:
83 self.shell.user_ns = new
87 self.shell.user_ns = new
84 self.shell.init_user_ns()
88 self.shell.init_user_ns()
85
89
86 # Private interface
90 # Private interface
87
91
88 # Time to sleep after flushing the stdout/err buffers in each execute
92 # Time to sleep after flushing the stdout/err buffers in each execute
89 # cycle. While this introduces a hard limit on the minimal latency of the
93 # cycle. While this introduces a hard limit on the minimal latency of the
90 # execute cycle, it helps prevent output synchronization problems for
94 # execute cycle, it helps prevent output synchronization problems for
91 # clients.
95 # clients.
92 # Units are in seconds. The minimum zmq latency on local host is probably
96 # Units are in seconds. The minimum zmq latency on local host is probably
93 # ~150 microseconds, set this to 500us for now. We may need to increase it
97 # ~150 microseconds, set this to 500us for now. We may need to increase it
94 # a little if it's not enough after more interactive testing.
98 # a little if it's not enough after more interactive testing.
95 _execute_sleep = Float(0.0005, config=True)
99 _execute_sleep = Float(0.0005, config=True)
96
100
97 # Frequency of the kernel's event loop.
101 # Frequency of the kernel's event loop.
98 # Units are in seconds, kernel subclasses for GUI toolkits may need to
102 # Units are in seconds, kernel subclasses for GUI toolkits may need to
99 # adapt to milliseconds.
103 # adapt to milliseconds.
100 _poll_interval = Float(0.05, config=True)
104 _poll_interval = Float(0.05, config=True)
101
105
102 # If the shutdown was requested over the network, we leave here the
106 # If the shutdown was requested over the network, we leave here the
103 # necessary reply message so it can be sent by our registered atexit
107 # necessary reply message so it can be sent by our registered atexit
104 # handler. This ensures that the reply is only sent to clients truly at
108 # handler. This ensures that the reply is only sent to clients truly at
105 # the end of our shutdown process (which happens after the underlying
109 # the end of our shutdown process (which happens after the underlying
106 # IPython shell's own shutdown).
110 # IPython shell's own shutdown).
107 _shutdown_message = None
111 _shutdown_message = None
108
112
109 # This is a dict of port numbers that the kernel is listening on. It is set
113 # This is a dict of port numbers that the kernel is listening on. It is set
110 # by record_ports and used by connect_request.
114 # by record_ports and used by connect_request.
111 _recorded_ports = Dict()
115 _recorded_ports = Dict()
112
116
113 # set of aborted msg_ids
117 # set of aborted msg_ids
114 aborted = Set()
118 aborted = Set()
115
119
116
120
117
121
118 def __init__(self, **kwargs):
122 def __init__(self, **kwargs):
119 super(Kernel, self).__init__(**kwargs)
123 super(Kernel, self).__init__(**kwargs)
120
124
121 # Before we even start up the shell, register *first* our exit handlers
125 # Before we even start up the shell, register *first* our exit handlers
122 # so they come before the shell's
126 # so they come before the shell's
123 atexit.register(self._at_shutdown)
127 atexit.register(self._at_shutdown)
124
128
125 # Initialize the InteractiveShell subclass
129 # Initialize the InteractiveShell subclass
126 self.shell = ZMQInteractiveShell.instance(config=self.config,
130 self.shell = ZMQInteractiveShell.instance(config=self.config,
127 profile_dir = self.profile_dir,
131 profile_dir = self.profile_dir,
128 user_module = self.user_module,
132 user_module = self.user_module,
129 user_ns = self.user_ns,
133 user_ns = self.user_ns,
130 )
134 )
131 self.shell.displayhook.session = self.session
135 self.shell.displayhook.session = self.session
132 self.shell.displayhook.pub_socket = self.iopub_socket
136 self.shell.displayhook.pub_socket = self.iopub_socket
133 self.shell.display_pub.session = self.session
137 self.shell.display_pub.session = self.session
134 self.shell.display_pub.pub_socket = self.iopub_socket
138 self.shell.display_pub.pub_socket = self.iopub_socket
135
139
136 # TMP - hack while developing
140 # TMP - hack while developing
137 self.shell._reply_content = None
141 self.shell._reply_content = None
138
142
139 # Build dict of handlers for message types
143 # Build dict of handlers for message types
140 msg_types = [ 'execute_request', 'complete_request',
144 msg_types = [ 'execute_request', 'complete_request',
141 'object_info_request', 'history_request',
145 'object_info_request', 'history_request',
142 'connect_request', 'shutdown_request',
146 'connect_request', 'shutdown_request',
143 'apply_request',
147 'apply_request',
144 ]
148 ]
145 self.handlers = {}
149 self.handlers = {}
146 for msg_type in msg_types:
150 for msg_type in msg_types:
147 self.handlers[msg_type] = getattr(self, msg_type)
151 self.handlers[msg_type] = getattr(self, msg_type)
148
152
149 control_msg_types = [ 'clear_request', 'abort_request' ]
153 control_msg_types = [ 'clear_request', 'abort_request' ]
150 self.control_handlers = {}
154 self.control_handlers = {}
151 for msg_type in control_msg_types:
155 for msg_type in control_msg_types:
152 self.control_handlers[msg_type] = getattr(self, msg_type)
156 self.control_handlers[msg_type] = getattr(self, msg_type)
153
157
154 def do_one_iteration(self):
158 def do_one_iteration(self):
155 """Do one iteration of the kernel's evaluation loop.
159 """Do one iteration of the kernel's evaluation loop.
156 """
160 """
157
161
158 # always flush control socket first
162 # always flush control socket first
159 while True:
163 while True:
160 if self.control_socket is None:
164 if self.control_socket is None:
161 break
165 break
162 try:
166 try:
163 idents,msg = self.session.recv(self.control_socket, zmq.NOBLOCK)
167 idents,msg = self.session.recv(self.control_socket, zmq.NOBLOCK)
164 except Exception:
168 except Exception:
165 self.log.warn("Invalid Control Message:", exc_info=True)
169 self.log.warn("Invalid Control Message:", exc_info=True)
166 continue
170 continue
167 if msg is None:
171 if msg is None:
168 break
172 break
169 self.dispatch_message(self.control_socket, idents, msg, self.control_handlers)
173 self.dispatch_message(self.control_socket, idents, msg, self.control_handlers)
170
174
171 for socket in self.shell_sockets:
175 for socket in self.shell_sockets:
172 try:
176 try:
173 idents,msg = self.session.recv(socket, zmq.NOBLOCK, copy=False)
177 idents,msg = self.session.recv(socket, zmq.NOBLOCK, copy=False)
174 except Exception:
178 except Exception:
175 self.log.warn("Invalid Message:", exc_info=True)
179 self.log.warn("Invalid Message:", exc_info=True)
176 continue
180 continue
177
181
178 if msg is None:
182 if msg is None:
179 continue
183 continue
180
184
181 self.dispatch_message(socket, idents, msg, self.handlers)
185 self.dispatch_message(socket, idents, msg, self.handlers)
182
186
183 def dispatch_message(self, socket, idents, msg, handlers):
187 def dispatch_message(self, socket, idents, msg, handlers):
184 msg_type = msg['header']['msg_type']
188 msg_type = msg['header']['msg_type']
185 msg_id = msg['header']['msg_id']
189 msg_id = msg['header']['msg_id']
186
190
187 # This assert will raise in versions of zeromq 2.0.7 and lesser.
191 # This assert will raise in versions of zeromq 2.0.7 and lesser.
188 # We now require 2.0.8 or above, so we can uncomment for safety.
192 # We now require 2.0.8 or above, so we can uncomment for safety.
189 # print(ident,msg, file=sys.__stdout__)
193 # print(ident,msg, file=sys.__stdout__)
190 assert idents is not None, "Missing message part."
194 assert idents is not None, "Missing message part."
191
195
192 # Print some info about this message and leave a '--->' marker, so it's
196 # Print some info about this message and leave a '--->' marker, so it's
193 # easier to trace visually the message chain when debugging. Each
197 # easier to trace visually the message chain when debugging. Each
194 # handler prints its message at the end.
198 # handler prints its message at the end.
195 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
199 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
196 self.log.debug(' Content: %s\n --->\n ', msg['content'])
200 self.log.debug(' Content: %s\n --->\n ', msg['content'])
197
201
198 # check if request has been aborted
202 # check if request has been aborted
199 if msg_id in self.aborted:
203 if msg_id in self.aborted:
200 self.aborted.remove(msg_id)
204 self.aborted.remove(msg_id)
201 # is it safe to assume a msg_id will not be resubmitted?
205 # is it safe to assume a msg_id will not be resubmitted?
202 reply_type = msg_type.split('_')[0] + '_reply'
206 reply_type = msg_type.split('_')[0] + '_reply'
203 status = {'status' : 'aborted'}
207 status = {'status' : 'aborted'}
204 reply_msg = self.session.send(socket, reply_type, subheader=status,
208 reply_msg = self.session.send(socket, reply_type, subheader=status,
205 content=status, parent=msg, ident=idents)
209 content=status, parent=msg, ident=idents)
206 return
210 return
207
211
208
212
209 # Find and call actual handler for message
213 # Find and call actual handler for message
210 handler = handlers.get(msg_type, None)
214 handler = handlers.get(msg_type, None)
211 if handler is None:
215 if handler is None:
212 self.log.error("UNKNOWN MESSAGE TYPE: %s", msg)
216 self.log.error("UNKNOWN MESSAGE TYPE: %s", msg)
213 else:
217 else:
214 handler(socket, idents, msg)
218 handler(socket, idents, msg)
215
219
216 # Check whether we should exit, in case the incoming message set the
220 # Check whether we should exit, in case the incoming message set the
217 # exit flag on
221 # exit flag on
218 if self.shell.exit_now:
222 if self.shell.exit_now:
219 self.log.debug('\nExiting IPython kernel...')
223 self.log.debug('\nExiting IPython kernel...')
220 # We do a normal, clean exit, which allows any actions registered
224 # We do a normal, clean exit, which allows any actions registered
221 # via atexit (such as history saving) to take place.
225 # via atexit (such as history saving) to take place.
222 sys.exit(0)
226 sys.exit(0)
223
227
224
228
225 def start(self):
229 def start(self):
226 """ Start the kernel main loop.
230 """ Start the kernel main loop.
227 """
231 """
228 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
232 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
229 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
233 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
230 signal(SIGINT,SIG_IGN)
234 signal(SIGINT,SIG_IGN)
231 poller = zmq.Poller()
235 poller = zmq.Poller()
232 for socket in self.shell_sockets:
236 for socket in self.shell_sockets:
233 poller.register(socket, zmq.POLLIN)
237 poller.register(socket, zmq.POLLIN)
234 if self.control_socket:
238 if self.control_socket:
235 poller.register(self.control_socket, zmq.POLLIN)
239 poller.register(self.control_socket, zmq.POLLIN)
236
240
237 # loop while self.eventloop has not been overridden
241 # loop while self.eventloop has not been overridden
238 while self.eventloop is None:
242 while self.eventloop is None:
239 try:
243 try:
240 # scale by extra factor of 10, because there is no
244 # scale by extra factor of 10, because there is no
241 # reason for this to be anything less than ~ 0.1s
245 # reason for this to be anything less than ~ 0.1s
242 # since it is a real poller and will respond
246 # since it is a real poller and will respond
243 # to events immediately
247 # to events immediately
244
248
245 # double nested try/except, to properly catch KeyboardInterrupt
249 # double nested try/except, to properly catch KeyboardInterrupt
246 # due to pyzmq Issue #130
250 # due to pyzmq Issue #130
247 try:
251 try:
248 poller.poll(10*1000*self._poll_interval)
252 poller.poll(10*1000*self._poll_interval)
249 # restore raising of KeyboardInterrupt
253 # restore raising of KeyboardInterrupt
250 signal(SIGINT, default_int_handler)
254 signal(SIGINT, default_int_handler)
251 self.do_one_iteration()
255 self.do_one_iteration()
252 except:
256 except:
253 raise
257 raise
254 finally:
258 finally:
255 # prevent raising of KeyboardInterrupt
259 # prevent raising of KeyboardInterrupt
256 signal(SIGINT,SIG_IGN)
260 signal(SIGINT,SIG_IGN)
257 except KeyboardInterrupt:
261 except KeyboardInterrupt:
258 # Ctrl-C shouldn't crash the kernel
262 # Ctrl-C shouldn't crash the kernel
259 io.raw_print("KeyboardInterrupt caught in kernel")
263 io.raw_print("KeyboardInterrupt caught in kernel")
260 # stop ignoring sigint, now that we are out of our own loop,
264 # stop ignoring sigint, now that we are out of our own loop,
261 # we don't want to prevent future code from handling it
265 # we don't want to prevent future code from handling it
262 signal(SIGINT, default_int_handler)
266 signal(SIGINT, default_int_handler)
263 while self.eventloop is not None:
267 while self.eventloop is not None:
264 try:
268 try:
265 self.eventloop(self)
269 self.eventloop(self)
266 except KeyboardInterrupt:
270 except KeyboardInterrupt:
267 # Ctrl-C shouldn't crash the kernel
271 # Ctrl-C shouldn't crash the kernel
268 io.raw_print("KeyboardInterrupt caught in kernel")
272 io.raw_print("KeyboardInterrupt caught in kernel")
269 continue
273 continue
270 else:
274 else:
271 # eventloop exited cleanly, this means we should stop (right?)
275 # eventloop exited cleanly, this means we should stop (right?)
272 self.eventloop = None
276 self.eventloop = None
273 break
277 break
274
278
275
279
276 def record_ports(self, ports):
280 def record_ports(self, ports):
277 """Record the ports that this kernel is using.
281 """Record the ports that this kernel is using.
278
282
279 The creator of the Kernel instance must call this method if they
283 The creator of the Kernel instance must call this method if they
280 want the :meth:`connect_request` method to return the port numbers.
284 want the :meth:`connect_request` method to return the port numbers.
281 """
285 """
282 self._recorded_ports = ports
286 self._recorded_ports = ports
283
287
284 #---------------------------------------------------------------------------
288 #---------------------------------------------------------------------------
285 # Kernel request handlers
289 # Kernel request handlers
286 #---------------------------------------------------------------------------
290 #---------------------------------------------------------------------------
287
291
288 def _publish_pyin(self, code, parent, execution_count):
292 def _publish_pyin(self, code, parent, execution_count):
289 """Publish the code request on the pyin stream."""
293 """Publish the code request on the pyin stream."""
290
294
291 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
295 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
292 u'execution_count': execution_count}, parent=parent)
296 u'execution_count': execution_count}, parent=parent)
293
297
294 def execute_request(self, socket, ident, parent):
298 def execute_request(self, socket, ident, parent):
295
299
296 self.session.send(self.iopub_socket,
300 self.session.send(self.iopub_socket,
297 u'status',
301 u'status',
298 {u'execution_state':u'busy'},
302 {u'execution_state':u'busy'},
299 parent=parent )
303 parent=parent )
300
304
301 try:
305 try:
302 content = parent[u'content']
306 content = parent[u'content']
303 code = content[u'code']
307 code = content[u'code']
304 silent = content[u'silent']
308 silent = content[u'silent']
305 except:
309 except:
306 self.log.error("Got bad msg: ")
310 self.log.error("Got bad msg: ")
307 self.log.error("%s", parent)
311 self.log.error("%s", parent)
308 return
312 return
309
313
310 shell = self.shell # we'll need this a lot here
314 shell = self.shell # we'll need this a lot here
311
315
312 # Replace raw_input. Note that it is not sufficient to replace
316 # Replace raw_input. Note that it is not sufficient to replace
313 # raw_input in the user namespace.
317 # raw_input in the user namespace.
314 if content.get('allow_stdin', False):
318 if content.get('allow_stdin', False):
315 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
319 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
316 else:
320 else:
317 raw_input = lambda prompt='' : self._no_raw_input()
321 raw_input = lambda prompt='' : self._no_raw_input()
318
322
319 if py3compat.PY3:
323 if py3compat.PY3:
320 __builtin__.input = raw_input
324 __builtin__.input = raw_input
321 else:
325 else:
322 __builtin__.raw_input = raw_input
326 __builtin__.raw_input = raw_input
323
327
324 # Set the parent message of the display hook and out streams.
328 # Set the parent message of the display hook and out streams.
325 shell.displayhook.set_parent(parent)
329 shell.displayhook.set_parent(parent)
326 shell.display_pub.set_parent(parent)
330 shell.display_pub.set_parent(parent)
327 sys.stdout.set_parent(parent)
331 sys.stdout.set_parent(parent)
328 sys.stderr.set_parent(parent)
332 sys.stderr.set_parent(parent)
329
333
330 # Re-broadcast our input for the benefit of listening clients, and
334 # Re-broadcast our input for the benefit of listening clients, and
331 # start computing output
335 # start computing output
332 if not silent:
336 if not silent:
333 self._publish_pyin(code, parent, shell.execution_count)
337 self._publish_pyin(code, parent, shell.execution_count)
334
338
335 reply_content = {}
339 reply_content = {}
336 try:
340 try:
337 if silent:
341 if silent:
338 # run_code uses 'exec' mode, so no displayhook will fire, and it
342 # run_code uses 'exec' mode, so no displayhook will fire, and it
339 # doesn't call logging or history manipulations. Print
343 # doesn't call logging or history manipulations. Print
340 # statements in that code will obviously still execute.
344 # statements in that code will obviously still execute.
341 shell.run_code(code)
345 shell.run_code(code)
342 else:
346 else:
343 # FIXME: the shell calls the exception handler itself.
347 # FIXME: the shell calls the exception handler itself.
344 shell.run_cell(code, store_history=True)
348 shell.run_cell(code, store_history=True)
345 except:
349 except:
346 status = u'error'
350 status = u'error'
347 # FIXME: this code right now isn't being used yet by default,
351 # FIXME: this code right now isn't being used yet by default,
348 # because the run_cell() call above directly fires off exception
352 # because the run_cell() call above directly fires off exception
349 # reporting. This code, therefore, is only active in the scenario
353 # reporting. This code, therefore, is only active in the scenario
350 # where runlines itself has an unhandled exception. We need to
354 # where runlines itself has an unhandled exception. We need to
351 # uniformize this, for all exception construction to come from a
355 # uniformize this, for all exception construction to come from a
352 # single location in the codebase.
356 # single location in the codebase.
353 etype, evalue, tb = sys.exc_info()
357 etype, evalue, tb = sys.exc_info()
354 tb_list = traceback.format_exception(etype, evalue, tb)
358 tb_list = traceback.format_exception(etype, evalue, tb)
355 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
359 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
356 else:
360 else:
357 status = u'ok'
361 status = u'ok'
358
362
359 reply_content[u'status'] = status
363 reply_content[u'status'] = status
360
364
361 # Return the execution counter so clients can display prompts
365 # Return the execution counter so clients can display prompts
362 reply_content['execution_count'] = shell.execution_count -1
366 reply_content['execution_count'] = shell.execution_count -1
363
367
364 # FIXME - fish exception info out of shell, possibly left there by
368 # FIXME - fish exception info out of shell, possibly left there by
365 # runlines. We'll need to clean up this logic later.
369 # runlines. We'll need to clean up this logic later.
366 if shell._reply_content is not None:
370 if shell._reply_content is not None:
367 reply_content.update(shell._reply_content)
371 reply_content.update(shell._reply_content)
368 # reset after use
372 # reset after use
369 shell._reply_content = None
373 shell._reply_content = None
370
374
371 # At this point, we can tell whether the main code execution succeeded
375 # At this point, we can tell whether the main code execution succeeded
372 # or not. If it did, we proceed to evaluate user_variables/expressions
376 # or not. If it did, we proceed to evaluate user_variables/expressions
373 if reply_content['status'] == 'ok':
377 if reply_content['status'] == 'ok':
374 reply_content[u'user_variables'] = \
378 reply_content[u'user_variables'] = \
375 shell.user_variables(content[u'user_variables'])
379 shell.user_variables(content[u'user_variables'])
376 reply_content[u'user_expressions'] = \
380 reply_content[u'user_expressions'] = \
377 shell.user_expressions(content[u'user_expressions'])
381 shell.user_expressions(content[u'user_expressions'])
378 else:
382 else:
379 # If there was an error, don't even try to compute variables or
383 # If there was an error, don't even try to compute variables or
380 # expressions
384 # expressions
381 reply_content[u'user_variables'] = {}
385 reply_content[u'user_variables'] = {}
382 reply_content[u'user_expressions'] = {}
386 reply_content[u'user_expressions'] = {}
383
387
384 # Payloads should be retrieved regardless of outcome, so we can both
388 # Payloads should be retrieved regardless of outcome, so we can both
385 # recover partial output (that could have been generated early in a
389 # recover partial output (that could have been generated early in a
386 # block, before an error) and clear the payload system always.
390 # block, before an error) and clear the payload system always.
387 reply_content[u'payload'] = shell.payload_manager.read_payload()
391 reply_content[u'payload'] = shell.payload_manager.read_payload()
388 # Be agressive about clearing the payload because we don't want
392 # Be agressive about clearing the payload because we don't want
389 # it to sit in memory until the next execute_request comes in.
393 # it to sit in memory until the next execute_request comes in.
390 shell.payload_manager.clear_payload()
394 shell.payload_manager.clear_payload()
391
395
392 # Flush output before sending the reply.
396 # Flush output before sending the reply.
393 sys.stdout.flush()
397 sys.stdout.flush()
394 sys.stderr.flush()
398 sys.stderr.flush()
395 # FIXME: on rare occasions, the flush doesn't seem to make it to the
399 # FIXME: on rare occasions, the flush doesn't seem to make it to the
396 # clients... This seems to mitigate the problem, but we definitely need
400 # clients... This seems to mitigate the problem, but we definitely need
397 # to better understand what's going on.
401 # to better understand what's going on.
398 if self._execute_sleep:
402 if self._execute_sleep:
399 time.sleep(self._execute_sleep)
403 time.sleep(self._execute_sleep)
400
404
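        # By this point reply_content carries the keys assembled above:
        # 'status', 'execution_count', 'user_variables', 'user_expressions'
        # and 'payload', plus any exception info merged from shell._reply_content.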
        # Send the reply.
        reply_content = json_clean(reply_content)
        reply_msg = self.session.send(socket, u'execute_reply',
                                      reply_content, parent, ident=ident)
        self.log.debug("%s", reply_msg)

        if reply_msg['content']['status'] == u'error':
            self._abort_queues()

        self.session.send(self.iopub_socket,
                          u'status',
                          {u'execution_state':u'idle'},
                          parent=parent )

    def complete_request(self, socket, ident, parent):
        txt, matches = self._complete(parent)
        matches = {'matches' : matches,
                   'matched_text' : txt,
                   'status' : 'ok'}
        matches = json_clean(matches)
        completion_msg = self.session.send(socket, 'complete_reply',
                                           matches, parent, ident)
        self.log.debug("%s", completion_msg)

    def object_info_request(self, socket, ident, parent):
        content = parent['content']
        object_info = self.shell.object_inspect(content['oname'],
                        detail_level = content.get('detail_level', 0)
        )
        # Before we send this object over, we scrub it for JSON usage
        oinfo = json_clean(object_info)
        msg = self.session.send(socket, 'object_info_reply',
                        oinfo, parent, ident)
        self.log.debug("%s", msg)

    def history_request(self, socket, ident, parent):
        # We need to pull these out, as passing **kwargs doesn't work with
        # unicode keys before Python 2.6.5.
        hist_access_type = parent['content']['hist_access_type']
        raw = parent['content']['raw']
        output = parent['content']['output']
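        # Every history request carries 'hist_access_type', 'raw' and 'output';
        # the remaining fields read below depend on the access type: 'n' for
        # tail, 'session'/'start'/'stop' for range, and 'pattern' for search.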
        if hist_access_type == 'tail':
            n = parent['content']['n']
            hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
                                                            include_latest=True)

        elif hist_access_type == 'range':
            session = parent['content']['session']
            start = parent['content']['start']
            stop = parent['content']['stop']
            hist = self.shell.history_manager.get_range(session, start, stop,
                                                        raw=raw, output=output)

        elif hist_access_type == 'search':
            pattern = parent['content']['pattern']
            hist = self.shell.history_manager.search(pattern, raw=raw,
                                                     output=output)

        else:
            hist = []
        hist = list(hist)
        content = {'history' : hist}
        content = json_clean(content)
        msg = self.session.send(socket, 'history_reply',
                                content, parent, ident)
        self.log.debug("Sending history reply with %i entries", len(hist))

    def connect_request(self, socket, ident, parent):
        if self._recorded_ports is not None:
            content = self._recorded_ports.copy()
        else:
            content = {}
        msg = self.session.send(socket, 'connect_reply',
                                content, parent, ident)
        self.log.debug("%s", msg)

    def shutdown_request(self, ident, parent):
        self.shell.exit_now = True
        self._shutdown_message = self.session.msg(u'shutdown_reply',
                                                  parent['content'], parent)
        sys.exit(0)

    #---------------------------------------------------------------------------
    # Engine methods
    #---------------------------------------------------------------------------

    def apply_request(self, socket, ident, parent):
        try:
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
            # bound = parent['header'].get('bound', False)
        except:
            self.log.error("Got bad msg: %s", parent, exc_info=True)
            return
        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
        # self.iopub_stream.send(pyin_msg)
        # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
        sub = {'dependencies_met' : True, 'engine' : self.ident,
               'started': datetime.now()}
        try:
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
            working = self.user_ns
            # suffix =
            prefix = "_"+str(msg_id).replace("-","")+"_"

            f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
            # if bound:
            #     bound_ns = Namespace(working)
            #     args = [bound_ns]+list(args)

            fname = getattr(f, '__name__', 'f')

            fname = prefix+"f"
            argname = prefix+"args"
            kwargname = prefix+"kwargs"
            resultname = prefix+"result"

            ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
            # print ns
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
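            # For illustration: with a (hypothetical) msg_id of 'abc-123', the
            # generated statement would read
            #   _abc123_result = _abc123_f(*_abc123_args, **_abc123_kwargs)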
            try:
                exec code in working, working
                result = working.get(resultname)
            finally:
                for key in ns.iterkeys():
                    working.pop(key)
            # if bound:
            #     working.update(bound_ns)

            packed_result,buf = serialize_object(result)
            result_buf = [packed_result]+buf
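            # result_buf is what gets attached to the apply_reply below: the
            # pickled metadata first, followed by any extracted data buffers.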
        except:
            exc_content = self._wrap_exception('apply')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
                                ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
            reply_content = exc_content
            result_buf = []

            if exc_content['ename'] == 'UnmetDependency':
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        # put 'ok'/'error' status in header, for scheduler introspection:
        sub['status'] = reply_content['status']

        # flush i/o
        sys.stdout.flush()
        sys.stderr.flush()

        reply_msg = self.session.send(socket, u'apply_reply', reply_content,
                    parent=parent, ident=ident, buffers=result_buf, subheader=sub)

    #---------------------------------------------------------------------------
    # Control messages
    #---------------------------------------------------------------------------

    def abort_request(self, socket, ident, parent):
        """abort a specific msg by id"""
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, basestring):
            msg_ids = [msg_ids]
        if not msg_ids:
            self._abort_queues()
        for mid in msg_ids:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(socket, 'abort_reply', content=content,
                parent=parent, ident=ident)
        self.log.debug("%s", reply_msg)

    def clear_request(self, socket, idents, parent):
        """Clear our namespace."""
        self.user_ns = {}
        msg = self.session.send(socket, 'clear_reply', ident=idents, parent=parent,
                content = dict(status='ok'))
        self._initial_exec_lines()

    #---------------------------------------------------------------------------
    # Protected interface
    #---------------------------------------------------------------------------

    def _abort_queues(self):
        for socket in self.shell_sockets:
            if socket:
                self._abort_queue(socket)

    def _abort_queue(self, socket):
        while True:
            idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
            if msg is None:
                return

            self.log.info("Aborting:")
            self.log.info("%s", msg)
            msg_type = msg['header']['msg_type']
            reply_type = msg_type.split('_')[0] + '_reply'
            # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
            # self.reply_socket.send(ident,zmq.SNDMORE)
            # self.reply_socket.send_json(reply_msg)
            reply_msg = self.session.send(socket, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)
            self.log.debug("%s", reply_msg)
            # We need to wait a bit for requests to come in. This can probably
            # be set shorter for true asynchronous clients.
            time.sleep(0.05)


    def _no_raw_input(self):
        """Raise StdinNotImplementedError if the active frontend doesn't
        support stdin."""
        raise StdinNotImplementedError("raw_input was called, but this "
                                       "frontend does not support stdin.")

    def _raw_input(self, prompt, ident, parent):
        # Flush output before making the request.
        sys.stderr.flush()
        sys.stdout.flush()

        # Send the input request.
        content = json_clean(dict(prompt=prompt))
        self.session.send(self.stdin_socket, u'input_request', content, parent,
                          ident=ident)

        # Await a response.
        while True:
            try:
                ident, reply = self.session.recv(self.stdin_socket, 0)
            except Exception:
                self.log.warn("Invalid Message:", exc_info=True)
            else:
                break
        try:
            value = reply['content']['value']
        except:
            self.log.error("Got bad raw_input reply: ")
            self.log.error("%s", parent)
            value = ''
        if value == '\x04':
            # EOF
            raise EOFError
        return value

    def _complete(self, msg):
        c = msg['content']
        try:
            cpos = int(c['cursor_pos'])
        except:
            # If we don't get something that we can convert to an integer, at
            # least attempt the completion guessing the cursor is at the end of
            # the text, if there's any, and otherwise of the line
            cpos = len(c['text'])
            if cpos==0:
                cpos = len(c['line'])
        return self.shell.complete(c['text'], c['line'], cpos)

    def _object_info(self, context):
        symbol, leftover = self._symbol_from_context(context)
        if symbol is not None and not leftover:
            doc = getattr(symbol, '__doc__', '')
        else:
            doc = ''
        object_info = dict(docstring = doc)
        return object_info

    def _symbol_from_context(self, context):
        if not context:
            return None, context

        base_symbol_string = context[0]
        symbol = self.shell.user_ns.get(base_symbol_string, None)
        if symbol is None:
            symbol = __builtin__.__dict__.get(base_symbol_string, None)
        if symbol is None:
            return None, context

        context = context[1:]
        for i, name in enumerate(context):
            new_symbol = getattr(symbol, name, None)
            if new_symbol is None:
                return symbol, context[i:]
            else:
                symbol = new_symbol

        return symbol, []

    def _at_shutdown(self):
        """Actions taken at shutdown by the kernel, called by python's atexit.
        """
        # io.rprint("Kernel at_shutdown") # dbg
        if self._shutdown_message is not None:
            self.session.send(self.shell_socket, self._shutdown_message)
            self.session.send(self.iopub_socket, self._shutdown_message)
            self.log.debug("%s", self._shutdown_message)
            # A very short sleep to give zmq time to flush its message buffers
            # before Python truly shuts down.
            time.sleep(0.01)

#-----------------------------------------------------------------------------
# Aliases and Flags for the IPKernelApp
#-----------------------------------------------------------------------------

flags = dict(kernel_flags)
flags.update(shell_flags)

addflag = lambda *args: flags.update(boolean_flag(*args))

flags['pylab'] = (
    {'IPKernelApp' : {'pylab' : 'auto'}},
    """Pre-load matplotlib and numpy for interactive use with
    the default matplotlib backend."""
)

aliases = dict(kernel_aliases)
aliases.update(shell_aliases)

# it's possible we don't want short aliases for *all* of these:
aliases.update(dict(
    pylab='IPKernelApp.pylab',
))
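# The 'pylab' alias above should surface as a --pylab=<backend> command line
# option, accepting the same values as the IPKernelApp.pylab configurable below.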

#-----------------------------------------------------------------------------
# The IPKernelApp class
#-----------------------------------------------------------------------------

class IPKernelApp(KernelApp, InteractiveShellApp):
    name = 'ipkernel'

    aliases = Dict(aliases)
    flags = Dict(flags)
    classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]

    # configurables
    pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
        config=True,
        help="""Pre-load matplotlib and numpy for interactive use,
        selecting a particular matplotlib backend and loop integration.
        """
    )

    @catch_config_error
    def initialize(self, argv=None):
        super(IPKernelApp, self).initialize(argv)
        self.init_path()
        self.init_shell()
        self.init_extensions()
        self.init_code()

    def init_kernel(self):

        kernel = Kernel(config=self.config, session=self.session,
                        shell_sockets=[self.shell_socket],
                        iopub_socket=self.iopub_socket,
                        stdin_socket=self.stdin_socket,
                        log=self.log,
                        profile_dir=self.profile_dir,
        )
        self.kernel = kernel
        kernel.record_ports(self.ports)
        shell = kernel.shell
        if self.pylab:
            try:
                gui, backend = pylabtools.find_gui_and_backend(self.pylab)
                shell.enable_pylab(gui, import_all=self.pylab_import_all)
            except Exception:
                self.log.error("Pylab initialization failed", exc_info=True)
                # print exception straight to stdout, because normally
                # _showtraceback associates the reply with an execution,
                # which means frontends will never draw it, as this exception
                # is not associated with any execute request.

                # replace pyerr-sending traceback with stdout
                _showtraceback = shell._showtraceback
                def print_tb(etype, evalue, stb):
                    print ("Error initializing pylab, pylab mode will not "
                           "be active", file=io.stderr)
                    print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
                shell._showtraceback = print_tb

                # send the traceback over stdout
                shell.showtraceback(tb_offset=0)

                # restore proper _showtraceback method
                shell._showtraceback = _showtraceback


    def init_shell(self):
        self.shell = self.kernel.shell
        self.shell.configurables.append(self)


#-----------------------------------------------------------------------------
# Kernel main and launch functions
#-----------------------------------------------------------------------------

def launch_kernel(*args, **kwargs):
    """Launches a localhost IPython kernel, binding to the specified ports.

    This function simply calls entry_point.base_launch_kernel with the right
    first command to start an ipkernel.  See base_launch_kernel for arguments.

    Returns
    -------
    A tuple of form:
        (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
    where kernel_process is a Popen object and the ports are integers.
    """
    return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
                              *args, **kwargs)
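# A minimal usage sketch (assuming base_launch_kernel supplies default ports
# when none are passed):
#
#     proc, shell_port, iopub_port, stdin_port, hb_port = launch_kernel()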


def embed_kernel(module=None, local_ns=None, **kwargs):
    """Embed and start an IPython kernel in a given scope.

    Parameters
    ----------
    module : ModuleType, optional
        The module to load into IPython globals (default: caller)
    local_ns : dict, optional
        The namespace to load into IPython user namespace (default: caller)

    kwargs : various, optional
        Further keyword args are relayed to the KernelApp constructor,
        allowing configuration of the Kernel.  Will only have an effect
        on the first embed_kernel call for a given process.

    """
    # get the app if it exists, or set it up if it doesn't
    if IPKernelApp.initialized():
        app = IPKernelApp.instance()
    else:
        app = IPKernelApp.instance(**kwargs)
        app.initialize([])

    # load the calling scope if not given
    (caller_module, caller_locals) = extract_module_locals(1)
    if module is None:
        module = caller_module
    if local_ns is None:
        local_ns = caller_locals

    app.kernel.user_module = module
    app.kernel.user_ns = local_ns
    app.start()
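# A usage sketch: called with no arguments, embed_kernel() serves the calling
# frame's module and local namespace, e.g. (hypothetical caller):
#
#     from IPython.zmq.ipkernel import embed_kernel
#     def debug_here():
#         embed_kernel()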

def main():
    """Run an IPKernel as an application"""
    app = IPKernelApp.instance()
    app.initialize()
    app.start()


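# main() above is the entry point invoked by the command string that
# launch_kernel() passes to base_launch_kernel.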
if __name__ == '__main__':
    main()