##// END OF EJS Templates
Remove uses of iterkeys
Thomas Kluyver -
Show More
@@ -1,338 +1,338 b''
1 """Implementations for various useful completers.
1 """Implementations for various useful completers.
2
2
3 These are all loaded by default by IPython.
3 These are all loaded by default by IPython.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team.
6 # Copyright (C) 2010-2011 The IPython Development Team.
7 #
7 #
8 # Distributed under the terms of the BSD License.
8 # Distributed under the terms of the BSD License.
9 #
9 #
10 # The full license is in the file COPYING.txt, distributed with this software.
10 # The full license is in the file COPYING.txt, distributed with this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Stdlib imports
18 # Stdlib imports
19 import glob
19 import glob
20 import imp
20 import imp
21 import inspect
21 import inspect
22 import os
22 import os
23 import re
23 import re
24 import sys
24 import sys
25
25
26 # Third-party imports
26 # Third-party imports
27 from time import time
27 from time import time
28 from zipimport import zipimporter
28 from zipimport import zipimporter
29
29
30 # Our own imports
30 # Our own imports
31 from IPython.core.completer import expand_user, compress_user
31 from IPython.core.completer import expand_user, compress_user
32 from IPython.core.error import TryNext
32 from IPython.core.error import TryNext
33 from IPython.utils._process_common import arg_split
33 from IPython.utils._process_common import arg_split
34 from IPython.utils.py3compat import string_types
34 from IPython.utils.py3compat import string_types
35
35
36 # FIXME: this should be pulled in with the right call via the component system
36 # FIXME: this should be pulled in with the right call via the component system
37 from IPython import get_ipython
37 from IPython import get_ipython
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Globals and constants
40 # Globals and constants
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 # Time in seconds after which the rootmodules will be stored permanently in the
43 # Time in seconds after which the rootmodules will be stored permanently in the
44 # ipython ip.db database (kept in the user's .ipython dir).
44 # ipython ip.db database (kept in the user's .ipython dir).
45 TIMEOUT_STORAGE = 2
45 TIMEOUT_STORAGE = 2
46
46
47 # Time in seconds after which we give up
47 # Time in seconds after which we give up
48 TIMEOUT_GIVEUP = 20
48 TIMEOUT_GIVEUP = 20
49
49
50 # Regular expression for the python import statement
50 # Regular expression for the python import statement
51 import_re = re.compile(r'(?P<name>[a-zA-Z_][a-zA-Z0-9_]*?)'
51 import_re = re.compile(r'(?P<name>[a-zA-Z_][a-zA-Z0-9_]*?)'
52 r'(?P<package>[/\\]__init__)?'
52 r'(?P<package>[/\\]__init__)?'
53 r'(?P<suffix>%s)$' %
53 r'(?P<suffix>%s)$' %
54 r'|'.join(re.escape(s[0]) for s in imp.get_suffixes()))
54 r'|'.join(re.escape(s[0]) for s in imp.get_suffixes()))
55
55
56 # RE for the ipython %run command (python + ipython scripts)
56 # RE for the ipython %run command (python + ipython scripts)
57 magic_run_re = re.compile(r'.*(\.ipy|\.py[w]?)$')
57 magic_run_re = re.compile(r'.*(\.ipy|\.py[w]?)$')
58
58
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60 # Local utilities
60 # Local utilities
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62
62
63 def module_list(path):
63 def module_list(path):
64 """
64 """
65 Return the list containing the names of the modules available in the given
65 Return the list containing the names of the modules available in the given
66 folder.
66 folder.
67 """
67 """
68 # sys.path has the cwd as an empty string, but isdir/listdir need it as '.'
68 # sys.path has the cwd as an empty string, but isdir/listdir need it as '.'
69 if path == '':
69 if path == '':
70 path = '.'
70 path = '.'
71
71
72 # A few local constants to be used in loops below
72 # A few local constants to be used in loops below
73 pjoin = os.path.join
73 pjoin = os.path.join
74
74
75 if os.path.isdir(path):
75 if os.path.isdir(path):
76 # Build a list of all files in the directory and all files
76 # Build a list of all files in the directory and all files
77 # in its subdirectories. For performance reasons, do not
77 # in its subdirectories. For performance reasons, do not
78 # recurse more than one level into subdirectories.
78 # recurse more than one level into subdirectories.
79 files = []
79 files = []
80 for root, dirs, nondirs in os.walk(path):
80 for root, dirs, nondirs in os.walk(path):
81 subdir = root[len(path)+1:]
81 subdir = root[len(path)+1:]
82 if subdir:
82 if subdir:
83 files.extend(pjoin(subdir, f) for f in nondirs)
83 files.extend(pjoin(subdir, f) for f in nondirs)
84 dirs[:] = [] # Do not recurse into additional subdirectories.
84 dirs[:] = [] # Do not recurse into additional subdirectories.
85 else:
85 else:
86 files.extend(nondirs)
86 files.extend(nondirs)
87
87
88 else:
88 else:
89 try:
89 try:
90 files = list(zipimporter(path)._files.keys())
90 files = list(zipimporter(path)._files.keys())
91 except:
91 except:
92 files = []
92 files = []
93
93
94 # Build a list of modules which match the import_re regex.
94 # Build a list of modules which match the import_re regex.
95 modules = []
95 modules = []
96 for f in files:
96 for f in files:
97 m = import_re.match(f)
97 m = import_re.match(f)
98 if m:
98 if m:
99 modules.append(m.group('name'))
99 modules.append(m.group('name'))
100 return list(set(modules))
100 return list(set(modules))
101
101
102
102
103 def get_root_modules():
103 def get_root_modules():
104 """
104 """
105 Returns a list containing the names of all the modules available in the
105 Returns a list containing the names of all the modules available in the
106 folders of the pythonpath.
106 folders of the pythonpath.
107
107
108 ip.db['rootmodules_cache'] maps sys.path entries to list of modules.
108 ip.db['rootmodules_cache'] maps sys.path entries to list of modules.
109 """
109 """
110 ip = get_ipython()
110 ip = get_ipython()
111 rootmodules_cache = ip.db.get('rootmodules_cache', {})
111 rootmodules_cache = ip.db.get('rootmodules_cache', {})
112 rootmodules = list(sys.builtin_module_names)
112 rootmodules = list(sys.builtin_module_names)
113 start_time = time()
113 start_time = time()
114 store = False
114 store = False
115 for path in sys.path:
115 for path in sys.path:
116 try:
116 try:
117 modules = rootmodules_cache[path]
117 modules = rootmodules_cache[path]
118 except KeyError:
118 except KeyError:
119 modules = module_list(path)
119 modules = module_list(path)
120 try:
120 try:
121 modules.remove('__init__')
121 modules.remove('__init__')
122 except ValueError:
122 except ValueError:
123 pass
123 pass
124 if path not in ('', '.'): # cwd modules should not be cached
124 if path not in ('', '.'): # cwd modules should not be cached
125 rootmodules_cache[path] = modules
125 rootmodules_cache[path] = modules
126 if time() - start_time > TIMEOUT_STORAGE and not store:
126 if time() - start_time > TIMEOUT_STORAGE and not store:
127 store = True
127 store = True
128 print("\nCaching the list of root modules, please wait!")
128 print("\nCaching the list of root modules, please wait!")
129 print("(This will only be done once - type '%rehashx' to "
129 print("(This will only be done once - type '%rehashx' to "
130 "reset cache!)\n")
130 "reset cache!)\n")
131 sys.stdout.flush()
131 sys.stdout.flush()
132 if time() - start_time > TIMEOUT_GIVEUP:
132 if time() - start_time > TIMEOUT_GIVEUP:
133 print("This is taking too long, we give up.\n")
133 print("This is taking too long, we give up.\n")
134 return []
134 return []
135 rootmodules.extend(modules)
135 rootmodules.extend(modules)
136 if store:
136 if store:
137 ip.db['rootmodules_cache'] = rootmodules_cache
137 ip.db['rootmodules_cache'] = rootmodules_cache
138 rootmodules = list(set(rootmodules))
138 rootmodules = list(set(rootmodules))
139 return rootmodules
139 return rootmodules
140
140
141
141
142 def is_importable(module, attr, only_modules):
142 def is_importable(module, attr, only_modules):
143 if only_modules:
143 if only_modules:
144 return inspect.ismodule(getattr(module, attr))
144 return inspect.ismodule(getattr(module, attr))
145 else:
145 else:
146 return not(attr[:2] == '__' and attr[-2:] == '__')
146 return not(attr[:2] == '__' and attr[-2:] == '__')
147
147
148
148
149 def try_import(mod, only_modules=False):
149 def try_import(mod, only_modules=False):
150 try:
150 try:
151 m = __import__(mod)
151 m = __import__(mod)
152 except:
152 except:
153 return []
153 return []
154 mods = mod.split('.')
154 mods = mod.split('.')
155 for module in mods[1:]:
155 for module in mods[1:]:
156 m = getattr(m, module)
156 m = getattr(m, module)
157
157
158 m_is_init = hasattr(m, '__file__') and '__init__' in m.__file__
158 m_is_init = hasattr(m, '__file__') and '__init__' in m.__file__
159
159
160 completions = []
160 completions = []
161 if (not hasattr(m, '__file__')) or (not only_modules) or m_is_init:
161 if (not hasattr(m, '__file__')) or (not only_modules) or m_is_init:
162 completions.extend( [attr for attr in dir(m) if
162 completions.extend( [attr for attr in dir(m) if
163 is_importable(m, attr, only_modules)])
163 is_importable(m, attr, only_modules)])
164
164
165 completions.extend(getattr(m, '__all__', []))
165 completions.extend(getattr(m, '__all__', []))
166 if m_is_init:
166 if m_is_init:
167 completions.extend(module_list(os.path.dirname(m.__file__)))
167 completions.extend(module_list(os.path.dirname(m.__file__)))
168 completions = set(completions)
168 completions = set(completions)
169 if '__init__' in completions:
169 if '__init__' in completions:
170 completions.remove('__init__')
170 completions.remove('__init__')
171 return list(completions)
171 return list(completions)
172
172
173
173
174 #-----------------------------------------------------------------------------
174 #-----------------------------------------------------------------------------
175 # Completion-related functions.
175 # Completion-related functions.
176 #-----------------------------------------------------------------------------
176 #-----------------------------------------------------------------------------
177
177
178 def quick_completer(cmd, completions):
178 def quick_completer(cmd, completions):
179 """ Easily create a trivial completer for a command.
179 """ Easily create a trivial completer for a command.
180
180
181 Takes either a list of completions, or all completions in string (that will
181 Takes either a list of completions, or all completions in string (that will
182 be split on whitespace).
182 be split on whitespace).
183
183
184 Example::
184 Example::
185
185
186 [d:\ipython]|1> import ipy_completers
186 [d:\ipython]|1> import ipy_completers
187 [d:\ipython]|2> ipy_completers.quick_completer('foo', ['bar','baz'])
187 [d:\ipython]|2> ipy_completers.quick_completer('foo', ['bar','baz'])
188 [d:\ipython]|3> foo b<TAB>
188 [d:\ipython]|3> foo b<TAB>
189 bar baz
189 bar baz
190 [d:\ipython]|3> foo ba
190 [d:\ipython]|3> foo ba
191 """
191 """
192
192
193 if isinstance(completions, string_types):
193 if isinstance(completions, string_types):
194 completions = completions.split()
194 completions = completions.split()
195
195
196 def do_complete(self, event):
196 def do_complete(self, event):
197 return completions
197 return completions
198
198
199 get_ipython().set_hook('complete_command',do_complete, str_key = cmd)
199 get_ipython().set_hook('complete_command',do_complete, str_key = cmd)
200
200
201 def module_completion(line):
201 def module_completion(line):
202 """
202 """
203 Returns a list containing the completion possibilities for an import line.
203 Returns a list containing the completion possibilities for an import line.
204
204
205 The line looks like this :
205 The line looks like this :
206 'import xml.d'
206 'import xml.d'
207 'from xml.dom import'
207 'from xml.dom import'
208 """
208 """
209
209
210 words = line.split(' ')
210 words = line.split(' ')
211 nwords = len(words)
211 nwords = len(words)
212
212
213 # from whatever <tab> -> 'import '
213 # from whatever <tab> -> 'import '
214 if nwords == 3 and words[0] == 'from':
214 if nwords == 3 and words[0] == 'from':
215 return ['import ']
215 return ['import ']
216
216
217 # 'from xy<tab>' or 'import xy<tab>'
217 # 'from xy<tab>' or 'import xy<tab>'
218 if nwords < 3 and (words[0] in ['import','from']) :
218 if nwords < 3 and (words[0] in ['import','from']) :
219 if nwords == 1:
219 if nwords == 1:
220 return get_root_modules()
220 return get_root_modules()
221 mod = words[1].split('.')
221 mod = words[1].split('.')
222 if len(mod) < 2:
222 if len(mod) < 2:
223 return get_root_modules()
223 return get_root_modules()
224 completion_list = try_import('.'.join(mod[:-1]), True)
224 completion_list = try_import('.'.join(mod[:-1]), True)
225 return ['.'.join(mod[:-1] + [el]) for el in completion_list]
225 return ['.'.join(mod[:-1] + [el]) for el in completion_list]
226
226
227 # 'from xyz import abc<tab>'
227 # 'from xyz import abc<tab>'
228 if nwords >= 3 and words[0] == 'from':
228 if nwords >= 3 and words[0] == 'from':
229 mod = words[1]
229 mod = words[1]
230 return try_import(mod)
230 return try_import(mod)
231
231
232 #-----------------------------------------------------------------------------
232 #-----------------------------------------------------------------------------
233 # Completers
233 # Completers
234 #-----------------------------------------------------------------------------
234 #-----------------------------------------------------------------------------
235 # These all have the func(self, event) signature to be used as custom
235 # These all have the func(self, event) signature to be used as custom
236 # completers
236 # completers
237
237
238 def module_completer(self,event):
238 def module_completer(self,event):
239 """Give completions after user has typed 'import ...' or 'from ...'"""
239 """Give completions after user has typed 'import ...' or 'from ...'"""
240
240
241 # This works in all versions of python. While 2.5 has
241 # This works in all versions of python. While 2.5 has
242 # pkgutil.walk_packages(), that particular routine is fairly dangerous,
242 # pkgutil.walk_packages(), that particular routine is fairly dangerous,
243 # since it imports *EVERYTHING* on sys.path. That is: a) very slow b) full
243 # since it imports *EVERYTHING* on sys.path. That is: a) very slow b) full
244 # of possibly problematic side effects.
244 # of possibly problematic side effects.
245 # This search the folders in the sys.path for available modules.
245 # This search the folders in the sys.path for available modules.
246
246
247 return module_completion(event.line)
247 return module_completion(event.line)
248
248
249 # FIXME: there's a lot of logic common to the run, cd and builtin file
249 # FIXME: there's a lot of logic common to the run, cd and builtin file
250 # completers, that is currently reimplemented in each.
250 # completers, that is currently reimplemented in each.
251
251
252 def magic_run_completer(self, event):
252 def magic_run_completer(self, event):
253 """Complete files that end in .py or .ipy for the %run command.
253 """Complete files that end in .py or .ipy for the %run command.
254 """
254 """
255 comps = arg_split(event.line, strict=False)
255 comps = arg_split(event.line, strict=False)
256 relpath = (len(comps) > 1 and comps[-1] or '').strip("'\"")
256 relpath = (len(comps) > 1 and comps[-1] or '').strip("'\"")
257
257
258 #print("\nev=", event) # dbg
258 #print("\nev=", event) # dbg
259 #print("rp=", relpath) # dbg
259 #print("rp=", relpath) # dbg
260 #print('comps=', comps) # dbg
260 #print('comps=', comps) # dbg
261
261
262 lglob = glob.glob
262 lglob = glob.glob
263 isdir = os.path.isdir
263 isdir = os.path.isdir
264 relpath, tilde_expand, tilde_val = expand_user(relpath)
264 relpath, tilde_expand, tilde_val = expand_user(relpath)
265
265
266 dirs = [f.replace('\\','/') + "/" for f in lglob(relpath+'*') if isdir(f)]
266 dirs = [f.replace('\\','/') + "/" for f in lglob(relpath+'*') if isdir(f)]
267
267
268 # Find if the user has already typed the first filename, after which we
268 # Find if the user has already typed the first filename, after which we
269 # should complete on all files, since after the first one other files may
269 # should complete on all files, since after the first one other files may
270 # be arguments to the input script.
270 # be arguments to the input script.
271
271
272 if filter(magic_run_re.match, comps):
272 if filter(magic_run_re.match, comps):
273 pys = [f.replace('\\','/') for f in lglob('*')]
273 pys = [f.replace('\\','/') for f in lglob('*')]
274 else:
274 else:
275 pys = [f.replace('\\','/')
275 pys = [f.replace('\\','/')
276 for f in lglob(relpath+'*.py') + lglob(relpath+'*.ipy') +
276 for f in lglob(relpath+'*.py') + lglob(relpath+'*.ipy') +
277 lglob(relpath + '*.pyw')]
277 lglob(relpath + '*.pyw')]
278 #print('run comp:', dirs+pys) # dbg
278 #print('run comp:', dirs+pys) # dbg
279 return [compress_user(p, tilde_expand, tilde_val) for p in dirs+pys]
279 return [compress_user(p, tilde_expand, tilde_val) for p in dirs+pys]
280
280
281
281
282 def cd_completer(self, event):
282 def cd_completer(self, event):
283 """Completer function for cd, which only returns directories."""
283 """Completer function for cd, which only returns directories."""
284 ip = get_ipython()
284 ip = get_ipython()
285 relpath = event.symbol
285 relpath = event.symbol
286
286
287 #print(event) # dbg
287 #print(event) # dbg
288 if event.line.endswith('-b') or ' -b ' in event.line:
288 if event.line.endswith('-b') or ' -b ' in event.line:
289 # return only bookmark completions
289 # return only bookmark completions
290 bkms = self.db.get('bookmarks', None)
290 bkms = self.db.get('bookmarks', None)
291 if bkms:
291 if bkms:
292 return bkms.keys()
292 return bkms.keys()
293 else:
293 else:
294 return []
294 return []
295
295
296 if event.symbol == '-':
296 if event.symbol == '-':
297 width_dh = str(len(str(len(ip.user_ns['_dh']) + 1)))
297 width_dh = str(len(str(len(ip.user_ns['_dh']) + 1)))
298 # jump in directory history by number
298 # jump in directory history by number
299 fmt = '-%0' + width_dh +'d [%s]'
299 fmt = '-%0' + width_dh +'d [%s]'
300 ents = [ fmt % (i,s) for i,s in enumerate(ip.user_ns['_dh'])]
300 ents = [ fmt % (i,s) for i,s in enumerate(ip.user_ns['_dh'])]
301 if len(ents) > 1:
301 if len(ents) > 1:
302 return ents
302 return ents
303 return []
303 return []
304
304
305 if event.symbol.startswith('--'):
305 if event.symbol.startswith('--'):
306 return ["--" + os.path.basename(d) for d in ip.user_ns['_dh']]
306 return ["--" + os.path.basename(d) for d in ip.user_ns['_dh']]
307
307
308 # Expand ~ in path and normalize directory separators.
308 # Expand ~ in path and normalize directory separators.
309 relpath, tilde_expand, tilde_val = expand_user(relpath)
309 relpath, tilde_expand, tilde_val = expand_user(relpath)
310 relpath = relpath.replace('\\','/')
310 relpath = relpath.replace('\\','/')
311
311
312 found = []
312 found = []
313 for d in [f.replace('\\','/') + '/' for f in glob.glob(relpath+'*')
313 for d in [f.replace('\\','/') + '/' for f in glob.glob(relpath+'*')
314 if os.path.isdir(f)]:
314 if os.path.isdir(f)]:
315 if ' ' in d:
315 if ' ' in d:
316 # we don't want to deal with any of that, complex code
316 # we don't want to deal with any of that, complex code
317 # for this is elsewhere
317 # for this is elsewhere
318 raise TryNext
318 raise TryNext
319
319
320 found.append(d)
320 found.append(d)
321
321
322 if not found:
322 if not found:
323 if os.path.isdir(relpath):
323 if os.path.isdir(relpath):
324 return [compress_user(relpath, tilde_expand, tilde_val)]
324 return [compress_user(relpath, tilde_expand, tilde_val)]
325
325
326 # if no completions so far, try bookmarks
326 # if no completions so far, try bookmarks
327 bks = self.db.get('bookmarks',{}).iterkeys()
327 bks = self.db.get('bookmarks',{})
328 bkmatches = [s for s in bks if s.startswith(event.symbol)]
328 bkmatches = [s for s in bks if s.startswith(event.symbol)]
329 if bkmatches:
329 if bkmatches:
330 return bkmatches
330 return bkmatches
331
331
332 raise TryNext
332 raise TryNext
333
333
334 return [compress_user(p, tilde_expand, tilde_val) for p in found]
334 return [compress_user(p, tilde_expand, tilde_val) for p in found]
335
335
336 def reset_completer(self, event):
336 def reset_completer(self, event):
337 "A completer for %reset magic"
337 "A completer for %reset magic"
338 return '-f -s in out array dhist'.split()
338 return '-f -s in out array dhist'.split()
@@ -1,789 +1,789 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An interactive kernel that talks to frontends over 0MQ."""
2 """An interactive kernel that talks to frontends over 0MQ."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Imports
5 # Imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 from __future__ import print_function
7 from __future__ import print_function
8
8
9 # Standard library imports
9 # Standard library imports
10 import sys
10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import logging
13 import logging
14 import uuid
14 import uuid
15
15
16 from datetime import datetime
16 from datetime import datetime
17 from signal import (
17 from signal import (
18 signal, default_int_handler, SIGINT
18 signal, default_int_handler, SIGINT
19 )
19 )
20
20
21 # System library imports
21 # System library imports
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # Local imports
26 # Local imports
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.core.error import StdinNotImplementedError
28 from IPython.core.error import StdinNotImplementedError
29 from IPython.core import release
29 from IPython.core import release
30 from IPython.utils import py3compat
30 from IPython.utils import py3compat
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 from IPython.utils.jsonutil import json_clean
32 from IPython.utils.jsonutil import json_clean
33 from IPython.utils.traitlets import (
33 from IPython.utils.traitlets import (
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 Type
35 Type
36 )
36 )
37
37
38 from .serialize import serialize_object, unpack_apply_message
38 from .serialize import serialize_object, unpack_apply_message
39 from .session import Session
39 from .session import Session
40 from .zmqshell import ZMQInteractiveShell
40 from .zmqshell import ZMQInteractiveShell
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Main kernel class
44 # Main kernel class
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 protocol_version = list(release.kernel_protocol_version_info)
47 protocol_version = list(release.kernel_protocol_version_info)
48 ipython_version = list(release.version_info)
48 ipython_version = list(release.version_info)
49 language_version = list(sys.version_info[:3])
49 language_version = list(sys.version_info[:3])
50
50
51
51
52 class Kernel(Configurable):
52 class Kernel(Configurable):
53
53
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55 # Kernel interface
55 # Kernel interface
56 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
57
57
58 # attribute to override with a GUI
58 # attribute to override with a GUI
59 eventloop = Any(None)
59 eventloop = Any(None)
60 def _eventloop_changed(self, name, old, new):
60 def _eventloop_changed(self, name, old, new):
61 """schedule call to eventloop from IOLoop"""
61 """schedule call to eventloop from IOLoop"""
62 loop = ioloop.IOLoop.instance()
62 loop = ioloop.IOLoop.instance()
63 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
63 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
64
64
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 shell_class = Type(ZMQInteractiveShell)
66 shell_class = Type(ZMQInteractiveShell)
67
67
68 session = Instance(Session)
68 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_streams = List()
70 shell_streams = List()
71 control_stream = Instance(ZMQStream)
71 control_stream = Instance(ZMQStream)
72 iopub_socket = Instance(zmq.Socket)
72 iopub_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
74 log = Instance(logging.Logger)
74 log = Instance(logging.Logger)
75
75
76 user_module = Any()
76 user_module = Any()
77 def _user_module_changed(self, name, old, new):
77 def _user_module_changed(self, name, old, new):
78 if self.shell is not None:
78 if self.shell is not None:
79 self.shell.user_module = new
79 self.shell.user_module = new
80
80
81 user_ns = Instance(dict, args=None, allow_none=True)
81 user_ns = Instance(dict, args=None, allow_none=True)
82 def _user_ns_changed(self, name, old, new):
82 def _user_ns_changed(self, name, old, new):
83 if self.shell is not None:
83 if self.shell is not None:
84 self.shell.user_ns = new
84 self.shell.user_ns = new
85 self.shell.init_user_ns()
85 self.shell.init_user_ns()
86
86
87 # identities:
87 # identities:
88 int_id = Integer(-1)
88 int_id = Integer(-1)
89 ident = Unicode()
89 ident = Unicode()
90
90
91 def _ident_default(self):
91 def _ident_default(self):
92 return unicode_type(uuid.uuid4())
92 return unicode_type(uuid.uuid4())
93
93
94
94
95 # Private interface
95 # Private interface
96
96
97 # Time to sleep after flushing the stdout/err buffers in each execute
97 # Time to sleep after flushing the stdout/err buffers in each execute
98 # cycle. While this introduces a hard limit on the minimal latency of the
98 # cycle. While this introduces a hard limit on the minimal latency of the
99 # execute cycle, it helps prevent output synchronization problems for
99 # execute cycle, it helps prevent output synchronization problems for
100 # clients.
100 # clients.
101 # Units are in seconds. The minimum zmq latency on local host is probably
101 # Units are in seconds. The minimum zmq latency on local host is probably
102 # ~150 microseconds, set this to 500us for now. We may need to increase it
102 # ~150 microseconds, set this to 500us for now. We may need to increase it
103 # a little if it's not enough after more interactive testing.
103 # a little if it's not enough after more interactive testing.
104 _execute_sleep = Float(0.0005, config=True)
104 _execute_sleep = Float(0.0005, config=True)
105
105
106 # Frequency of the kernel's event loop.
106 # Frequency of the kernel's event loop.
107 # Units are in seconds, kernel subclasses for GUI toolkits may need to
107 # Units are in seconds, kernel subclasses for GUI toolkits may need to
108 # adapt to milliseconds.
108 # adapt to milliseconds.
109 _poll_interval = Float(0.05, config=True)
109 _poll_interval = Float(0.05, config=True)
110
110
111 # If the shutdown was requested over the network, we leave here the
111 # If the shutdown was requested over the network, we leave here the
112 # necessary reply message so it can be sent by our registered atexit
112 # necessary reply message so it can be sent by our registered atexit
113 # handler. This ensures that the reply is only sent to clients truly at
113 # handler. This ensures that the reply is only sent to clients truly at
114 # the end of our shutdown process (which happens after the underlying
114 # the end of our shutdown process (which happens after the underlying
115 # IPython shell's own shutdown).
115 # IPython shell's own shutdown).
116 _shutdown_message = None
116 _shutdown_message = None
117
117
118 # This is a dict of port number that the kernel is listening on. It is set
118 # This is a dict of port number that the kernel is listening on. It is set
119 # by record_ports and used by connect_request.
119 # by record_ports and used by connect_request.
120 _recorded_ports = Dict()
120 _recorded_ports = Dict()
121
121
122 # A reference to the Python builtin 'raw_input' function.
122 # A reference to the Python builtin 'raw_input' function.
123 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
123 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
124 _sys_raw_input = Any()
124 _sys_raw_input = Any()
125 _sys_eval_input = Any()
125 _sys_eval_input = Any()
126
126
127 # set of aborted msg_ids
127 # set of aborted msg_ids
128 aborted = Set()
128 aborted = Set()
129
129
130
130
131 def __init__(self, **kwargs):
131 def __init__(self, **kwargs):
132 super(Kernel, self).__init__(**kwargs)
132 super(Kernel, self).__init__(**kwargs)
133
133
134 # Initialize the InteractiveShell subclass
134 # Initialize the InteractiveShell subclass
135 self.shell = self.shell_class.instance(parent=self,
135 self.shell = self.shell_class.instance(parent=self,
136 profile_dir = self.profile_dir,
136 profile_dir = self.profile_dir,
137 user_module = self.user_module,
137 user_module = self.user_module,
138 user_ns = self.user_ns,
138 user_ns = self.user_ns,
139 kernel = self,
139 kernel = self,
140 )
140 )
141 self.shell.displayhook.session = self.session
141 self.shell.displayhook.session = self.session
142 self.shell.displayhook.pub_socket = self.iopub_socket
142 self.shell.displayhook.pub_socket = self.iopub_socket
143 self.shell.displayhook.topic = self._topic('pyout')
143 self.shell.displayhook.topic = self._topic('pyout')
144 self.shell.display_pub.session = self.session
144 self.shell.display_pub.session = self.session
145 self.shell.display_pub.pub_socket = self.iopub_socket
145 self.shell.display_pub.pub_socket = self.iopub_socket
146 self.shell.data_pub.session = self.session
146 self.shell.data_pub.session = self.session
147 self.shell.data_pub.pub_socket = self.iopub_socket
147 self.shell.data_pub.pub_socket = self.iopub_socket
148
148
149 # TMP - hack while developing
149 # TMP - hack while developing
150 self.shell._reply_content = None
150 self.shell._reply_content = None
151
151
152 # Build dict of handlers for message types
152 # Build dict of handlers for message types
153 msg_types = [ 'execute_request', 'complete_request',
153 msg_types = [ 'execute_request', 'complete_request',
154 'object_info_request', 'history_request',
154 'object_info_request', 'history_request',
155 'kernel_info_request',
155 'kernel_info_request',
156 'connect_request', 'shutdown_request',
156 'connect_request', 'shutdown_request',
157 'apply_request',
157 'apply_request',
158 ]
158 ]
159 self.shell_handlers = {}
159 self.shell_handlers = {}
160 for msg_type in msg_types:
160 for msg_type in msg_types:
161 self.shell_handlers[msg_type] = getattr(self, msg_type)
161 self.shell_handlers[msg_type] = getattr(self, msg_type)
162
162
163 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
163 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
164 comm_manager = self.shell.comm_manager
164 comm_manager = self.shell.comm_manager
165 for msg_type in comm_msg_types:
165 for msg_type in comm_msg_types:
166 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
166 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
167
167
168 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
168 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
169 self.control_handlers = {}
169 self.control_handlers = {}
170 for msg_type in control_msg_types:
170 for msg_type in control_msg_types:
171 self.control_handlers[msg_type] = getattr(self, msg_type)
171 self.control_handlers[msg_type] = getattr(self, msg_type)
172
172
173
173
174 def dispatch_control(self, msg):
174 def dispatch_control(self, msg):
175 """dispatch control requests"""
175 """dispatch control requests"""
176 idents,msg = self.session.feed_identities(msg, copy=False)
176 idents,msg = self.session.feed_identities(msg, copy=False)
177 try:
177 try:
178 msg = self.session.unserialize(msg, content=True, copy=False)
178 msg = self.session.unserialize(msg, content=True, copy=False)
179 except:
179 except:
180 self.log.error("Invalid Control Message", exc_info=True)
180 self.log.error("Invalid Control Message", exc_info=True)
181 return
181 return
182
182
183 self.log.debug("Control received: %s", msg)
183 self.log.debug("Control received: %s", msg)
184
184
185 header = msg['header']
185 header = msg['header']
186 msg_id = header['msg_id']
186 msg_id = header['msg_id']
187 msg_type = header['msg_type']
187 msg_type = header['msg_type']
188
188
189 handler = self.control_handlers.get(msg_type, None)
189 handler = self.control_handlers.get(msg_type, None)
190 if handler is None:
190 if handler is None:
191 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
191 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
192 else:
192 else:
193 try:
193 try:
194 handler(self.control_stream, idents, msg)
194 handler(self.control_stream, idents, msg)
195 except Exception:
195 except Exception:
196 self.log.error("Exception in control handler:", exc_info=True)
196 self.log.error("Exception in control handler:", exc_info=True)
197
197
198 def dispatch_shell(self, stream, msg):
198 def dispatch_shell(self, stream, msg):
199 """dispatch shell requests"""
199 """dispatch shell requests"""
200 # flush control requests first
200 # flush control requests first
201 if self.control_stream:
201 if self.control_stream:
202 self.control_stream.flush()
202 self.control_stream.flush()
203
203
204 idents,msg = self.session.feed_identities(msg, copy=False)
204 idents,msg = self.session.feed_identities(msg, copy=False)
205 try:
205 try:
206 msg = self.session.unserialize(msg, content=True, copy=False)
206 msg = self.session.unserialize(msg, content=True, copy=False)
207 except:
207 except:
208 self.log.error("Invalid Message", exc_info=True)
208 self.log.error("Invalid Message", exc_info=True)
209 return
209 return
210
210
211 header = msg['header']
211 header = msg['header']
212 msg_id = header['msg_id']
212 msg_id = header['msg_id']
213 msg_type = msg['header']['msg_type']
213 msg_type = msg['header']['msg_type']
214
214
215 # Print some info about this message and leave a '--->' marker, so it's
215 # Print some info about this message and leave a '--->' marker, so it's
216 # easier to trace visually the message chain when debugging. Each
216 # easier to trace visually the message chain when debugging. Each
217 # handler prints its message at the end.
217 # handler prints its message at the end.
218 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
218 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
219 self.log.debug(' Content: %s\n --->\n ', msg['content'])
219 self.log.debug(' Content: %s\n --->\n ', msg['content'])
220
220
221 if msg_id in self.aborted:
221 if msg_id in self.aborted:
222 self.aborted.remove(msg_id)
222 self.aborted.remove(msg_id)
223 # is it safe to assume a msg_id will not be resubmitted?
223 # is it safe to assume a msg_id will not be resubmitted?
224 reply_type = msg_type.split('_')[0] + '_reply'
224 reply_type = msg_type.split('_')[0] + '_reply'
225 status = {'status' : 'aborted'}
225 status = {'status' : 'aborted'}
226 md = {'engine' : self.ident}
226 md = {'engine' : self.ident}
227 md.update(status)
227 md.update(status)
228 reply_msg = self.session.send(stream, reply_type, metadata=md,
228 reply_msg = self.session.send(stream, reply_type, metadata=md,
229 content=status, parent=msg, ident=idents)
229 content=status, parent=msg, ident=idents)
230 return
230 return
231
231
232 handler = self.shell_handlers.get(msg_type, None)
232 handler = self.shell_handlers.get(msg_type, None)
233 if handler is None:
233 if handler is None:
234 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
234 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
235 else:
235 else:
236 # ensure default_int_handler during handler call
236 # ensure default_int_handler during handler call
237 sig = signal(SIGINT, default_int_handler)
237 sig = signal(SIGINT, default_int_handler)
238 try:
238 try:
239 handler(stream, idents, msg)
239 handler(stream, idents, msg)
240 except Exception:
240 except Exception:
241 self.log.error("Exception in message handler:", exc_info=True)
241 self.log.error("Exception in message handler:", exc_info=True)
242 finally:
242 finally:
243 signal(SIGINT, sig)
243 signal(SIGINT, sig)
244
244
245 def enter_eventloop(self):
245 def enter_eventloop(self):
246 """enter eventloop"""
246 """enter eventloop"""
247 self.log.info("entering eventloop")
247 self.log.info("entering eventloop")
248 # restore default_int_handler
248 # restore default_int_handler
249 signal(SIGINT, default_int_handler)
249 signal(SIGINT, default_int_handler)
250 while self.eventloop is not None:
250 while self.eventloop is not None:
251 try:
251 try:
252 self.eventloop(self)
252 self.eventloop(self)
253 except KeyboardInterrupt:
253 except KeyboardInterrupt:
254 # Ctrl-C shouldn't crash the kernel
254 # Ctrl-C shouldn't crash the kernel
255 self.log.error("KeyboardInterrupt caught in kernel")
255 self.log.error("KeyboardInterrupt caught in kernel")
256 continue
256 continue
257 else:
257 else:
258 # eventloop exited cleanly, this means we should stop (right?)
258 # eventloop exited cleanly, this means we should stop (right?)
259 self.eventloop = None
259 self.eventloop = None
260 break
260 break
261 self.log.info("exiting eventloop")
261 self.log.info("exiting eventloop")
262
262
263 def start(self):
263 def start(self):
264 """register dispatchers for streams"""
264 """register dispatchers for streams"""
265 self.shell.exit_now = False
265 self.shell.exit_now = False
266 if self.control_stream:
266 if self.control_stream:
267 self.control_stream.on_recv(self.dispatch_control, copy=False)
267 self.control_stream.on_recv(self.dispatch_control, copy=False)
268
268
269 def make_dispatcher(stream):
269 def make_dispatcher(stream):
270 def dispatcher(msg):
270 def dispatcher(msg):
271 return self.dispatch_shell(stream, msg)
271 return self.dispatch_shell(stream, msg)
272 return dispatcher
272 return dispatcher
273
273
274 for s in self.shell_streams:
274 for s in self.shell_streams:
275 s.on_recv(make_dispatcher(s), copy=False)
275 s.on_recv(make_dispatcher(s), copy=False)
276
276
277 # publish idle status
277 # publish idle status
278 self._publish_status('starting')
278 self._publish_status('starting')
279
279
280 def do_one_iteration(self):
280 def do_one_iteration(self):
281 """step eventloop just once"""
281 """step eventloop just once"""
282 if self.control_stream:
282 if self.control_stream:
283 self.control_stream.flush()
283 self.control_stream.flush()
284 for stream in self.shell_streams:
284 for stream in self.shell_streams:
285 # handle at most one request per iteration
285 # handle at most one request per iteration
286 stream.flush(zmq.POLLIN, 1)
286 stream.flush(zmq.POLLIN, 1)
287 stream.flush(zmq.POLLOUT)
287 stream.flush(zmq.POLLOUT)
288
288
289
289
290 def record_ports(self, ports):
290 def record_ports(self, ports):
291 """Record the ports that this kernel is using.
291 """Record the ports that this kernel is using.
292
292
293 The creator of the Kernel instance must call this methods if they
293 The creator of the Kernel instance must call this methods if they
294 want the :meth:`connect_request` method to return the port numbers.
294 want the :meth:`connect_request` method to return the port numbers.
295 """
295 """
296 self._recorded_ports = ports
296 self._recorded_ports = ports
297
297
298 #---------------------------------------------------------------------------
298 #---------------------------------------------------------------------------
299 # Kernel request handlers
299 # Kernel request handlers
300 #---------------------------------------------------------------------------
300 #---------------------------------------------------------------------------
301
301
302 def _make_metadata(self, other=None):
302 def _make_metadata(self, other=None):
303 """init metadata dict, for execute/apply_reply"""
303 """init metadata dict, for execute/apply_reply"""
304 new_md = {
304 new_md = {
305 'dependencies_met' : True,
305 'dependencies_met' : True,
306 'engine' : self.ident,
306 'engine' : self.ident,
307 'started': datetime.now(),
307 'started': datetime.now(),
308 }
308 }
309 if other:
309 if other:
310 new_md.update(other)
310 new_md.update(other)
311 return new_md
311 return new_md
312
312
313 def _publish_pyin(self, code, parent, execution_count):
313 def _publish_pyin(self, code, parent, execution_count):
314 """Publish the code request on the pyin stream."""
314 """Publish the code request on the pyin stream."""
315
315
316 self.session.send(self.iopub_socket, u'pyin',
316 self.session.send(self.iopub_socket, u'pyin',
317 {u'code':code, u'execution_count': execution_count},
317 {u'code':code, u'execution_count': execution_count},
318 parent=parent, ident=self._topic('pyin')
318 parent=parent, ident=self._topic('pyin')
319 )
319 )
320
320
321 def _publish_status(self, status, parent=None):
321 def _publish_status(self, status, parent=None):
322 """send status (busy/idle) on IOPub"""
322 """send status (busy/idle) on IOPub"""
323 self.session.send(self.iopub_socket,
323 self.session.send(self.iopub_socket,
324 u'status',
324 u'status',
325 {u'execution_state': status},
325 {u'execution_state': status},
326 parent=parent,
326 parent=parent,
327 ident=self._topic('status'),
327 ident=self._topic('status'),
328 )
328 )
329
329
330
330
331 def execute_request(self, stream, ident, parent):
331 def execute_request(self, stream, ident, parent):
332 """handle an execute_request"""
332 """handle an execute_request"""
333
333
334 self._publish_status(u'busy', parent)
334 self._publish_status(u'busy', parent)
335
335
336 try:
336 try:
337 content = parent[u'content']
337 content = parent[u'content']
338 code = content[u'code']
338 code = content[u'code']
339 silent = content[u'silent']
339 silent = content[u'silent']
340 store_history = content.get(u'store_history', not silent)
340 store_history = content.get(u'store_history', not silent)
341 except:
341 except:
342 self.log.error("Got bad msg: ")
342 self.log.error("Got bad msg: ")
343 self.log.error("%s", parent)
343 self.log.error("%s", parent)
344 return
344 return
345
345
346 md = self._make_metadata(parent['metadata'])
346 md = self._make_metadata(parent['metadata'])
347
347
348 shell = self.shell # we'll need this a lot here
348 shell = self.shell # we'll need this a lot here
349
349
350 # Replace raw_input. Note that is not sufficient to replace
350 # Replace raw_input. Note that is not sufficient to replace
351 # raw_input in the user namespace.
351 # raw_input in the user namespace.
352 if content.get('allow_stdin', False):
352 if content.get('allow_stdin', False):
353 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
353 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
354 input = lambda prompt='': eval(raw_input(prompt))
354 input = lambda prompt='': eval(raw_input(prompt))
355 else:
355 else:
356 raw_input = input = lambda prompt='' : self._no_raw_input()
356 raw_input = input = lambda prompt='' : self._no_raw_input()
357
357
358 if py3compat.PY3:
358 if py3compat.PY3:
359 self._sys_raw_input = builtin_mod.input
359 self._sys_raw_input = builtin_mod.input
360 builtin_mod.input = raw_input
360 builtin_mod.input = raw_input
361 else:
361 else:
362 self._sys_raw_input = builtin_mod.raw_input
362 self._sys_raw_input = builtin_mod.raw_input
363 self._sys_eval_input = builtin_mod.input
363 self._sys_eval_input = builtin_mod.input
364 builtin_mod.raw_input = raw_input
364 builtin_mod.raw_input = raw_input
365 builtin_mod.input = input
365 builtin_mod.input = input
366
366
367 # Set the parent message of the display hook and out streams.
367 # Set the parent message of the display hook and out streams.
368 shell.set_parent(parent)
368 shell.set_parent(parent)
369
369
370 # Re-broadcast our input for the benefit of listening clients, and
370 # Re-broadcast our input for the benefit of listening clients, and
371 # start computing output
371 # start computing output
372 if not silent:
372 if not silent:
373 self._publish_pyin(code, parent, shell.execution_count)
373 self._publish_pyin(code, parent, shell.execution_count)
374
374
375 reply_content = {}
375 reply_content = {}
376 try:
376 try:
377 # FIXME: the shell calls the exception handler itself.
377 # FIXME: the shell calls the exception handler itself.
378 shell.run_cell(code, store_history=store_history, silent=silent)
378 shell.run_cell(code, store_history=store_history, silent=silent)
379 except:
379 except:
380 status = u'error'
380 status = u'error'
381 # FIXME: this code right now isn't being used yet by default,
381 # FIXME: this code right now isn't being used yet by default,
382 # because the run_cell() call above directly fires off exception
382 # because the run_cell() call above directly fires off exception
383 # reporting. This code, therefore, is only active in the scenario
383 # reporting. This code, therefore, is only active in the scenario
384 # where runlines itself has an unhandled exception. We need to
384 # where runlines itself has an unhandled exception. We need to
385 # uniformize this, for all exception construction to come from a
385 # uniformize this, for all exception construction to come from a
386 # single location in the codbase.
386 # single location in the codbase.
387 etype, evalue, tb = sys.exc_info()
387 etype, evalue, tb = sys.exc_info()
388 tb_list = traceback.format_exception(etype, evalue, tb)
388 tb_list = traceback.format_exception(etype, evalue, tb)
389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
390 else:
390 else:
391 status = u'ok'
391 status = u'ok'
392 finally:
392 finally:
393 # Restore raw_input.
393 # Restore raw_input.
394 if py3compat.PY3:
394 if py3compat.PY3:
395 builtin_mod.input = self._sys_raw_input
395 builtin_mod.input = self._sys_raw_input
396 else:
396 else:
397 builtin_mod.raw_input = self._sys_raw_input
397 builtin_mod.raw_input = self._sys_raw_input
398 builtin_mod.input = self._sys_eval_input
398 builtin_mod.input = self._sys_eval_input
399
399
400 reply_content[u'status'] = status
400 reply_content[u'status'] = status
401
401
402 # Return the execution counter so clients can display prompts
402 # Return the execution counter so clients can display prompts
403 reply_content['execution_count'] = shell.execution_count - 1
403 reply_content['execution_count'] = shell.execution_count - 1
404
404
405 # FIXME - fish exception info out of shell, possibly left there by
405 # FIXME - fish exception info out of shell, possibly left there by
406 # runlines. We'll need to clean up this logic later.
406 # runlines. We'll need to clean up this logic later.
407 if shell._reply_content is not None:
407 if shell._reply_content is not None:
408 reply_content.update(shell._reply_content)
408 reply_content.update(shell._reply_content)
409 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
409 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
410 reply_content['engine_info'] = e_info
410 reply_content['engine_info'] = e_info
411 # reset after use
411 # reset after use
412 shell._reply_content = None
412 shell._reply_content = None
413
413
414 if 'traceback' in reply_content:
414 if 'traceback' in reply_content:
415 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
415 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
416
416
417
417
418 # At this point, we can tell whether the main code execution succeeded
418 # At this point, we can tell whether the main code execution succeeded
419 # or not. If it did, we proceed to evaluate user_variables/expressions
419 # or not. If it did, we proceed to evaluate user_variables/expressions
420 if reply_content['status'] == 'ok':
420 if reply_content['status'] == 'ok':
421 reply_content[u'user_variables'] = \
421 reply_content[u'user_variables'] = \
422 shell.user_variables(content.get(u'user_variables', []))
422 shell.user_variables(content.get(u'user_variables', []))
423 reply_content[u'user_expressions'] = \
423 reply_content[u'user_expressions'] = \
424 shell.user_expressions(content.get(u'user_expressions', {}))
424 shell.user_expressions(content.get(u'user_expressions', {}))
425 else:
425 else:
426 # If there was an error, don't even try to compute variables or
426 # If there was an error, don't even try to compute variables or
427 # expressions
427 # expressions
428 reply_content[u'user_variables'] = {}
428 reply_content[u'user_variables'] = {}
429 reply_content[u'user_expressions'] = {}
429 reply_content[u'user_expressions'] = {}
430
430
431 # Payloads should be retrieved regardless of outcome, so we can both
431 # Payloads should be retrieved regardless of outcome, so we can both
432 # recover partial output (that could have been generated early in a
432 # recover partial output (that could have been generated early in a
433 # block, before an error) and clear the payload system always.
433 # block, before an error) and clear the payload system always.
434 reply_content[u'payload'] = shell.payload_manager.read_payload()
434 reply_content[u'payload'] = shell.payload_manager.read_payload()
435 # Be agressive about clearing the payload because we don't want
435 # Be agressive about clearing the payload because we don't want
436 # it to sit in memory until the next execute_request comes in.
436 # it to sit in memory until the next execute_request comes in.
437 shell.payload_manager.clear_payload()
437 shell.payload_manager.clear_payload()
438
438
439 # Flush output before sending the reply.
439 # Flush output before sending the reply.
440 sys.stdout.flush()
440 sys.stdout.flush()
441 sys.stderr.flush()
441 sys.stderr.flush()
442 # FIXME: on rare occasions, the flush doesn't seem to make it to the
442 # FIXME: on rare occasions, the flush doesn't seem to make it to the
443 # clients... This seems to mitigate the problem, but we definitely need
443 # clients... This seems to mitigate the problem, but we definitely need
444 # to better understand what's going on.
444 # to better understand what's going on.
445 if self._execute_sleep:
445 if self._execute_sleep:
446 time.sleep(self._execute_sleep)
446 time.sleep(self._execute_sleep)
447
447
448 # Send the reply.
448 # Send the reply.
449 reply_content = json_clean(reply_content)
449 reply_content = json_clean(reply_content)
450
450
451 md['status'] = reply_content['status']
451 md['status'] = reply_content['status']
452 if reply_content['status'] == 'error' and \
452 if reply_content['status'] == 'error' and \
453 reply_content['ename'] == 'UnmetDependency':
453 reply_content['ename'] == 'UnmetDependency':
454 md['dependencies_met'] = False
454 md['dependencies_met'] = False
455
455
456 reply_msg = self.session.send(stream, u'execute_reply',
456 reply_msg = self.session.send(stream, u'execute_reply',
457 reply_content, parent, metadata=md,
457 reply_content, parent, metadata=md,
458 ident=ident)
458 ident=ident)
459
459
460 self.log.debug("%s", reply_msg)
460 self.log.debug("%s", reply_msg)
461
461
462 if not silent and reply_msg['content']['status'] == u'error':
462 if not silent and reply_msg['content']['status'] == u'error':
463 self._abort_queues()
463 self._abort_queues()
464
464
465 self._publish_status(u'idle', parent)
465 self._publish_status(u'idle', parent)
466
466
467 def complete_request(self, stream, ident, parent):
467 def complete_request(self, stream, ident, parent):
468 txt, matches = self._complete(parent)
468 txt, matches = self._complete(parent)
469 matches = {'matches' : matches,
469 matches = {'matches' : matches,
470 'matched_text' : txt,
470 'matched_text' : txt,
471 'status' : 'ok'}
471 'status' : 'ok'}
472 matches = json_clean(matches)
472 matches = json_clean(matches)
473 completion_msg = self.session.send(stream, 'complete_reply',
473 completion_msg = self.session.send(stream, 'complete_reply',
474 matches, parent, ident)
474 matches, parent, ident)
475 self.log.debug("%s", completion_msg)
475 self.log.debug("%s", completion_msg)
476
476
477 def object_info_request(self, stream, ident, parent):
477 def object_info_request(self, stream, ident, parent):
478 content = parent['content']
478 content = parent['content']
479 object_info = self.shell.object_inspect(content['oname'],
479 object_info = self.shell.object_inspect(content['oname'],
480 detail_level = content.get('detail_level', 0)
480 detail_level = content.get('detail_level', 0)
481 )
481 )
482 # Before we send this object over, we scrub it for JSON usage
482 # Before we send this object over, we scrub it for JSON usage
483 oinfo = json_clean(object_info)
483 oinfo = json_clean(object_info)
484 msg = self.session.send(stream, 'object_info_reply',
484 msg = self.session.send(stream, 'object_info_reply',
485 oinfo, parent, ident)
485 oinfo, parent, ident)
486 self.log.debug("%s", msg)
486 self.log.debug("%s", msg)
487
487
488 def history_request(self, stream, ident, parent):
488 def history_request(self, stream, ident, parent):
489 # We need to pull these out, as passing **kwargs doesn't work with
489 # We need to pull these out, as passing **kwargs doesn't work with
490 # unicode keys before Python 2.6.5.
490 # unicode keys before Python 2.6.5.
491 hist_access_type = parent['content']['hist_access_type']
491 hist_access_type = parent['content']['hist_access_type']
492 raw = parent['content']['raw']
492 raw = parent['content']['raw']
493 output = parent['content']['output']
493 output = parent['content']['output']
494 if hist_access_type == 'tail':
494 if hist_access_type == 'tail':
495 n = parent['content']['n']
495 n = parent['content']['n']
496 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
496 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
497 include_latest=True)
497 include_latest=True)
498
498
499 elif hist_access_type == 'range':
499 elif hist_access_type == 'range':
500 session = parent['content']['session']
500 session = parent['content']['session']
501 start = parent['content']['start']
501 start = parent['content']['start']
502 stop = parent['content']['stop']
502 stop = parent['content']['stop']
503 hist = self.shell.history_manager.get_range(session, start, stop,
503 hist = self.shell.history_manager.get_range(session, start, stop,
504 raw=raw, output=output)
504 raw=raw, output=output)
505
505
506 elif hist_access_type == 'search':
506 elif hist_access_type == 'search':
507 n = parent['content'].get('n')
507 n = parent['content'].get('n')
508 unique = parent['content'].get('unique', False)
508 unique = parent['content'].get('unique', False)
509 pattern = parent['content']['pattern']
509 pattern = parent['content']['pattern']
510 hist = self.shell.history_manager.search(
510 hist = self.shell.history_manager.search(
511 pattern, raw=raw, output=output, n=n, unique=unique)
511 pattern, raw=raw, output=output, n=n, unique=unique)
512
512
513 else:
513 else:
514 hist = []
514 hist = []
515 hist = list(hist)
515 hist = list(hist)
516 content = {'history' : hist}
516 content = {'history' : hist}
517 content = json_clean(content)
517 content = json_clean(content)
518 msg = self.session.send(stream, 'history_reply',
518 msg = self.session.send(stream, 'history_reply',
519 content, parent, ident)
519 content, parent, ident)
520 self.log.debug("Sending history reply with %i entries", len(hist))
520 self.log.debug("Sending history reply with %i entries", len(hist))
521
521
522 def connect_request(self, stream, ident, parent):
522 def connect_request(self, stream, ident, parent):
523 if self._recorded_ports is not None:
523 if self._recorded_ports is not None:
524 content = self._recorded_ports.copy()
524 content = self._recorded_ports.copy()
525 else:
525 else:
526 content = {}
526 content = {}
527 msg = self.session.send(stream, 'connect_reply',
527 msg = self.session.send(stream, 'connect_reply',
528 content, parent, ident)
528 content, parent, ident)
529 self.log.debug("%s", msg)
529 self.log.debug("%s", msg)
530
530
531 def kernel_info_request(self, stream, ident, parent):
531 def kernel_info_request(self, stream, ident, parent):
532 vinfo = {
532 vinfo = {
533 'protocol_version': protocol_version,
533 'protocol_version': protocol_version,
534 'ipython_version': ipython_version,
534 'ipython_version': ipython_version,
535 'language_version': language_version,
535 'language_version': language_version,
536 'language': 'python',
536 'language': 'python',
537 }
537 }
538 msg = self.session.send(stream, 'kernel_info_reply',
538 msg = self.session.send(stream, 'kernel_info_reply',
539 vinfo, parent, ident)
539 vinfo, parent, ident)
540 self.log.debug("%s", msg)
540 self.log.debug("%s", msg)
541
541
542 def shutdown_request(self, stream, ident, parent):
542 def shutdown_request(self, stream, ident, parent):
543 self.shell.exit_now = True
543 self.shell.exit_now = True
544 content = dict(status='ok')
544 content = dict(status='ok')
545 content.update(parent['content'])
545 content.update(parent['content'])
546 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
546 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
547 # same content, but different msg_id for broadcasting on IOPub
547 # same content, but different msg_id for broadcasting on IOPub
548 self._shutdown_message = self.session.msg(u'shutdown_reply',
548 self._shutdown_message = self.session.msg(u'shutdown_reply',
549 content, parent
549 content, parent
550 )
550 )
551
551
552 self._at_shutdown()
552 self._at_shutdown()
553 # call sys.exit after a short delay
553 # call sys.exit after a short delay
554 loop = ioloop.IOLoop.instance()
554 loop = ioloop.IOLoop.instance()
555 loop.add_timeout(time.time()+0.1, loop.stop)
555 loop.add_timeout(time.time()+0.1, loop.stop)
556
556
557 #---------------------------------------------------------------------------
557 #---------------------------------------------------------------------------
558 # Engine methods
558 # Engine methods
559 #---------------------------------------------------------------------------
559 #---------------------------------------------------------------------------
560
560
561 def apply_request(self, stream, ident, parent):
561 def apply_request(self, stream, ident, parent):
562 try:
562 try:
563 content = parent[u'content']
563 content = parent[u'content']
564 bufs = parent[u'buffers']
564 bufs = parent[u'buffers']
565 msg_id = parent['header']['msg_id']
565 msg_id = parent['header']['msg_id']
566 except:
566 except:
567 self.log.error("Got bad msg: %s", parent, exc_info=True)
567 self.log.error("Got bad msg: %s", parent, exc_info=True)
568 return
568 return
569
569
570 self._publish_status(u'busy', parent)
570 self._publish_status(u'busy', parent)
571
571
572 # Set the parent message of the display hook and out streams.
572 # Set the parent message of the display hook and out streams.
573 shell = self.shell
573 shell = self.shell
574 shell.set_parent(parent)
574 shell.set_parent(parent)
575
575
576 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
576 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
577 # self.iopub_socket.send(pyin_msg)
577 # self.iopub_socket.send(pyin_msg)
578 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
578 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
579 md = self._make_metadata(parent['metadata'])
579 md = self._make_metadata(parent['metadata'])
580 try:
580 try:
581 working = shell.user_ns
581 working = shell.user_ns
582
582
583 prefix = "_"+str(msg_id).replace("-","")+"_"
583 prefix = "_"+str(msg_id).replace("-","")+"_"
584
584
585 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
585 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
586
586
587 fname = getattr(f, '__name__', 'f')
587 fname = getattr(f, '__name__', 'f')
588
588
589 fname = prefix+"f"
589 fname = prefix+"f"
590 argname = prefix+"args"
590 argname = prefix+"args"
591 kwargname = prefix+"kwargs"
591 kwargname = prefix+"kwargs"
592 resultname = prefix+"result"
592 resultname = prefix+"result"
593
593
594 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
594 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
595 # print ns
595 # print ns
596 working.update(ns)
596 working.update(ns)
597 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
597 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
598 try:
598 try:
599 exec(code, shell.user_global_ns, shell.user_ns)
599 exec(code, shell.user_global_ns, shell.user_ns)
600 result = working.get(resultname)
600 result = working.get(resultname)
601 finally:
601 finally:
602 for key in ns.iterkeys():
602 for key in ns:
603 working.pop(key)
603 working.pop(key)
604
604
605 result_buf = serialize_object(result,
605 result_buf = serialize_object(result,
606 buffer_threshold=self.session.buffer_threshold,
606 buffer_threshold=self.session.buffer_threshold,
607 item_threshold=self.session.item_threshold,
607 item_threshold=self.session.item_threshold,
608 )
608 )
609
609
610 except:
610 except:
611 # invoke IPython traceback formatting
611 # invoke IPython traceback formatting
612 shell.showtraceback()
612 shell.showtraceback()
613 # FIXME - fish exception info out of shell, possibly left there by
613 # FIXME - fish exception info out of shell, possibly left there by
614 # run_code. We'll need to clean up this logic later.
614 # run_code. We'll need to clean up this logic later.
615 reply_content = {}
615 reply_content = {}
616 if shell._reply_content is not None:
616 if shell._reply_content is not None:
617 reply_content.update(shell._reply_content)
617 reply_content.update(shell._reply_content)
618 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
618 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
619 reply_content['engine_info'] = e_info
619 reply_content['engine_info'] = e_info
620 # reset after use
620 # reset after use
621 shell._reply_content = None
621 shell._reply_content = None
622
622
623 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
623 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
624 ident=self._topic('pyerr'))
624 ident=self._topic('pyerr'))
625 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
625 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
626 result_buf = []
626 result_buf = []
627
627
628 if reply_content['ename'] == 'UnmetDependency':
628 if reply_content['ename'] == 'UnmetDependency':
629 md['dependencies_met'] = False
629 md['dependencies_met'] = False
630 else:
630 else:
631 reply_content = {'status' : 'ok'}
631 reply_content = {'status' : 'ok'}
632
632
633 # put 'ok'/'error' status in header, for scheduler introspection:
633 # put 'ok'/'error' status in header, for scheduler introspection:
634 md['status'] = reply_content['status']
634 md['status'] = reply_content['status']
635
635
636 # flush i/o
636 # flush i/o
637 sys.stdout.flush()
637 sys.stdout.flush()
638 sys.stderr.flush()
638 sys.stderr.flush()
639
639
640 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
640 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
641 parent=parent, ident=ident,buffers=result_buf, metadata=md)
641 parent=parent, ident=ident,buffers=result_buf, metadata=md)
642
642
643 self._publish_status(u'idle', parent)
643 self._publish_status(u'idle', parent)
644
644
645 #---------------------------------------------------------------------------
645 #---------------------------------------------------------------------------
646 # Control messages
646 # Control messages
647 #---------------------------------------------------------------------------
647 #---------------------------------------------------------------------------
648
648
649 def abort_request(self, stream, ident, parent):
649 def abort_request(self, stream, ident, parent):
650 """abort a specifig msg by id"""
650 """abort a specifig msg by id"""
651 msg_ids = parent['content'].get('msg_ids', None)
651 msg_ids = parent['content'].get('msg_ids', None)
652 if isinstance(msg_ids, string_types):
652 if isinstance(msg_ids, string_types):
653 msg_ids = [msg_ids]
653 msg_ids = [msg_ids]
654 if not msg_ids:
654 if not msg_ids:
655 self.abort_queues()
655 self.abort_queues()
656 for mid in msg_ids:
656 for mid in msg_ids:
657 self.aborted.add(str(mid))
657 self.aborted.add(str(mid))
658
658
659 content = dict(status='ok')
659 content = dict(status='ok')
660 reply_msg = self.session.send(stream, 'abort_reply', content=content,
660 reply_msg = self.session.send(stream, 'abort_reply', content=content,
661 parent=parent, ident=ident)
661 parent=parent, ident=ident)
662 self.log.debug("%s", reply_msg)
662 self.log.debug("%s", reply_msg)
663
663
664 def clear_request(self, stream, idents, parent):
664 def clear_request(self, stream, idents, parent):
665 """Clear our namespace."""
665 """Clear our namespace."""
666 self.shell.reset(False)
666 self.shell.reset(False)
667 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
667 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
668 content = dict(status='ok'))
668 content = dict(status='ok'))
669
669
670
670
671 #---------------------------------------------------------------------------
671 #---------------------------------------------------------------------------
672 # Protected interface
672 # Protected interface
673 #---------------------------------------------------------------------------
673 #---------------------------------------------------------------------------
674
674
675 def _wrap_exception(self, method=None):
675 def _wrap_exception(self, method=None):
676 # import here, because _wrap_exception is only used in parallel,
676 # import here, because _wrap_exception is only used in parallel,
677 # and parallel has higher min pyzmq version
677 # and parallel has higher min pyzmq version
678 from IPython.parallel.error import wrap_exception
678 from IPython.parallel.error import wrap_exception
679 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
679 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
680 content = wrap_exception(e_info)
680 content = wrap_exception(e_info)
681 return content
681 return content
682
682
683 def _topic(self, topic):
683 def _topic(self, topic):
684 """prefixed topic for IOPub messages"""
684 """prefixed topic for IOPub messages"""
685 if self.int_id >= 0:
685 if self.int_id >= 0:
686 base = "engine.%i" % self.int_id
686 base = "engine.%i" % self.int_id
687 else:
687 else:
688 base = "kernel.%s" % self.ident
688 base = "kernel.%s" % self.ident
689
689
690 return py3compat.cast_bytes("%s.%s" % (base, topic))
690 return py3compat.cast_bytes("%s.%s" % (base, topic))
691
691
692 def _abort_queues(self):
692 def _abort_queues(self):
693 for stream in self.shell_streams:
693 for stream in self.shell_streams:
694 if stream:
694 if stream:
695 self._abort_queue(stream)
695 self._abort_queue(stream)
696
696
697 def _abort_queue(self, stream):
697 def _abort_queue(self, stream):
698 poller = zmq.Poller()
698 poller = zmq.Poller()
699 poller.register(stream.socket, zmq.POLLIN)
699 poller.register(stream.socket, zmq.POLLIN)
700 while True:
700 while True:
701 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
701 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
702 if msg is None:
702 if msg is None:
703 return
703 return
704
704
705 self.log.info("Aborting:")
705 self.log.info("Aborting:")
706 self.log.info("%s", msg)
706 self.log.info("%s", msg)
707 msg_type = msg['header']['msg_type']
707 msg_type = msg['header']['msg_type']
708 reply_type = msg_type.split('_')[0] + '_reply'
708 reply_type = msg_type.split('_')[0] + '_reply'
709
709
710 status = {'status' : 'aborted'}
710 status = {'status' : 'aborted'}
711 md = {'engine' : self.ident}
711 md = {'engine' : self.ident}
712 md.update(status)
712 md.update(status)
713 reply_msg = self.session.send(stream, reply_type, metadata=md,
713 reply_msg = self.session.send(stream, reply_type, metadata=md,
714 content=status, parent=msg, ident=idents)
714 content=status, parent=msg, ident=idents)
715 self.log.debug("%s", reply_msg)
715 self.log.debug("%s", reply_msg)
716 # We need to wait a bit for requests to come in. This can probably
716 # We need to wait a bit for requests to come in. This can probably
717 # be set shorter for true asynchronous clients.
717 # be set shorter for true asynchronous clients.
718 poller.poll(50)
718 poller.poll(50)
719
719
720
720
721 def _no_raw_input(self):
721 def _no_raw_input(self):
722 """Raise StdinNotImplentedError if active frontend doesn't support
722 """Raise StdinNotImplentedError if active frontend doesn't support
723 stdin."""
723 stdin."""
724 raise StdinNotImplementedError("raw_input was called, but this "
724 raise StdinNotImplementedError("raw_input was called, but this "
725 "frontend does not support stdin.")
725 "frontend does not support stdin.")
726
726
727 def _raw_input(self, prompt, ident, parent):
727 def _raw_input(self, prompt, ident, parent):
728 # Flush output before making the request.
728 # Flush output before making the request.
729 sys.stderr.flush()
729 sys.stderr.flush()
730 sys.stdout.flush()
730 sys.stdout.flush()
731 # flush the stdin socket, to purge stale replies
731 # flush the stdin socket, to purge stale replies
732 while True:
732 while True:
733 try:
733 try:
734 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
734 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
735 except zmq.ZMQError as e:
735 except zmq.ZMQError as e:
736 if e.errno == zmq.EAGAIN:
736 if e.errno == zmq.EAGAIN:
737 break
737 break
738 else:
738 else:
739 raise
739 raise
740
740
741 # Send the input request.
741 # Send the input request.
742 content = json_clean(dict(prompt=prompt))
742 content = json_clean(dict(prompt=prompt))
743 self.session.send(self.stdin_socket, u'input_request', content, parent,
743 self.session.send(self.stdin_socket, u'input_request', content, parent,
744 ident=ident)
744 ident=ident)
745
745
746 # Await a response.
746 # Await a response.
747 while True:
747 while True:
748 try:
748 try:
749 ident, reply = self.session.recv(self.stdin_socket, 0)
749 ident, reply = self.session.recv(self.stdin_socket, 0)
750 except Exception:
750 except Exception:
751 self.log.warn("Invalid Message:", exc_info=True)
751 self.log.warn("Invalid Message:", exc_info=True)
752 except KeyboardInterrupt:
752 except KeyboardInterrupt:
753 # re-raise KeyboardInterrupt, to truncate traceback
753 # re-raise KeyboardInterrupt, to truncate traceback
754 raise KeyboardInterrupt
754 raise KeyboardInterrupt
755 else:
755 else:
756 break
756 break
757 try:
757 try:
758 value = py3compat.unicode_to_str(reply['content']['value'])
758 value = py3compat.unicode_to_str(reply['content']['value'])
759 except:
759 except:
760 self.log.error("Got bad raw_input reply: ")
760 self.log.error("Got bad raw_input reply: ")
761 self.log.error("%s", parent)
761 self.log.error("%s", parent)
762 value = ''
762 value = ''
763 if value == '\x04':
763 if value == '\x04':
764 # EOF
764 # EOF
765 raise EOFError
765 raise EOFError
766 return value
766 return value
767
767
768 def _complete(self, msg):
768 def _complete(self, msg):
769 c = msg['content']
769 c = msg['content']
770 try:
770 try:
771 cpos = int(c['cursor_pos'])
771 cpos = int(c['cursor_pos'])
772 except:
772 except:
773 # If we don't get something that we can convert to an integer, at
773 # If we don't get something that we can convert to an integer, at
774 # least attempt the completion guessing the cursor is at the end of
774 # least attempt the completion guessing the cursor is at the end of
775 # the text, if there's any, and otherwise of the line
775 # the text, if there's any, and otherwise of the line
776 cpos = len(c['text'])
776 cpos = len(c['text'])
777 if cpos==0:
777 if cpos==0:
778 cpos = len(c['line'])
778 cpos = len(c['line'])
779 return self.shell.complete(c['text'], c['line'], cpos)
779 return self.shell.complete(c['text'], c['line'], cpos)
780
780
781 def _at_shutdown(self):
781 def _at_shutdown(self):
782 """Actions taken at shutdown by the kernel, called by python's atexit.
782 """Actions taken at shutdown by the kernel, called by python's atexit.
783 """
783 """
784 # io.rprint("Kernel at_shutdown") # dbg
784 # io.rprint("Kernel at_shutdown") # dbg
785 if self._shutdown_message is not None:
785 if self._shutdown_message is not None:
786 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
786 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
787 self.log.debug("%s", self._shutdown_message)
787 self.log.debug("%s", self._shutdown_message)
788 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
788 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
789
789
@@ -1,198 +1,198 b''
1 """serialization utilities for apply messages
1 """serialization utilities for apply messages
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 try:
18 try:
19 import cPickle
19 import cPickle
20 pickle = cPickle
20 pickle = cPickle
21 except:
21 except:
22 cPickle = None
22 cPickle = None
23 import pickle
23 import pickle
24
24
25
25
26 # IPython imports
26 # IPython imports
27 from IPython.utils import py3compat
27 from IPython.utils import py3compat
28 from IPython.utils.data import flatten
28 from IPython.utils.data import flatten
29 from IPython.utils.pickleutil import (
29 from IPython.utils.pickleutil import (
30 can, uncan, can_sequence, uncan_sequence, CannedObject,
30 can, uncan, can_sequence, uncan_sequence, CannedObject,
31 istype, sequence_types,
31 istype, sequence_types,
32 )
32 )
33
33
34 if py3compat.PY3:
34 if py3compat.PY3:
35 buffer = memoryview
35 buffer = memoryview
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Serialization Functions
38 # Serialization Functions
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 # default values for the thresholds:
41 # default values for the thresholds:
42 MAX_ITEMS = 64
42 MAX_ITEMS = 64
43 MAX_BYTES = 1024
43 MAX_BYTES = 1024
44
44
45 def _extract_buffers(obj, threshold=MAX_BYTES):
45 def _extract_buffers(obj, threshold=MAX_BYTES):
46 """extract buffers larger than a certain threshold"""
46 """extract buffers larger than a certain threshold"""
47 buffers = []
47 buffers = []
48 if isinstance(obj, CannedObject) and obj.buffers:
48 if isinstance(obj, CannedObject) and obj.buffers:
49 for i,buf in enumerate(obj.buffers):
49 for i,buf in enumerate(obj.buffers):
50 if len(buf) > threshold:
50 if len(buf) > threshold:
51 # buffer larger than threshold, prevent pickling
51 # buffer larger than threshold, prevent pickling
52 obj.buffers[i] = None
52 obj.buffers[i] = None
53 buffers.append(buf)
53 buffers.append(buf)
54 elif isinstance(buf, buffer):
54 elif isinstance(buf, buffer):
55 # buffer too small for separate send, coerce to bytes
55 # buffer too small for separate send, coerce to bytes
56 # because pickling buffer objects just results in broken pointers
56 # because pickling buffer objects just results in broken pointers
57 obj.buffers[i] = bytes(buf)
57 obj.buffers[i] = bytes(buf)
58 return buffers
58 return buffers
59
59
60 def _restore_buffers(obj, buffers):
60 def _restore_buffers(obj, buffers):
61 """restore buffers extracted by """
61 """restore buffers extracted by """
62 if isinstance(obj, CannedObject) and obj.buffers:
62 if isinstance(obj, CannedObject) and obj.buffers:
63 for i,buf in enumerate(obj.buffers):
63 for i,buf in enumerate(obj.buffers):
64 if buf is None:
64 if buf is None:
65 obj.buffers[i] = buffers.pop(0)
65 obj.buffers[i] = buffers.pop(0)
66
66
67 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
67 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
68 """Serialize an object into a list of sendable buffers.
68 """Serialize an object into a list of sendable buffers.
69
69
70 Parameters
70 Parameters
71 ----------
71 ----------
72
72
73 obj : object
73 obj : object
74 The object to be serialized
74 The object to be serialized
75 buffer_threshold : int
75 buffer_threshold : int
76 The threshold (in bytes) for pulling out data buffers
76 The threshold (in bytes) for pulling out data buffers
77 to avoid pickling them.
77 to avoid pickling them.
78 item_threshold : int
78 item_threshold : int
79 The maximum number of items over which canning will iterate.
79 The maximum number of items over which canning will iterate.
80 Containers (lists, dicts) larger than this will be pickled without
80 Containers (lists, dicts) larger than this will be pickled without
81 introspection.
81 introspection.
82
82
83 Returns
83 Returns
84 -------
84 -------
85 [bufs] : list of buffers representing the serialized object.
85 [bufs] : list of buffers representing the serialized object.
86 """
86 """
87 buffers = []
87 buffers = []
88 if istype(obj, sequence_types) and len(obj) < item_threshold:
88 if istype(obj, sequence_types) and len(obj) < item_threshold:
89 cobj = can_sequence(obj)
89 cobj = can_sequence(obj)
90 for c in cobj:
90 for c in cobj:
91 buffers.extend(_extract_buffers(c, buffer_threshold))
91 buffers.extend(_extract_buffers(c, buffer_threshold))
92 elif istype(obj, dict) and len(obj) < item_threshold:
92 elif istype(obj, dict) and len(obj) < item_threshold:
93 cobj = {}
93 cobj = {}
94 for k in sorted(obj.iterkeys()):
94 for k in sorted(obj):
95 c = can(obj[k])
95 c = can(obj[k])
96 buffers.extend(_extract_buffers(c, buffer_threshold))
96 buffers.extend(_extract_buffers(c, buffer_threshold))
97 cobj[k] = c
97 cobj[k] = c
98 else:
98 else:
99 cobj = can(obj)
99 cobj = can(obj)
100 buffers.extend(_extract_buffers(cobj, buffer_threshold))
100 buffers.extend(_extract_buffers(cobj, buffer_threshold))
101
101
102 buffers.insert(0, pickle.dumps(cobj,-1))
102 buffers.insert(0, pickle.dumps(cobj,-1))
103 return buffers
103 return buffers
104
104
105 def unserialize_object(buffers, g=None):
105 def unserialize_object(buffers, g=None):
106 """reconstruct an object serialized by serialize_object from data buffers.
106 """reconstruct an object serialized by serialize_object from data buffers.
107
107
108 Parameters
108 Parameters
109 ----------
109 ----------
110
110
111 bufs : list of buffers/bytes
111 bufs : list of buffers/bytes
112
112
113 g : globals to be used when uncanning
113 g : globals to be used when uncanning
114
114
115 Returns
115 Returns
116 -------
116 -------
117
117
118 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
118 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
119 """
119 """
120 bufs = list(buffers)
120 bufs = list(buffers)
121 pobj = bufs.pop(0)
121 pobj = bufs.pop(0)
122 if not isinstance(pobj, bytes):
122 if not isinstance(pobj, bytes):
123 # a zmq message
123 # a zmq message
124 pobj = bytes(pobj)
124 pobj = bytes(pobj)
125 canned = pickle.loads(pobj)
125 canned = pickle.loads(pobj)
126 if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
126 if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
127 for c in canned:
127 for c in canned:
128 _restore_buffers(c, bufs)
128 _restore_buffers(c, bufs)
129 newobj = uncan_sequence(canned, g)
129 newobj = uncan_sequence(canned, g)
130 elif istype(canned, dict) and len(canned) < MAX_ITEMS:
130 elif istype(canned, dict) and len(canned) < MAX_ITEMS:
131 newobj = {}
131 newobj = {}
132 for k in sorted(canned.iterkeys()):
132 for k in sorted(canned):
133 c = canned[k]
133 c = canned[k]
134 _restore_buffers(c, bufs)
134 _restore_buffers(c, bufs)
135 newobj[k] = uncan(c, g)
135 newobj[k] = uncan(c, g)
136 else:
136 else:
137 _restore_buffers(canned, bufs)
137 _restore_buffers(canned, bufs)
138 newobj = uncan(canned, g)
138 newobj = uncan(canned, g)
139
139
140 return newobj, bufs
140 return newobj, bufs
141
141
142 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
142 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
143 """pack up a function, args, and kwargs to be sent over the wire
143 """pack up a function, args, and kwargs to be sent over the wire
144
144
145 Each element of args/kwargs will be canned for special treatment,
145 Each element of args/kwargs will be canned for special treatment,
146 but inspection will not go any deeper than that.
146 but inspection will not go any deeper than that.
147
147
148 Any object whose data is larger than `threshold` will not have their data copied
148 Any object whose data is larger than `threshold` will not have their data copied
149 (only numpy arrays and bytes/buffers support zero-copy)
149 (only numpy arrays and bytes/buffers support zero-copy)
150
150
151 Message will be a list of bytes/buffers of the format:
151 Message will be a list of bytes/buffers of the format:
152
152
153 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
153 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
154
154
155 With length at least two + len(args) + len(kwargs)
155 With length at least two + len(args) + len(kwargs)
156 """
156 """
157
157
158 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
158 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
159
159
160 kw_keys = sorted(kwargs.keys())
160 kw_keys = sorted(kwargs.keys())
161 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
161 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
162
162
163 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
163 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
164
164
165 msg = [pickle.dumps(can(f),-1)]
165 msg = [pickle.dumps(can(f),-1)]
166 msg.append(pickle.dumps(info, -1))
166 msg.append(pickle.dumps(info, -1))
167 msg.extend(arg_bufs)
167 msg.extend(arg_bufs)
168 msg.extend(kwarg_bufs)
168 msg.extend(kwarg_bufs)
169
169
170 return msg
170 return msg
171
171
172 def unpack_apply_message(bufs, g=None, copy=True):
172 def unpack_apply_message(bufs, g=None, copy=True):
173 """unpack f,args,kwargs from buffers packed by pack_apply_message()
173 """unpack f,args,kwargs from buffers packed by pack_apply_message()
174 Returns: original f,args,kwargs"""
174 Returns: original f,args,kwargs"""
175 bufs = list(bufs) # allow us to pop
175 bufs = list(bufs) # allow us to pop
176 assert len(bufs) >= 2, "not enough buffers!"
176 assert len(bufs) >= 2, "not enough buffers!"
177 if not copy:
177 if not copy:
178 for i in range(2):
178 for i in range(2):
179 bufs[i] = bufs[i].bytes
179 bufs[i] = bufs[i].bytes
180 f = uncan(pickle.loads(bufs.pop(0)), g)
180 f = uncan(pickle.loads(bufs.pop(0)), g)
181 info = pickle.loads(bufs.pop(0))
181 info = pickle.loads(bufs.pop(0))
182 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
182 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
183
183
184 args = []
184 args = []
185 for i in range(info['nargs']):
185 for i in range(info['nargs']):
186 arg, arg_bufs = unserialize_object(arg_bufs, g)
186 arg, arg_bufs = unserialize_object(arg_bufs, g)
187 args.append(arg)
187 args.append(arg)
188 args = tuple(args)
188 args = tuple(args)
189 assert not arg_bufs, "Shouldn't be any arg bufs left over"
189 assert not arg_bufs, "Shouldn't be any arg bufs left over"
190
190
191 kwargs = {}
191 kwargs = {}
192 for key in info['kw_keys']:
192 for key in info['kw_keys']:
193 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
193 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
194 kwargs[key] = kwarg
194 kwargs[key] = kwarg
195 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
195 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
196
196
197 return f,args,kwargs
197 return f,args,kwargs
198
198
@@ -1,1854 +1,1854 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 from __future__ import print_function
7 from __future__ import print_function
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import os
19 import os
20 import json
20 import json
21 import sys
21 import sys
22 from threading import Thread, Event
22 from threading import Thread, Event
23 import time
23 import time
24 import warnings
24 import warnings
25 from datetime import datetime
25 from datetime import datetime
26 from getpass import getpass
26 from getpass import getpass
27 from pprint import pprint
27 from pprint import pprint
28
28
29 pjoin = os.path.join
29 pjoin = os.path.join
30
30
31 import zmq
31 import zmq
32 # from zmq.eventloop import ioloop, zmqstream
32 # from zmq.eventloop import ioloop, zmqstream
33
33
34 from IPython.config.configurable import MultipleInstanceError
34 from IPython.config.configurable import MultipleInstanceError
35 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 from IPython.core.profiledir import ProfileDir, ProfileDirError
37
37
38 from IPython.utils.capture import RichOutput
38 from IPython.utils.capture import RichOutput
39 from IPython.utils.coloransi import TermColors
39 from IPython.utils.coloransi import TermColors
40 from IPython.utils.jsonutil import rekey
40 from IPython.utils.jsonutil import rekey
41 from IPython.utils.localinterfaces import localhost, is_local_ip
41 from IPython.utils.localinterfaces import localhost, is_local_ip
42 from IPython.utils.path import get_ipython_dir
42 from IPython.utils.path import get_ipython_dir
43 from IPython.utils.py3compat import cast_bytes, string_types, xrange
43 from IPython.utils.py3compat import cast_bytes, string_types, xrange
44 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
44 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
45 Dict, List, Bool, Set, Any)
45 Dict, List, Bool, Set, Any)
46 from IPython.external.decorator import decorator
46 from IPython.external.decorator import decorator
47 from IPython.external.ssh import tunnel
47 from IPython.external.ssh import tunnel
48
48
49 from IPython.parallel import Reference
49 from IPython.parallel import Reference
50 from IPython.parallel import error
50 from IPython.parallel import error
51 from IPython.parallel import util
51 from IPython.parallel import util
52
52
53 from IPython.kernel.zmq.session import Session, Message
53 from IPython.kernel.zmq.session import Session, Message
54 from IPython.kernel.zmq import serialize
54 from IPython.kernel.zmq import serialize
55
55
56 from .asyncresult import AsyncResult, AsyncHubResult
56 from .asyncresult import AsyncResult, AsyncHubResult
57 from .view import DirectView, LoadBalancedView
57 from .view import DirectView, LoadBalancedView
58
58
59 #--------------------------------------------------------------------------
59 #--------------------------------------------------------------------------
60 # Decorators for Client methods
60 # Decorators for Client methods
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62
62
63 @decorator
63 @decorator
64 def spin_first(f, self, *args, **kwargs):
64 def spin_first(f, self, *args, **kwargs):
65 """Call spin() to sync state prior to calling the method."""
65 """Call spin() to sync state prior to calling the method."""
66 self.spin()
66 self.spin()
67 return f(self, *args, **kwargs)
67 return f(self, *args, **kwargs)
68
68
69
69
70 #--------------------------------------------------------------------------
70 #--------------------------------------------------------------------------
71 # Classes
71 # Classes
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73
73
74
74
75 class ExecuteReply(RichOutput):
75 class ExecuteReply(RichOutput):
76 """wrapper for finished Execute results"""
76 """wrapper for finished Execute results"""
77 def __init__(self, msg_id, content, metadata):
77 def __init__(self, msg_id, content, metadata):
78 self.msg_id = msg_id
78 self.msg_id = msg_id
79 self._content = content
79 self._content = content
80 self.execution_count = content['execution_count']
80 self.execution_count = content['execution_count']
81 self.metadata = metadata
81 self.metadata = metadata
82
82
83 # RichOutput overrides
83 # RichOutput overrides
84
84
85 @property
85 @property
86 def source(self):
86 def source(self):
87 pyout = self.metadata['pyout']
87 pyout = self.metadata['pyout']
88 if pyout:
88 if pyout:
89 return pyout.get('source', '')
89 return pyout.get('source', '')
90
90
91 @property
91 @property
92 def data(self):
92 def data(self):
93 pyout = self.metadata['pyout']
93 pyout = self.metadata['pyout']
94 if pyout:
94 if pyout:
95 return pyout.get('data', {})
95 return pyout.get('data', {})
96
96
97 @property
97 @property
98 def _metadata(self):
98 def _metadata(self):
99 pyout = self.metadata['pyout']
99 pyout = self.metadata['pyout']
100 if pyout:
100 if pyout:
101 return pyout.get('metadata', {})
101 return pyout.get('metadata', {})
102
102
103 def display(self):
103 def display(self):
104 from IPython.display import publish_display_data
104 from IPython.display import publish_display_data
105 publish_display_data(self.source, self.data, self.metadata)
105 publish_display_data(self.source, self.data, self.metadata)
106
106
107 def _repr_mime_(self, mime):
107 def _repr_mime_(self, mime):
108 if mime not in self.data:
108 if mime not in self.data:
109 return
109 return
110 data = self.data[mime]
110 data = self.data[mime]
111 if mime in self._metadata:
111 if mime in self._metadata:
112 return data, self._metadata[mime]
112 return data, self._metadata[mime]
113 else:
113 else:
114 return data
114 return data
115
115
116 def __getitem__(self, key):
116 def __getitem__(self, key):
117 return self.metadata[key]
117 return self.metadata[key]
118
118
119 def __getattr__(self, key):
119 def __getattr__(self, key):
120 if key not in self.metadata:
120 if key not in self.metadata:
121 raise AttributeError(key)
121 raise AttributeError(key)
122 return self.metadata[key]
122 return self.metadata[key]
123
123
124 def __repr__(self):
124 def __repr__(self):
125 pyout = self.metadata['pyout'] or {'data':{}}
125 pyout = self.metadata['pyout'] or {'data':{}}
126 text_out = pyout['data'].get('text/plain', '')
126 text_out = pyout['data'].get('text/plain', '')
127 if len(text_out) > 32:
127 if len(text_out) > 32:
128 text_out = text_out[:29] + '...'
128 text_out = text_out[:29] + '...'
129
129
130 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
130 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
131
131
132 def _repr_pretty_(self, p, cycle):
132 def _repr_pretty_(self, p, cycle):
133 pyout = self.metadata['pyout'] or {'data':{}}
133 pyout = self.metadata['pyout'] or {'data':{}}
134 text_out = pyout['data'].get('text/plain', '')
134 text_out = pyout['data'].get('text/plain', '')
135
135
136 if not text_out:
136 if not text_out:
137 return
137 return
138
138
139 try:
139 try:
140 ip = get_ipython()
140 ip = get_ipython()
141 except NameError:
141 except NameError:
142 colors = "NoColor"
142 colors = "NoColor"
143 else:
143 else:
144 colors = ip.colors
144 colors = ip.colors
145
145
146 if colors == "NoColor":
146 if colors == "NoColor":
147 out = normal = ""
147 out = normal = ""
148 else:
148 else:
149 out = TermColors.Red
149 out = TermColors.Red
150 normal = TermColors.Normal
150 normal = TermColors.Normal
151
151
152 if '\n' in text_out and not text_out.startswith('\n'):
152 if '\n' in text_out and not text_out.startswith('\n'):
153 # add newline for multiline reprs
153 # add newline for multiline reprs
154 text_out = '\n' + text_out
154 text_out = '\n' + text_out
155
155
156 p.text(
156 p.text(
157 out + u'Out[%i:%i]: ' % (
157 out + u'Out[%i:%i]: ' % (
158 self.metadata['engine_id'], self.execution_count
158 self.metadata['engine_id'], self.execution_count
159 ) + normal + text_out
159 ) + normal + text_out
160 )
160 )
161
161
162
162
163 class Metadata(dict):
163 class Metadata(dict):
164 """Subclass of dict for initializing metadata values.
164 """Subclass of dict for initializing metadata values.
165
165
166 Attribute access works on keys.
166 Attribute access works on keys.
167
167
168 These objects have a strict set of keys - errors will raise if you try
168 These objects have a strict set of keys - errors will raise if you try
169 to add new keys.
169 to add new keys.
170 """
170 """
171 def __init__(self, *args, **kwargs):
171 def __init__(self, *args, **kwargs):
172 dict.__init__(self)
172 dict.__init__(self)
173 md = {'msg_id' : None,
173 md = {'msg_id' : None,
174 'submitted' : None,
174 'submitted' : None,
175 'started' : None,
175 'started' : None,
176 'completed' : None,
176 'completed' : None,
177 'received' : None,
177 'received' : None,
178 'engine_uuid' : None,
178 'engine_uuid' : None,
179 'engine_id' : None,
179 'engine_id' : None,
180 'follow' : None,
180 'follow' : None,
181 'after' : None,
181 'after' : None,
182 'status' : None,
182 'status' : None,
183
183
184 'pyin' : None,
184 'pyin' : None,
185 'pyout' : None,
185 'pyout' : None,
186 'pyerr' : None,
186 'pyerr' : None,
187 'stdout' : '',
187 'stdout' : '',
188 'stderr' : '',
188 'stderr' : '',
189 'outputs' : [],
189 'outputs' : [],
190 'data': {},
190 'data': {},
191 'outputs_ready' : False,
191 'outputs_ready' : False,
192 }
192 }
193 self.update(md)
193 self.update(md)
194 self.update(dict(*args, **kwargs))
194 self.update(dict(*args, **kwargs))
195
195
196 def __getattr__(self, key):
196 def __getattr__(self, key):
197 """getattr aliased to getitem"""
197 """getattr aliased to getitem"""
198 if key in self.iterkeys():
198 if key in self:
199 return self[key]
199 return self[key]
200 else:
200 else:
201 raise AttributeError(key)
201 raise AttributeError(key)
202
202
203 def __setattr__(self, key, value):
203 def __setattr__(self, key, value):
204 """setattr aliased to setitem, with strict"""
204 """setattr aliased to setitem, with strict"""
205 if key in self.iterkeys():
205 if key in self:
206 self[key] = value
206 self[key] = value
207 else:
207 else:
208 raise AttributeError(key)
208 raise AttributeError(key)
209
209
210 def __setitem__(self, key, value):
210 def __setitem__(self, key, value):
211 """strict static key enforcement"""
211 """strict static key enforcement"""
212 if key in self.iterkeys():
212 if key in self:
213 dict.__setitem__(self, key, value)
213 dict.__setitem__(self, key, value)
214 else:
214 else:
215 raise KeyError(key)
215 raise KeyError(key)
216
216
217
217
218 class Client(HasTraits):
218 class Client(HasTraits):
219 """A semi-synchronous client to the IPython ZMQ cluster
219 """A semi-synchronous client to the IPython ZMQ cluster
220
220
221 Parameters
221 Parameters
222 ----------
222 ----------
223
223
224 url_file : str/unicode; path to ipcontroller-client.json
224 url_file : str/unicode; path to ipcontroller-client.json
225 This JSON file should contain all the information needed to connect to a cluster,
225 This JSON file should contain all the information needed to connect to a cluster,
226 and is likely the only argument needed.
226 and is likely the only argument needed.
227 Connection information for the Hub's registration. If a json connector
227 Connection information for the Hub's registration. If a json connector
228 file is given, then likely no further configuration is necessary.
228 file is given, then likely no further configuration is necessary.
229 [Default: use profile]
229 [Default: use profile]
230 profile : bytes
230 profile : bytes
231 The name of the Cluster profile to be used to find connector information.
231 The name of the Cluster profile to be used to find connector information.
232 If run from an IPython application, the default profile will be the same
232 If run from an IPython application, the default profile will be the same
233 as the running application, otherwise it will be 'default'.
233 as the running application, otherwise it will be 'default'.
234 cluster_id : str
234 cluster_id : str
235 String id to added to runtime files, to prevent name collisions when using
235 String id to added to runtime files, to prevent name collisions when using
236 multiple clusters with a single profile simultaneously.
236 multiple clusters with a single profile simultaneously.
237 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
237 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
238 Since this is text inserted into filenames, typical recommendations apply:
238 Since this is text inserted into filenames, typical recommendations apply:
239 Simple character strings are ideal, and spaces are not recommended (but
239 Simple character strings are ideal, and spaces are not recommended (but
240 should generally work)
240 should generally work)
241 context : zmq.Context
241 context : zmq.Context
242 Pass an existing zmq.Context instance, otherwise the client will create its own.
242 Pass an existing zmq.Context instance, otherwise the client will create its own.
243 debug : bool
243 debug : bool
244 flag for lots of message printing for debug purposes
244 flag for lots of message printing for debug purposes
245 timeout : int/float
245 timeout : int/float
246 time (in seconds) to wait for connection replies from the Hub
246 time (in seconds) to wait for connection replies from the Hub
247 [Default: 10]
247 [Default: 10]
248
248
249 #-------------- session related args ----------------
249 #-------------- session related args ----------------
250
250
251 config : Config object
251 config : Config object
252 If specified, this will be relayed to the Session for configuration
252 If specified, this will be relayed to the Session for configuration
253 username : str
253 username : str
254 set username for the session object
254 set username for the session object
255
255
256 #-------------- ssh related args ----------------
256 #-------------- ssh related args ----------------
257 # These are args for configuring the ssh tunnel to be used
257 # These are args for configuring the ssh tunnel to be used
258 # credentials are used to forward connections over ssh to the Controller
258 # credentials are used to forward connections over ssh to the Controller
259 # Note that the ip given in `addr` needs to be relative to sshserver
259 # Note that the ip given in `addr` needs to be relative to sshserver
260 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
260 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
261 # and set sshserver as the same machine the Controller is on. However,
261 # and set sshserver as the same machine the Controller is on. However,
262 # the only requirement is that sshserver is able to see the Controller
262 # the only requirement is that sshserver is able to see the Controller
263 # (i.e. is within the same trusted network).
263 # (i.e. is within the same trusted network).
264
264
265 sshserver : str
265 sshserver : str
266 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
266 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
267 If keyfile or password is specified, and this is not, it will default to
267 If keyfile or password is specified, and this is not, it will default to
268 the ip given in addr.
268 the ip given in addr.
269 sshkey : str; path to ssh private key file
269 sshkey : str; path to ssh private key file
270 This specifies a key to be used in ssh login, default None.
270 This specifies a key to be used in ssh login, default None.
271 Regular default ssh keys will be used without specifying this argument.
271 Regular default ssh keys will be used without specifying this argument.
272 password : str
272 password : str
273 Your ssh password to sshserver. Note that if this is left None,
273 Your ssh password to sshserver. Note that if this is left None,
274 you will be prompted for it if passwordless key based login is unavailable.
274 you will be prompted for it if passwordless key based login is unavailable.
275 paramiko : bool
275 paramiko : bool
276 flag for whether to use paramiko instead of shell ssh for tunneling.
276 flag for whether to use paramiko instead of shell ssh for tunneling.
277 [default: True on win32, False else]
277 [default: True on win32, False else]
278
278
279
279
280 Attributes
280 Attributes
281 ----------
281 ----------
282
282
283 ids : list of int engine IDs
283 ids : list of int engine IDs
284 requesting the ids attribute always synchronizes
284 requesting the ids attribute always synchronizes
285 the registration state. To request ids without synchronization,
285 the registration state. To request ids without synchronization,
286 use semi-private _ids attributes.
286 use semi-private _ids attributes.
287
287
288 history : list of msg_ids
288 history : list of msg_ids
289 a list of msg_ids, keeping track of all the execution
289 a list of msg_ids, keeping track of all the execution
290 messages you have submitted in order.
290 messages you have submitted in order.
291
291
292 outstanding : set of msg_ids
292 outstanding : set of msg_ids
293 a set of msg_ids that have been submitted, but whose
293 a set of msg_ids that have been submitted, but whose
294 results have not yet been received.
294 results have not yet been received.
295
295
296 results : dict
296 results : dict
297 a dict of all our results, keyed by msg_id
297 a dict of all our results, keyed by msg_id
298
298
299 block : bool
299 block : bool
300 determines default behavior when block not specified
300 determines default behavior when block not specified
301 in execution methods
301 in execution methods
302
302
303 Methods
303 Methods
304 -------
304 -------
305
305
306 spin
306 spin
307 flushes incoming results and registration state changes
307 flushes incoming results and registration state changes
308 control methods spin, and requesting `ids` also ensures up to date
308 control methods spin, and requesting `ids` also ensures up to date
309
309
310 wait
310 wait
311 wait on one or more msg_ids
311 wait on one or more msg_ids
312
312
313 execution methods
313 execution methods
314 apply
314 apply
315 legacy: execute, run
315 legacy: execute, run
316
316
317 data movement
317 data movement
318 push, pull, scatter, gather
318 push, pull, scatter, gather
319
319
320 query methods
320 query methods
321 queue_status, get_result, purge, result_status
321 queue_status, get_result, purge, result_status
322
322
323 control methods
323 control methods
324 abort, shutdown
324 abort, shutdown
325
325
326 """
326 """
327
327
328
328
329 block = Bool(False)
329 block = Bool(False)
330 outstanding = Set()
330 outstanding = Set()
331 results = Instance('collections.defaultdict', (dict,))
331 results = Instance('collections.defaultdict', (dict,))
332 metadata = Instance('collections.defaultdict', (Metadata,))
332 metadata = Instance('collections.defaultdict', (Metadata,))
333 history = List()
333 history = List()
334 debug = Bool(False)
334 debug = Bool(False)
335 _spin_thread = Any()
335 _spin_thread = Any()
336 _stop_spinning = Any()
336 _stop_spinning = Any()
337
337
338 profile=Unicode()
338 profile=Unicode()
339 def _profile_default(self):
339 def _profile_default(self):
340 if BaseIPythonApplication.initialized():
340 if BaseIPythonApplication.initialized():
341 # an IPython app *might* be running, try to get its profile
341 # an IPython app *might* be running, try to get its profile
342 try:
342 try:
343 return BaseIPythonApplication.instance().profile
343 return BaseIPythonApplication.instance().profile
344 except (AttributeError, MultipleInstanceError):
344 except (AttributeError, MultipleInstanceError):
345 # could be a *different* subclass of config.Application,
345 # could be a *different* subclass of config.Application,
346 # which would raise one of these two errors.
346 # which would raise one of these two errors.
347 return u'default'
347 return u'default'
348 else:
348 else:
349 return u'default'
349 return u'default'
350
350
351
351
352 _outstanding_dict = Instance('collections.defaultdict', (set,))
352 _outstanding_dict = Instance('collections.defaultdict', (set,))
353 _ids = List()
353 _ids = List()
354 _connected=Bool(False)
354 _connected=Bool(False)
355 _ssh=Bool(False)
355 _ssh=Bool(False)
356 _context = Instance('zmq.Context')
356 _context = Instance('zmq.Context')
357 _config = Dict()
357 _config = Dict()
358 _engines=Instance(util.ReverseDict, (), {})
358 _engines=Instance(util.ReverseDict, (), {})
359 # _hub_socket=Instance('zmq.Socket')
359 # _hub_socket=Instance('zmq.Socket')
360 _query_socket=Instance('zmq.Socket')
360 _query_socket=Instance('zmq.Socket')
361 _control_socket=Instance('zmq.Socket')
361 _control_socket=Instance('zmq.Socket')
362 _iopub_socket=Instance('zmq.Socket')
362 _iopub_socket=Instance('zmq.Socket')
363 _notification_socket=Instance('zmq.Socket')
363 _notification_socket=Instance('zmq.Socket')
364 _mux_socket=Instance('zmq.Socket')
364 _mux_socket=Instance('zmq.Socket')
365 _task_socket=Instance('zmq.Socket')
365 _task_socket=Instance('zmq.Socket')
366 _task_scheme=Unicode()
366 _task_scheme=Unicode()
367 _closed = False
367 _closed = False
368 _ignored_control_replies=Integer(0)
368 _ignored_control_replies=Integer(0)
369 _ignored_hub_replies=Integer(0)
369 _ignored_hub_replies=Integer(0)
370
370
371 def __new__(self, *args, **kw):
371 def __new__(self, *args, **kw):
372 # don't raise on positional args
372 # don't raise on positional args
373 return HasTraits.__new__(self, **kw)
373 return HasTraits.__new__(self, **kw)
374
374
375 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
375 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
376 context=None, debug=False,
376 context=None, debug=False,
377 sshserver=None, sshkey=None, password=None, paramiko=None,
377 sshserver=None, sshkey=None, password=None, paramiko=None,
378 timeout=10, cluster_id=None, **extra_args
378 timeout=10, cluster_id=None, **extra_args
379 ):
379 ):
380 if profile:
380 if profile:
381 super(Client, self).__init__(debug=debug, profile=profile)
381 super(Client, self).__init__(debug=debug, profile=profile)
382 else:
382 else:
383 super(Client, self).__init__(debug=debug)
383 super(Client, self).__init__(debug=debug)
384 if context is None:
384 if context is None:
385 context = zmq.Context.instance()
385 context = zmq.Context.instance()
386 self._context = context
386 self._context = context
387 self._stop_spinning = Event()
387 self._stop_spinning = Event()
388
388
389 if 'url_or_file' in extra_args:
389 if 'url_or_file' in extra_args:
390 url_file = extra_args['url_or_file']
390 url_file = extra_args['url_or_file']
391 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
391 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
392
392
393 if url_file and util.is_url(url_file):
393 if url_file and util.is_url(url_file):
394 raise ValueError("single urls cannot be specified, url-files must be used.")
394 raise ValueError("single urls cannot be specified, url-files must be used.")
395
395
396 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
396 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
397
397
398 if self._cd is not None:
398 if self._cd is not None:
399 if url_file is None:
399 if url_file is None:
400 if not cluster_id:
400 if not cluster_id:
401 client_json = 'ipcontroller-client.json'
401 client_json = 'ipcontroller-client.json'
402 else:
402 else:
403 client_json = 'ipcontroller-%s-client.json' % cluster_id
403 client_json = 'ipcontroller-%s-client.json' % cluster_id
404 url_file = pjoin(self._cd.security_dir, client_json)
404 url_file = pjoin(self._cd.security_dir, client_json)
405 if url_file is None:
405 if url_file is None:
406 raise ValueError(
406 raise ValueError(
407 "I can't find enough information to connect to a hub!"
407 "I can't find enough information to connect to a hub!"
408 " Please specify at least one of url_file or profile."
408 " Please specify at least one of url_file or profile."
409 )
409 )
410
410
411 with open(url_file) as f:
411 with open(url_file) as f:
412 cfg = json.load(f)
412 cfg = json.load(f)
413
413
414 self._task_scheme = cfg['task_scheme']
414 self._task_scheme = cfg['task_scheme']
415
415
416 # sync defaults from args, json:
416 # sync defaults from args, json:
417 if sshserver:
417 if sshserver:
418 cfg['ssh'] = sshserver
418 cfg['ssh'] = sshserver
419
419
420 location = cfg.setdefault('location', None)
420 location = cfg.setdefault('location', None)
421
421
422 proto,addr = cfg['interface'].split('://')
422 proto,addr = cfg['interface'].split('://')
423 addr = util.disambiguate_ip_address(addr, location)
423 addr = util.disambiguate_ip_address(addr, location)
424 cfg['interface'] = "%s://%s" % (proto, addr)
424 cfg['interface'] = "%s://%s" % (proto, addr)
425
425
426 # turn interface,port into full urls:
426 # turn interface,port into full urls:
427 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
427 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
428 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
428 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
429
429
430 url = cfg['registration']
430 url = cfg['registration']
431
431
432 if location is not None and addr == localhost():
432 if location is not None and addr == localhost():
433 # location specified, and connection is expected to be local
433 # location specified, and connection is expected to be local
434 if not is_local_ip(location) and not sshserver:
434 if not is_local_ip(location) and not sshserver:
435 # load ssh from JSON *only* if the controller is not on
435 # load ssh from JSON *only* if the controller is not on
436 # this machine
436 # this machine
437 sshserver=cfg['ssh']
437 sshserver=cfg['ssh']
438 if not is_local_ip(location) and not sshserver:
438 if not is_local_ip(location) and not sshserver:
439 # warn if no ssh specified, but SSH is probably needed
439 # warn if no ssh specified, but SSH is probably needed
440 # This is only a warning, because the most likely cause
440 # This is only a warning, because the most likely cause
441 # is a local Controller on a laptop whose IP is dynamic
441 # is a local Controller on a laptop whose IP is dynamic
442 warnings.warn("""
442 warnings.warn("""
443 Controller appears to be listening on localhost, but not on this machine.
443 Controller appears to be listening on localhost, but not on this machine.
444 If this is true, you should specify Client(...,sshserver='you@%s')
444 If this is true, you should specify Client(...,sshserver='you@%s')
445 or instruct your controller to listen on an external IP."""%location,
445 or instruct your controller to listen on an external IP."""%location,
446 RuntimeWarning)
446 RuntimeWarning)
447 elif not sshserver:
447 elif not sshserver:
448 # otherwise sync with cfg
448 # otherwise sync with cfg
449 sshserver = cfg['ssh']
449 sshserver = cfg['ssh']
450
450
451 self._config = cfg
451 self._config = cfg
452
452
453 self._ssh = bool(sshserver or sshkey or password)
453 self._ssh = bool(sshserver or sshkey or password)
454 if self._ssh and sshserver is None:
454 if self._ssh and sshserver is None:
455 # default to ssh via localhost
455 # default to ssh via localhost
456 sshserver = addr
456 sshserver = addr
457 if self._ssh and password is None:
457 if self._ssh and password is None:
458 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
458 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
459 password=False
459 password=False
460 else:
460 else:
461 password = getpass("SSH Password for %s: "%sshserver)
461 password = getpass("SSH Password for %s: "%sshserver)
462 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
462 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
463
463
464 # configure and construct the session
464 # configure and construct the session
465 try:
465 try:
466 extra_args['packer'] = cfg['pack']
466 extra_args['packer'] = cfg['pack']
467 extra_args['unpacker'] = cfg['unpack']
467 extra_args['unpacker'] = cfg['unpack']
468 extra_args['key'] = cast_bytes(cfg['key'])
468 extra_args['key'] = cast_bytes(cfg['key'])
469 extra_args['signature_scheme'] = cfg['signature_scheme']
469 extra_args['signature_scheme'] = cfg['signature_scheme']
470 except KeyError as exc:
470 except KeyError as exc:
471 msg = '\n'.join([
471 msg = '\n'.join([
472 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
472 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
473 "If you are reusing connection files, remove them and start ipcontroller again."
473 "If you are reusing connection files, remove them and start ipcontroller again."
474 ])
474 ])
475 raise ValueError(msg.format(exc.message))
475 raise ValueError(msg.format(exc.message))
476
476
477 self.session = Session(**extra_args)
477 self.session = Session(**extra_args)
478
478
479 self._query_socket = self._context.socket(zmq.DEALER)
479 self._query_socket = self._context.socket(zmq.DEALER)
480
480
481 if self._ssh:
481 if self._ssh:
482 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
482 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
483 else:
483 else:
484 self._query_socket.connect(cfg['registration'])
484 self._query_socket.connect(cfg['registration'])
485
485
486 self.session.debug = self.debug
486 self.session.debug = self.debug
487
487
488 self._notification_handlers = {'registration_notification' : self._register_engine,
488 self._notification_handlers = {'registration_notification' : self._register_engine,
489 'unregistration_notification' : self._unregister_engine,
489 'unregistration_notification' : self._unregister_engine,
490 'shutdown_notification' : lambda msg: self.close(),
490 'shutdown_notification' : lambda msg: self.close(),
491 }
491 }
492 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
492 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
493 'apply_reply' : self._handle_apply_reply}
493 'apply_reply' : self._handle_apply_reply}
494
494
495 try:
495 try:
496 self._connect(sshserver, ssh_kwargs, timeout)
496 self._connect(sshserver, ssh_kwargs, timeout)
497 except:
497 except:
498 self.close(linger=0)
498 self.close(linger=0)
499 raise
499 raise
500
500
501 # last step: setup magics, if we are in IPython:
501 # last step: setup magics, if we are in IPython:
502
502
503 try:
503 try:
504 ip = get_ipython()
504 ip = get_ipython()
505 except NameError:
505 except NameError:
506 return
506 return
507 else:
507 else:
508 if 'px' not in ip.magics_manager.magics:
508 if 'px' not in ip.magics_manager.magics:
509 # in IPython but we are the first Client.
509 # in IPython but we are the first Client.
510 # activate a default view for parallel magics.
510 # activate a default view for parallel magics.
511 self.activate()
511 self.activate()
512
512
513 def __del__(self):
513 def __del__(self):
514 """cleanup sockets, but _not_ context."""
514 """cleanup sockets, but _not_ context."""
515 self.close()
515 self.close()
516
516
517 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
517 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
518 if ipython_dir is None:
518 if ipython_dir is None:
519 ipython_dir = get_ipython_dir()
519 ipython_dir = get_ipython_dir()
520 if profile_dir is not None:
520 if profile_dir is not None:
521 try:
521 try:
522 self._cd = ProfileDir.find_profile_dir(profile_dir)
522 self._cd = ProfileDir.find_profile_dir(profile_dir)
523 return
523 return
524 except ProfileDirError:
524 except ProfileDirError:
525 pass
525 pass
526 elif profile is not None:
526 elif profile is not None:
527 try:
527 try:
528 self._cd = ProfileDir.find_profile_dir_by_name(
528 self._cd = ProfileDir.find_profile_dir_by_name(
529 ipython_dir, profile)
529 ipython_dir, profile)
530 return
530 return
531 except ProfileDirError:
531 except ProfileDirError:
532 pass
532 pass
533 self._cd = None
533 self._cd = None
534
534
535 def _update_engines(self, engines):
535 def _update_engines(self, engines):
536 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
536 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
537 for k,v in engines.iteritems():
537 for k,v in engines.iteritems():
538 eid = int(k)
538 eid = int(k)
539 if eid not in self._engines:
539 if eid not in self._engines:
540 self._ids.append(eid)
540 self._ids.append(eid)
541 self._engines[eid] = v
541 self._engines[eid] = v
542 self._ids = sorted(self._ids)
542 self._ids = sorted(self._ids)
543 if sorted(self._engines.keys()) != range(len(self._engines)) and \
543 if sorted(self._engines.keys()) != range(len(self._engines)) and \
544 self._task_scheme == 'pure' and self._task_socket:
544 self._task_scheme == 'pure' and self._task_socket:
545 self._stop_scheduling_tasks()
545 self._stop_scheduling_tasks()
546
546
547 def _stop_scheduling_tasks(self):
547 def _stop_scheduling_tasks(self):
548 """Stop scheduling tasks because an engine has been unregistered
548 """Stop scheduling tasks because an engine has been unregistered
549 from a pure ZMQ scheduler.
549 from a pure ZMQ scheduler.
550 """
550 """
551 self._task_socket.close()
551 self._task_socket.close()
552 self._task_socket = None
552 self._task_socket = None
553 msg = "An engine has been unregistered, and we are using pure " +\
553 msg = "An engine has been unregistered, and we are using pure " +\
554 "ZMQ task scheduling. Task farming will be disabled."
554 "ZMQ task scheduling. Task farming will be disabled."
555 if self.outstanding:
555 if self.outstanding:
556 msg += " If you were running tasks when this happened, " +\
556 msg += " If you were running tasks when this happened, " +\
557 "some `outstanding` msg_ids may never resolve."
557 "some `outstanding` msg_ids may never resolve."
558 warnings.warn(msg, RuntimeWarning)
558 warnings.warn(msg, RuntimeWarning)
559
559
560 def _build_targets(self, targets):
560 def _build_targets(self, targets):
561 """Turn valid target IDs or 'all' into two lists:
561 """Turn valid target IDs or 'all' into two lists:
562 (int_ids, uuids).
562 (int_ids, uuids).
563 """
563 """
564 if not self._ids:
564 if not self._ids:
565 # flush notification socket if no engines yet, just in case
565 # flush notification socket if no engines yet, just in case
566 if not self.ids:
566 if not self.ids:
567 raise error.NoEnginesRegistered("Can't build targets without any engines")
567 raise error.NoEnginesRegistered("Can't build targets without any engines")
568
568
569 if targets is None:
569 if targets is None:
570 targets = self._ids
570 targets = self._ids
571 elif isinstance(targets, string_types):
571 elif isinstance(targets, string_types):
572 if targets.lower() == 'all':
572 if targets.lower() == 'all':
573 targets = self._ids
573 targets = self._ids
574 else:
574 else:
575 raise TypeError("%r not valid str target, must be 'all'"%(targets))
575 raise TypeError("%r not valid str target, must be 'all'"%(targets))
576 elif isinstance(targets, int):
576 elif isinstance(targets, int):
577 if targets < 0:
577 if targets < 0:
578 targets = self.ids[targets]
578 targets = self.ids[targets]
579 if targets not in self._ids:
579 if targets not in self._ids:
580 raise IndexError("No such engine: %i"%targets)
580 raise IndexError("No such engine: %i"%targets)
581 targets = [targets]
581 targets = [targets]
582
582
583 if isinstance(targets, slice):
583 if isinstance(targets, slice):
584 indices = range(len(self._ids))[targets]
584 indices = range(len(self._ids))[targets]
585 ids = self.ids
585 ids = self.ids
586 targets = [ ids[i] for i in indices ]
586 targets = [ ids[i] for i in indices ]
587
587
588 if not isinstance(targets, (tuple, list, xrange)):
588 if not isinstance(targets, (tuple, list, xrange)):
589 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
589 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
590
590
591 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
591 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
592
592
593 def _connect(self, sshserver, ssh_kwargs, timeout):
593 def _connect(self, sshserver, ssh_kwargs, timeout):
594 """setup all our socket connections to the cluster. This is called from
594 """setup all our socket connections to the cluster. This is called from
595 __init__."""
595 __init__."""
596
596
597 # Maybe allow reconnecting?
597 # Maybe allow reconnecting?
598 if self._connected:
598 if self._connected:
599 return
599 return
600 self._connected=True
600 self._connected=True
601
601
602 def connect_socket(s, url):
602 def connect_socket(s, url):
603 if self._ssh:
603 if self._ssh:
604 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
604 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
605 else:
605 else:
606 return s.connect(url)
606 return s.connect(url)
607
607
608 self.session.send(self._query_socket, 'connection_request')
608 self.session.send(self._query_socket, 'connection_request')
609 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
609 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
610 poller = zmq.Poller()
610 poller = zmq.Poller()
611 poller.register(self._query_socket, zmq.POLLIN)
611 poller.register(self._query_socket, zmq.POLLIN)
612 # poll expects milliseconds, timeout is seconds
612 # poll expects milliseconds, timeout is seconds
613 evts = poller.poll(timeout*1000)
613 evts = poller.poll(timeout*1000)
614 if not evts:
614 if not evts:
615 raise error.TimeoutError("Hub connection request timed out")
615 raise error.TimeoutError("Hub connection request timed out")
616 idents,msg = self.session.recv(self._query_socket,mode=0)
616 idents,msg = self.session.recv(self._query_socket,mode=0)
617 if self.debug:
617 if self.debug:
618 pprint(msg)
618 pprint(msg)
619 content = msg['content']
619 content = msg['content']
620 # self._config['registration'] = dict(content)
620 # self._config['registration'] = dict(content)
621 cfg = self._config
621 cfg = self._config
622 if content['status'] == 'ok':
622 if content['status'] == 'ok':
623 self._mux_socket = self._context.socket(zmq.DEALER)
623 self._mux_socket = self._context.socket(zmq.DEALER)
624 connect_socket(self._mux_socket, cfg['mux'])
624 connect_socket(self._mux_socket, cfg['mux'])
625
625
626 self._task_socket = self._context.socket(zmq.DEALER)
626 self._task_socket = self._context.socket(zmq.DEALER)
627 connect_socket(self._task_socket, cfg['task'])
627 connect_socket(self._task_socket, cfg['task'])
628
628
629 self._notification_socket = self._context.socket(zmq.SUB)
629 self._notification_socket = self._context.socket(zmq.SUB)
630 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
630 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
631 connect_socket(self._notification_socket, cfg['notification'])
631 connect_socket(self._notification_socket, cfg['notification'])
632
632
633 self._control_socket = self._context.socket(zmq.DEALER)
633 self._control_socket = self._context.socket(zmq.DEALER)
634 connect_socket(self._control_socket, cfg['control'])
634 connect_socket(self._control_socket, cfg['control'])
635
635
636 self._iopub_socket = self._context.socket(zmq.SUB)
636 self._iopub_socket = self._context.socket(zmq.SUB)
637 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
637 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
638 connect_socket(self._iopub_socket, cfg['iopub'])
638 connect_socket(self._iopub_socket, cfg['iopub'])
639
639
640 self._update_engines(dict(content['engines']))
640 self._update_engines(dict(content['engines']))
641 else:
641 else:
642 self._connected = False
642 self._connected = False
643 raise Exception("Failed to connect!")
643 raise Exception("Failed to connect!")
644
644
645 #--------------------------------------------------------------------------
645 #--------------------------------------------------------------------------
646 # handlers and callbacks for incoming messages
646 # handlers and callbacks for incoming messages
647 #--------------------------------------------------------------------------
647 #--------------------------------------------------------------------------
648
648
649 def _unwrap_exception(self, content):
649 def _unwrap_exception(self, content):
650 """unwrap exception, and remap engine_id to int."""
650 """unwrap exception, and remap engine_id to int."""
651 e = error.unwrap_exception(content)
651 e = error.unwrap_exception(content)
652 # print e.traceback
652 # print e.traceback
653 if e.engine_info:
653 if e.engine_info:
654 e_uuid = e.engine_info['engine_uuid']
654 e_uuid = e.engine_info['engine_uuid']
655 eid = self._engines[e_uuid]
655 eid = self._engines[e_uuid]
656 e.engine_info['engine_id'] = eid
656 e.engine_info['engine_id'] = eid
657 return e
657 return e
658
658
659 def _extract_metadata(self, msg):
659 def _extract_metadata(self, msg):
660 header = msg['header']
660 header = msg['header']
661 parent = msg['parent_header']
661 parent = msg['parent_header']
662 msg_meta = msg['metadata']
662 msg_meta = msg['metadata']
663 content = msg['content']
663 content = msg['content']
664 md = {'msg_id' : parent['msg_id'],
664 md = {'msg_id' : parent['msg_id'],
665 'received' : datetime.now(),
665 'received' : datetime.now(),
666 'engine_uuid' : msg_meta.get('engine', None),
666 'engine_uuid' : msg_meta.get('engine', None),
667 'follow' : msg_meta.get('follow', []),
667 'follow' : msg_meta.get('follow', []),
668 'after' : msg_meta.get('after', []),
668 'after' : msg_meta.get('after', []),
669 'status' : content['status'],
669 'status' : content['status'],
670 }
670 }
671
671
672 if md['engine_uuid'] is not None:
672 if md['engine_uuid'] is not None:
673 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
673 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
674
674
675 if 'date' in parent:
675 if 'date' in parent:
676 md['submitted'] = parent['date']
676 md['submitted'] = parent['date']
677 if 'started' in msg_meta:
677 if 'started' in msg_meta:
678 md['started'] = msg_meta['started']
678 md['started'] = msg_meta['started']
679 if 'date' in header:
679 if 'date' in header:
680 md['completed'] = header['date']
680 md['completed'] = header['date']
681 return md
681 return md
682
682
683 def _register_engine(self, msg):
683 def _register_engine(self, msg):
684 """Register a new engine, and update our connection info."""
684 """Register a new engine, and update our connection info."""
685 content = msg['content']
685 content = msg['content']
686 eid = content['id']
686 eid = content['id']
687 d = {eid : content['uuid']}
687 d = {eid : content['uuid']}
688 self._update_engines(d)
688 self._update_engines(d)
689
689
690 def _unregister_engine(self, msg):
690 def _unregister_engine(self, msg):
691 """Unregister an engine that has died."""
691 """Unregister an engine that has died."""
692 content = msg['content']
692 content = msg['content']
693 eid = int(content['id'])
693 eid = int(content['id'])
694 if eid in self._ids:
694 if eid in self._ids:
695 self._ids.remove(eid)
695 self._ids.remove(eid)
696 uuid = self._engines.pop(eid)
696 uuid = self._engines.pop(eid)
697
697
698 self._handle_stranded_msgs(eid, uuid)
698 self._handle_stranded_msgs(eid, uuid)
699
699
700 if self._task_socket and self._task_scheme == 'pure':
700 if self._task_socket and self._task_scheme == 'pure':
701 self._stop_scheduling_tasks()
701 self._stop_scheduling_tasks()
702
702
703 def _handle_stranded_msgs(self, eid, uuid):
703 def _handle_stranded_msgs(self, eid, uuid):
704 """Handle messages known to be on an engine when the engine unregisters.
704 """Handle messages known to be on an engine when the engine unregisters.
705
705
706 It is possible that this will fire prematurely - that is, an engine will
706 It is possible that this will fire prematurely - that is, an engine will
707 go down after completing a result, and the client will be notified
707 go down after completing a result, and the client will be notified
708 of the unregistration and later receive the successful result.
708 of the unregistration and later receive the successful result.
709 """
709 """
710
710
711 outstanding = self._outstanding_dict[uuid]
711 outstanding = self._outstanding_dict[uuid]
712
712
713 for msg_id in list(outstanding):
713 for msg_id in list(outstanding):
714 if msg_id in self.results:
714 if msg_id in self.results:
715 # we already
715 # we already
716 continue
716 continue
717 try:
717 try:
718 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
718 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
719 except:
719 except:
720 content = error.wrap_exception()
720 content = error.wrap_exception()
721 # build a fake message:
721 # build a fake message:
722 msg = self.session.msg('apply_reply', content=content)
722 msg = self.session.msg('apply_reply', content=content)
723 msg['parent_header']['msg_id'] = msg_id
723 msg['parent_header']['msg_id'] = msg_id
724 msg['metadata']['engine'] = uuid
724 msg['metadata']['engine'] = uuid
725 self._handle_apply_reply(msg)
725 self._handle_apply_reply(msg)
726
726
727 def _handle_execute_reply(self, msg):
727 def _handle_execute_reply(self, msg):
728 """Save the reply to an execute_request into our results.
728 """Save the reply to an execute_request into our results.
729
729
730 execute messages are never actually used. apply is used instead.
730 execute messages are never actually used. apply is used instead.
731 """
731 """
732
732
733 parent = msg['parent_header']
733 parent = msg['parent_header']
734 msg_id = parent['msg_id']
734 msg_id = parent['msg_id']
735 if msg_id not in self.outstanding:
735 if msg_id not in self.outstanding:
736 if msg_id in self.history:
736 if msg_id in self.history:
737 print(("got stale result: %s"%msg_id))
737 print(("got stale result: %s"%msg_id))
738 else:
738 else:
739 print(("got unknown result: %s"%msg_id))
739 print(("got unknown result: %s"%msg_id))
740 else:
740 else:
741 self.outstanding.remove(msg_id)
741 self.outstanding.remove(msg_id)
742
742
743 content = msg['content']
743 content = msg['content']
744 header = msg['header']
744 header = msg['header']
745
745
746 # construct metadata:
746 # construct metadata:
747 md = self.metadata[msg_id]
747 md = self.metadata[msg_id]
748 md.update(self._extract_metadata(msg))
748 md.update(self._extract_metadata(msg))
749 # is this redundant?
749 # is this redundant?
750 self.metadata[msg_id] = md
750 self.metadata[msg_id] = md
751
751
752 e_outstanding = self._outstanding_dict[md['engine_uuid']]
752 e_outstanding = self._outstanding_dict[md['engine_uuid']]
753 if msg_id in e_outstanding:
753 if msg_id in e_outstanding:
754 e_outstanding.remove(msg_id)
754 e_outstanding.remove(msg_id)
755
755
756 # construct result:
756 # construct result:
757 if content['status'] == 'ok':
757 if content['status'] == 'ok':
758 self.results[msg_id] = ExecuteReply(msg_id, content, md)
758 self.results[msg_id] = ExecuteReply(msg_id, content, md)
759 elif content['status'] == 'aborted':
759 elif content['status'] == 'aborted':
760 self.results[msg_id] = error.TaskAborted(msg_id)
760 self.results[msg_id] = error.TaskAborted(msg_id)
761 elif content['status'] == 'resubmitted':
761 elif content['status'] == 'resubmitted':
762 # TODO: handle resubmission
762 # TODO: handle resubmission
763 pass
763 pass
764 else:
764 else:
765 self.results[msg_id] = self._unwrap_exception(content)
765 self.results[msg_id] = self._unwrap_exception(content)
766
766
767 def _handle_apply_reply(self, msg):
767 def _handle_apply_reply(self, msg):
768 """Save the reply to an apply_request into our results."""
768 """Save the reply to an apply_request into our results."""
769 parent = msg['parent_header']
769 parent = msg['parent_header']
770 msg_id = parent['msg_id']
770 msg_id = parent['msg_id']
771 if msg_id not in self.outstanding:
771 if msg_id not in self.outstanding:
772 if msg_id in self.history:
772 if msg_id in self.history:
773 print(("got stale result: %s"%msg_id))
773 print(("got stale result: %s"%msg_id))
774 print(self.results[msg_id])
774 print(self.results[msg_id])
775 print(msg)
775 print(msg)
776 else:
776 else:
777 print(("got unknown result: %s"%msg_id))
777 print(("got unknown result: %s"%msg_id))
778 else:
778 else:
779 self.outstanding.remove(msg_id)
779 self.outstanding.remove(msg_id)
780 content = msg['content']
780 content = msg['content']
781 header = msg['header']
781 header = msg['header']
782
782
783 # construct metadata:
783 # construct metadata:
784 md = self.metadata[msg_id]
784 md = self.metadata[msg_id]
785 md.update(self._extract_metadata(msg))
785 md.update(self._extract_metadata(msg))
786 # is this redundant?
786 # is this redundant?
787 self.metadata[msg_id] = md
787 self.metadata[msg_id] = md
788
788
789 e_outstanding = self._outstanding_dict[md['engine_uuid']]
789 e_outstanding = self._outstanding_dict[md['engine_uuid']]
790 if msg_id in e_outstanding:
790 if msg_id in e_outstanding:
791 e_outstanding.remove(msg_id)
791 e_outstanding.remove(msg_id)
792
792
793 # construct result:
793 # construct result:
794 if content['status'] == 'ok':
794 if content['status'] == 'ok':
795 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
795 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
796 elif content['status'] == 'aborted':
796 elif content['status'] == 'aborted':
797 self.results[msg_id] = error.TaskAborted(msg_id)
797 self.results[msg_id] = error.TaskAborted(msg_id)
798 elif content['status'] == 'resubmitted':
798 elif content['status'] == 'resubmitted':
799 # TODO: handle resubmission
799 # TODO: handle resubmission
800 pass
800 pass
801 else:
801 else:
802 self.results[msg_id] = self._unwrap_exception(content)
802 self.results[msg_id] = self._unwrap_exception(content)
803
803
804 def _flush_notifications(self):
804 def _flush_notifications(self):
805 """Flush notifications of engine registrations waiting
805 """Flush notifications of engine registrations waiting
806 in ZMQ queue."""
806 in ZMQ queue."""
807 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
807 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
808 while msg is not None:
808 while msg is not None:
809 if self.debug:
809 if self.debug:
810 pprint(msg)
810 pprint(msg)
811 msg_type = msg['header']['msg_type']
811 msg_type = msg['header']['msg_type']
812 handler = self._notification_handlers.get(msg_type, None)
812 handler = self._notification_handlers.get(msg_type, None)
813 if handler is None:
813 if handler is None:
814 raise Exception("Unhandled message type: %s" % msg_type)
814 raise Exception("Unhandled message type: %s" % msg_type)
815 else:
815 else:
816 handler(msg)
816 handler(msg)
817 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
817 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
818
818
819 def _flush_results(self, sock):
819 def _flush_results(self, sock):
820 """Flush task or queue results waiting in ZMQ queue."""
820 """Flush task or queue results waiting in ZMQ queue."""
821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822 while msg is not None:
822 while msg is not None:
823 if self.debug:
823 if self.debug:
824 pprint(msg)
824 pprint(msg)
825 msg_type = msg['header']['msg_type']
825 msg_type = msg['header']['msg_type']
826 handler = self._queue_handlers.get(msg_type, None)
826 handler = self._queue_handlers.get(msg_type, None)
827 if handler is None:
827 if handler is None:
828 raise Exception("Unhandled message type: %s" % msg_type)
828 raise Exception("Unhandled message type: %s" % msg_type)
829 else:
829 else:
830 handler(msg)
830 handler(msg)
831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832
832
833 def _flush_control(self, sock):
833 def _flush_control(self, sock):
834 """Flush replies from the control channel waiting
834 """Flush replies from the control channel waiting
835 in the ZMQ queue.
835 in the ZMQ queue.
836
836
837 Currently: ignore them."""
837 Currently: ignore them."""
838 if self._ignored_control_replies <= 0:
838 if self._ignored_control_replies <= 0:
839 return
839 return
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841 while msg is not None:
841 while msg is not None:
842 self._ignored_control_replies -= 1
842 self._ignored_control_replies -= 1
843 if self.debug:
843 if self.debug:
844 pprint(msg)
844 pprint(msg)
845 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
845 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
846
846
847 def _flush_ignored_control(self):
847 def _flush_ignored_control(self):
848 """flush ignored control replies"""
848 """flush ignored control replies"""
849 while self._ignored_control_replies > 0:
849 while self._ignored_control_replies > 0:
850 self.session.recv(self._control_socket)
850 self.session.recv(self._control_socket)
851 self._ignored_control_replies -= 1
851 self._ignored_control_replies -= 1
852
852
853 def _flush_ignored_hub_replies(self):
853 def _flush_ignored_hub_replies(self):
854 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
854 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
855 while msg is not None:
855 while msg is not None:
856 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
856 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
857
857
858 def _flush_iopub(self, sock):
858 def _flush_iopub(self, sock):
859 """Flush replies from the iopub channel waiting
859 """Flush replies from the iopub channel waiting
860 in the ZMQ queue.
860 in the ZMQ queue.
861 """
861 """
862 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
862 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
863 while msg is not None:
863 while msg is not None:
864 if self.debug:
864 if self.debug:
865 pprint(msg)
865 pprint(msg)
866 parent = msg['parent_header']
866 parent = msg['parent_header']
867 # ignore IOPub messages with no parent.
867 # ignore IOPub messages with no parent.
868 # Caused by print statements or warnings from before the first execution.
868 # Caused by print statements or warnings from before the first execution.
869 if not parent:
869 if not parent:
870 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
870 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
871 continue
871 continue
872 msg_id = parent['msg_id']
872 msg_id = parent['msg_id']
873 content = msg['content']
873 content = msg['content']
874 header = msg['header']
874 header = msg['header']
875 msg_type = msg['header']['msg_type']
875 msg_type = msg['header']['msg_type']
876
876
877 # init metadata:
877 # init metadata:
878 md = self.metadata[msg_id]
878 md = self.metadata[msg_id]
879
879
880 if msg_type == 'stream':
880 if msg_type == 'stream':
881 name = content['name']
881 name = content['name']
882 s = md[name] or ''
882 s = md[name] or ''
883 md[name] = s + content['data']
883 md[name] = s + content['data']
884 elif msg_type == 'pyerr':
884 elif msg_type == 'pyerr':
885 md.update({'pyerr' : self._unwrap_exception(content)})
885 md.update({'pyerr' : self._unwrap_exception(content)})
886 elif msg_type == 'pyin':
886 elif msg_type == 'pyin':
887 md.update({'pyin' : content['code']})
887 md.update({'pyin' : content['code']})
888 elif msg_type == 'display_data':
888 elif msg_type == 'display_data':
889 md['outputs'].append(content)
889 md['outputs'].append(content)
890 elif msg_type == 'pyout':
890 elif msg_type == 'pyout':
891 md['pyout'] = content
891 md['pyout'] = content
892 elif msg_type == 'data_message':
892 elif msg_type == 'data_message':
893 data, remainder = serialize.unserialize_object(msg['buffers'])
893 data, remainder = serialize.unserialize_object(msg['buffers'])
894 md['data'].update(data)
894 md['data'].update(data)
895 elif msg_type == 'status':
895 elif msg_type == 'status':
896 # idle message comes after all outputs
896 # idle message comes after all outputs
897 if content['execution_state'] == 'idle':
897 if content['execution_state'] == 'idle':
898 md['outputs_ready'] = True
898 md['outputs_ready'] = True
899 else:
899 else:
900 # unhandled msg_type (status, etc.)
900 # unhandled msg_type (status, etc.)
901 pass
901 pass
902
902
903 # reduntant?
903 # reduntant?
904 self.metadata[msg_id] = md
904 self.metadata[msg_id] = md
905
905
906 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
906 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
907
907
908 #--------------------------------------------------------------------------
908 #--------------------------------------------------------------------------
909 # len, getitem
909 # len, getitem
910 #--------------------------------------------------------------------------
910 #--------------------------------------------------------------------------
911
911
912 def __len__(self):
912 def __len__(self):
913 """len(client) returns # of engines."""
913 """len(client) returns # of engines."""
914 return len(self.ids)
914 return len(self.ids)
915
915
916 def __getitem__(self, key):
916 def __getitem__(self, key):
917 """index access returns DirectView multiplexer objects
917 """index access returns DirectView multiplexer objects
918
918
919 Must be int, slice, or list/tuple/xrange of ints"""
919 Must be int, slice, or list/tuple/xrange of ints"""
920 if not isinstance(key, (int, slice, tuple, list, xrange)):
920 if not isinstance(key, (int, slice, tuple, list, xrange)):
921 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
921 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
922 else:
922 else:
923 return self.direct_view(key)
923 return self.direct_view(key)
924
924
925 #--------------------------------------------------------------------------
925 #--------------------------------------------------------------------------
926 # Begin public methods
926 # Begin public methods
927 #--------------------------------------------------------------------------
927 #--------------------------------------------------------------------------
928
928
929 @property
929 @property
930 def ids(self):
930 def ids(self):
931 """Always up-to-date ids property."""
931 """Always up-to-date ids property."""
932 self._flush_notifications()
932 self._flush_notifications()
933 # always copy:
933 # always copy:
934 return list(self._ids)
934 return list(self._ids)
935
935
936 def activate(self, targets='all', suffix=''):
936 def activate(self, targets='all', suffix=''):
937 """Create a DirectView and register it with IPython magics
937 """Create a DirectView and register it with IPython magics
938
938
939 Defines the magics `%px, %autopx, %pxresult, %%px`
939 Defines the magics `%px, %autopx, %pxresult, %%px`
940
940
941 Parameters
941 Parameters
942 ----------
942 ----------
943
943
944 targets: int, list of ints, or 'all'
944 targets: int, list of ints, or 'all'
945 The engines on which the view's magics will run
945 The engines on which the view's magics will run
946 suffix: str [default: '']
946 suffix: str [default: '']
947 The suffix, if any, for the magics. This allows you to have
947 The suffix, if any, for the magics. This allows you to have
948 multiple views associated with parallel magics at the same time.
948 multiple views associated with parallel magics at the same time.
949
949
950 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
950 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
951 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
951 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
952 on engine 0.
952 on engine 0.
953 """
953 """
954 view = self.direct_view(targets)
954 view = self.direct_view(targets)
955 view.block = True
955 view.block = True
956 view.activate(suffix)
956 view.activate(suffix)
957 return view
957 return view
958
958
959 def close(self, linger=None):
959 def close(self, linger=None):
960 """Close my zmq Sockets
960 """Close my zmq Sockets
961
961
962 If `linger`, set the zmq LINGER socket option,
962 If `linger`, set the zmq LINGER socket option,
963 which allows discarding of messages.
963 which allows discarding of messages.
964 """
964 """
965 if self._closed:
965 if self._closed:
966 return
966 return
967 self.stop_spin_thread()
967 self.stop_spin_thread()
968 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
968 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
969 for name in snames:
969 for name in snames:
970 socket = getattr(self, name)
970 socket = getattr(self, name)
971 if socket is not None and not socket.closed:
971 if socket is not None and not socket.closed:
972 if linger is not None:
972 if linger is not None:
973 socket.close(linger=linger)
973 socket.close(linger=linger)
974 else:
974 else:
975 socket.close()
975 socket.close()
976 self._closed = True
976 self._closed = True
977
977
978 def _spin_every(self, interval=1):
978 def _spin_every(self, interval=1):
979 """target func for use in spin_thread"""
979 """target func for use in spin_thread"""
980 while True:
980 while True:
981 if self._stop_spinning.is_set():
981 if self._stop_spinning.is_set():
982 return
982 return
983 time.sleep(interval)
983 time.sleep(interval)
984 self.spin()
984 self.spin()
985
985
986 def spin_thread(self, interval=1):
986 def spin_thread(self, interval=1):
987 """call Client.spin() in a background thread on some regular interval
987 """call Client.spin() in a background thread on some regular interval
988
988
989 This helps ensure that messages don't pile up too much in the zmq queue
989 This helps ensure that messages don't pile up too much in the zmq queue
990 while you are working on other things, or just leaving an idle terminal.
990 while you are working on other things, or just leaving an idle terminal.
991
991
992 It also helps limit potential padding of the `received` timestamp
992 It also helps limit potential padding of the `received` timestamp
993 on AsyncResult objects, used for timings.
993 on AsyncResult objects, used for timings.
994
994
995 Parameters
995 Parameters
996 ----------
996 ----------
997
997
998 interval : float, optional
998 interval : float, optional
999 The interval on which to spin the client in the background thread
999 The interval on which to spin the client in the background thread
1000 (simply passed to time.sleep).
1000 (simply passed to time.sleep).
1001
1001
1002 Notes
1002 Notes
1003 -----
1003 -----
1004
1004
1005 For precision timing, you may want to use this method to put a bound
1005 For precision timing, you may want to use this method to put a bound
1006 on the jitter (in seconds) in `received` timestamps used
1006 on the jitter (in seconds) in `received` timestamps used
1007 in AsyncResult.wall_time.
1007 in AsyncResult.wall_time.
1008
1008
1009 """
1009 """
1010 if self._spin_thread is not None:
1010 if self._spin_thread is not None:
1011 self.stop_spin_thread()
1011 self.stop_spin_thread()
1012 self._stop_spinning.clear()
1012 self._stop_spinning.clear()
1013 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1013 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1014 self._spin_thread.daemon = True
1014 self._spin_thread.daemon = True
1015 self._spin_thread.start()
1015 self._spin_thread.start()
1016
1016
1017 def stop_spin_thread(self):
1017 def stop_spin_thread(self):
1018 """stop background spin_thread, if any"""
1018 """stop background spin_thread, if any"""
1019 if self._spin_thread is not None:
1019 if self._spin_thread is not None:
1020 self._stop_spinning.set()
1020 self._stop_spinning.set()
1021 self._spin_thread.join()
1021 self._spin_thread.join()
1022 self._spin_thread = None
1022 self._spin_thread = None
1023
1023
1024 def spin(self):
1024 def spin(self):
1025 """Flush any registration notifications and execution results
1025 """Flush any registration notifications and execution results
1026 waiting in the ZMQ queue.
1026 waiting in the ZMQ queue.
1027 """
1027 """
1028 if self._notification_socket:
1028 if self._notification_socket:
1029 self._flush_notifications()
1029 self._flush_notifications()
1030 if self._iopub_socket:
1030 if self._iopub_socket:
1031 self._flush_iopub(self._iopub_socket)
1031 self._flush_iopub(self._iopub_socket)
1032 if self._mux_socket:
1032 if self._mux_socket:
1033 self._flush_results(self._mux_socket)
1033 self._flush_results(self._mux_socket)
1034 if self._task_socket:
1034 if self._task_socket:
1035 self._flush_results(self._task_socket)
1035 self._flush_results(self._task_socket)
1036 if self._control_socket:
1036 if self._control_socket:
1037 self._flush_control(self._control_socket)
1037 self._flush_control(self._control_socket)
1038 if self._query_socket:
1038 if self._query_socket:
1039 self._flush_ignored_hub_replies()
1039 self._flush_ignored_hub_replies()
1040
1040
1041 def wait(self, jobs=None, timeout=-1):
1041 def wait(self, jobs=None, timeout=-1):
1042 """waits on one or more `jobs`, for up to `timeout` seconds.
1042 """waits on one or more `jobs`, for up to `timeout` seconds.
1043
1043
1044 Parameters
1044 Parameters
1045 ----------
1045 ----------
1046
1046
1047 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1047 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1048 ints are indices to self.history
1048 ints are indices to self.history
1049 strs are msg_ids
1049 strs are msg_ids
1050 default: wait on all outstanding messages
1050 default: wait on all outstanding messages
1051 timeout : float
1051 timeout : float
1052 a time in seconds, after which to give up.
1052 a time in seconds, after which to give up.
1053 default is -1, which means no timeout
1053 default is -1, which means no timeout
1054
1054
1055 Returns
1055 Returns
1056 -------
1056 -------
1057
1057
1058 True : when all msg_ids are done
1058 True : when all msg_ids are done
1059 False : timeout reached, some msg_ids still outstanding
1059 False : timeout reached, some msg_ids still outstanding
1060 """
1060 """
1061 tic = time.time()
1061 tic = time.time()
1062 if jobs is None:
1062 if jobs is None:
1063 theids = self.outstanding
1063 theids = self.outstanding
1064 else:
1064 else:
1065 if isinstance(jobs, string_types + (int, AsyncResult)):
1065 if isinstance(jobs, string_types + (int, AsyncResult)):
1066 jobs = [jobs]
1066 jobs = [jobs]
1067 theids = set()
1067 theids = set()
1068 for job in jobs:
1068 for job in jobs:
1069 if isinstance(job, int):
1069 if isinstance(job, int):
1070 # index access
1070 # index access
1071 job = self.history[job]
1071 job = self.history[job]
1072 elif isinstance(job, AsyncResult):
1072 elif isinstance(job, AsyncResult):
1073 map(theids.add, job.msg_ids)
1073 map(theids.add, job.msg_ids)
1074 continue
1074 continue
1075 theids.add(job)
1075 theids.add(job)
1076 if not theids.intersection(self.outstanding):
1076 if not theids.intersection(self.outstanding):
1077 return True
1077 return True
1078 self.spin()
1078 self.spin()
1079 while theids.intersection(self.outstanding):
1079 while theids.intersection(self.outstanding):
1080 if timeout >= 0 and ( time.time()-tic ) > timeout:
1080 if timeout >= 0 and ( time.time()-tic ) > timeout:
1081 break
1081 break
1082 time.sleep(1e-3)
1082 time.sleep(1e-3)
1083 self.spin()
1083 self.spin()
1084 return len(theids.intersection(self.outstanding)) == 0
1084 return len(theids.intersection(self.outstanding)) == 0
1085
1085
1086 #--------------------------------------------------------------------------
1086 #--------------------------------------------------------------------------
1087 # Control methods
1087 # Control methods
1088 #--------------------------------------------------------------------------
1088 #--------------------------------------------------------------------------
1089
1089
1090 @spin_first
1090 @spin_first
1091 def clear(self, targets=None, block=None):
1091 def clear(self, targets=None, block=None):
1092 """Clear the namespace in target(s)."""
1092 """Clear the namespace in target(s)."""
1093 block = self.block if block is None else block
1093 block = self.block if block is None else block
1094 targets = self._build_targets(targets)[0]
1094 targets = self._build_targets(targets)[0]
1095 for t in targets:
1095 for t in targets:
1096 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1096 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1097 error = False
1097 error = False
1098 if block:
1098 if block:
1099 self._flush_ignored_control()
1099 self._flush_ignored_control()
1100 for i in range(len(targets)):
1100 for i in range(len(targets)):
1101 idents,msg = self.session.recv(self._control_socket,0)
1101 idents,msg = self.session.recv(self._control_socket,0)
1102 if self.debug:
1102 if self.debug:
1103 pprint(msg)
1103 pprint(msg)
1104 if msg['content']['status'] != 'ok':
1104 if msg['content']['status'] != 'ok':
1105 error = self._unwrap_exception(msg['content'])
1105 error = self._unwrap_exception(msg['content'])
1106 else:
1106 else:
1107 self._ignored_control_replies += len(targets)
1107 self._ignored_control_replies += len(targets)
1108 if error:
1108 if error:
1109 raise error
1109 raise error
1110
1110
1111
1111
1112 @spin_first
1112 @spin_first
1113 def abort(self, jobs=None, targets=None, block=None):
1113 def abort(self, jobs=None, targets=None, block=None):
1114 """Abort specific jobs from the execution queues of target(s).
1114 """Abort specific jobs from the execution queues of target(s).
1115
1115
1116 This is a mechanism to prevent jobs that have already been submitted
1116 This is a mechanism to prevent jobs that have already been submitted
1117 from executing.
1117 from executing.
1118
1118
1119 Parameters
1119 Parameters
1120 ----------
1120 ----------
1121
1121
1122 jobs : msg_id, list of msg_ids, or AsyncResult
1122 jobs : msg_id, list of msg_ids, or AsyncResult
1123 The jobs to be aborted
1123 The jobs to be aborted
1124
1124
1125 If unspecified/None: abort all outstanding jobs.
1125 If unspecified/None: abort all outstanding jobs.
1126
1126
1127 """
1127 """
1128 block = self.block if block is None else block
1128 block = self.block if block is None else block
1129 jobs = jobs if jobs is not None else list(self.outstanding)
1129 jobs = jobs if jobs is not None else list(self.outstanding)
1130 targets = self._build_targets(targets)[0]
1130 targets = self._build_targets(targets)[0]
1131
1131
1132 msg_ids = []
1132 msg_ids = []
1133 if isinstance(jobs, string_types + (AsyncResult,)):
1133 if isinstance(jobs, string_types + (AsyncResult,)):
1134 jobs = [jobs]
1134 jobs = [jobs]
1135 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs)
1135 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs)
1136 if bad_ids:
1136 if bad_ids:
1137 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1137 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1138 for j in jobs:
1138 for j in jobs:
1139 if isinstance(j, AsyncResult):
1139 if isinstance(j, AsyncResult):
1140 msg_ids.extend(j.msg_ids)
1140 msg_ids.extend(j.msg_ids)
1141 else:
1141 else:
1142 msg_ids.append(j)
1142 msg_ids.append(j)
1143 content = dict(msg_ids=msg_ids)
1143 content = dict(msg_ids=msg_ids)
1144 for t in targets:
1144 for t in targets:
1145 self.session.send(self._control_socket, 'abort_request',
1145 self.session.send(self._control_socket, 'abort_request',
1146 content=content, ident=t)
1146 content=content, ident=t)
1147 error = False
1147 error = False
1148 if block:
1148 if block:
1149 self._flush_ignored_control()
1149 self._flush_ignored_control()
1150 for i in range(len(targets)):
1150 for i in range(len(targets)):
1151 idents,msg = self.session.recv(self._control_socket,0)
1151 idents,msg = self.session.recv(self._control_socket,0)
1152 if self.debug:
1152 if self.debug:
1153 pprint(msg)
1153 pprint(msg)
1154 if msg['content']['status'] != 'ok':
1154 if msg['content']['status'] != 'ok':
1155 error = self._unwrap_exception(msg['content'])
1155 error = self._unwrap_exception(msg['content'])
1156 else:
1156 else:
1157 self._ignored_control_replies += len(targets)
1157 self._ignored_control_replies += len(targets)
1158 if error:
1158 if error:
1159 raise error
1159 raise error
1160
1160
1161 @spin_first
1161 @spin_first
1162 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1162 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1163 """Terminates one or more engine processes, optionally including the hub.
1163 """Terminates one or more engine processes, optionally including the hub.
1164
1164
1165 Parameters
1165 Parameters
1166 ----------
1166 ----------
1167
1167
1168 targets: list of ints or 'all' [default: all]
1168 targets: list of ints or 'all' [default: all]
1169 Which engines to shutdown.
1169 Which engines to shutdown.
1170 hub: bool [default: False]
1170 hub: bool [default: False]
1171 Whether to include the Hub. hub=True implies targets='all'.
1171 Whether to include the Hub. hub=True implies targets='all'.
1172 block: bool [default: self.block]
1172 block: bool [default: self.block]
1173 Whether to wait for clean shutdown replies or not.
1173 Whether to wait for clean shutdown replies or not.
1174 restart: bool [default: False]
1174 restart: bool [default: False]
1175 NOT IMPLEMENTED
1175 NOT IMPLEMENTED
1176 whether to restart engines after shutting them down.
1176 whether to restart engines after shutting them down.
1177 """
1177 """
1178 from IPython.parallel.error import NoEnginesRegistered
1178 from IPython.parallel.error import NoEnginesRegistered
1179 if restart:
1179 if restart:
1180 raise NotImplementedError("Engine restart is not yet implemented")
1180 raise NotImplementedError("Engine restart is not yet implemented")
1181
1181
1182 block = self.block if block is None else block
1182 block = self.block if block is None else block
1183 if hub:
1183 if hub:
1184 targets = 'all'
1184 targets = 'all'
1185 try:
1185 try:
1186 targets = self._build_targets(targets)[0]
1186 targets = self._build_targets(targets)[0]
1187 except NoEnginesRegistered:
1187 except NoEnginesRegistered:
1188 targets = []
1188 targets = []
1189 for t in targets:
1189 for t in targets:
1190 self.session.send(self._control_socket, 'shutdown_request',
1190 self.session.send(self._control_socket, 'shutdown_request',
1191 content={'restart':restart},ident=t)
1191 content={'restart':restart},ident=t)
1192 error = False
1192 error = False
1193 if block or hub:
1193 if block or hub:
1194 self._flush_ignored_control()
1194 self._flush_ignored_control()
1195 for i in range(len(targets)):
1195 for i in range(len(targets)):
1196 idents,msg = self.session.recv(self._control_socket, 0)
1196 idents,msg = self.session.recv(self._control_socket, 0)
1197 if self.debug:
1197 if self.debug:
1198 pprint(msg)
1198 pprint(msg)
1199 if msg['content']['status'] != 'ok':
1199 if msg['content']['status'] != 'ok':
1200 error = self._unwrap_exception(msg['content'])
1200 error = self._unwrap_exception(msg['content'])
1201 else:
1201 else:
1202 self._ignored_control_replies += len(targets)
1202 self._ignored_control_replies += len(targets)
1203
1203
1204 if hub:
1204 if hub:
1205 time.sleep(0.25)
1205 time.sleep(0.25)
1206 self.session.send(self._query_socket, 'shutdown_request')
1206 self.session.send(self._query_socket, 'shutdown_request')
1207 idents,msg = self.session.recv(self._query_socket, 0)
1207 idents,msg = self.session.recv(self._query_socket, 0)
1208 if self.debug:
1208 if self.debug:
1209 pprint(msg)
1209 pprint(msg)
1210 if msg['content']['status'] != 'ok':
1210 if msg['content']['status'] != 'ok':
1211 error = self._unwrap_exception(msg['content'])
1211 error = self._unwrap_exception(msg['content'])
1212
1212
1213 if error:
1213 if error:
1214 raise error
1214 raise error
1215
1215
1216 #--------------------------------------------------------------------------
1216 #--------------------------------------------------------------------------
1217 # Execution related methods
1217 # Execution related methods
1218 #--------------------------------------------------------------------------
1218 #--------------------------------------------------------------------------
1219
1219
1220 def _maybe_raise(self, result):
1220 def _maybe_raise(self, result):
1221 """wrapper for maybe raising an exception if apply failed."""
1221 """wrapper for maybe raising an exception if apply failed."""
1222 if isinstance(result, error.RemoteError):
1222 if isinstance(result, error.RemoteError):
1223 raise result
1223 raise result
1224
1224
1225 return result
1225 return result
1226
1226
1227 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1227 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1228 ident=None):
1228 ident=None):
1229 """construct and send an apply message via a socket.
1229 """construct and send an apply message via a socket.
1230
1230
1231 This is the principal method with which all engine execution is performed by views.
1231 This is the principal method with which all engine execution is performed by views.
1232 """
1232 """
1233
1233
1234 if self._closed:
1234 if self._closed:
1235 raise RuntimeError("Client cannot be used after its sockets have been closed")
1235 raise RuntimeError("Client cannot be used after its sockets have been closed")
1236
1236
1237 # defaults:
1237 # defaults:
1238 args = args if args is not None else []
1238 args = args if args is not None else []
1239 kwargs = kwargs if kwargs is not None else {}
1239 kwargs = kwargs if kwargs is not None else {}
1240 metadata = metadata if metadata is not None else {}
1240 metadata = metadata if metadata is not None else {}
1241
1241
1242 # validate arguments
1242 # validate arguments
1243 if not callable(f) and not isinstance(f, Reference):
1243 if not callable(f) and not isinstance(f, Reference):
1244 raise TypeError("f must be callable, not %s"%type(f))
1244 raise TypeError("f must be callable, not %s"%type(f))
1245 if not isinstance(args, (tuple, list)):
1245 if not isinstance(args, (tuple, list)):
1246 raise TypeError("args must be tuple or list, not %s"%type(args))
1246 raise TypeError("args must be tuple or list, not %s"%type(args))
1247 if not isinstance(kwargs, dict):
1247 if not isinstance(kwargs, dict):
1248 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1248 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1249 if not isinstance(metadata, dict):
1249 if not isinstance(metadata, dict):
1250 raise TypeError("metadata must be dict, not %s"%type(metadata))
1250 raise TypeError("metadata must be dict, not %s"%type(metadata))
1251
1251
1252 bufs = serialize.pack_apply_message(f, args, kwargs,
1252 bufs = serialize.pack_apply_message(f, args, kwargs,
1253 buffer_threshold=self.session.buffer_threshold,
1253 buffer_threshold=self.session.buffer_threshold,
1254 item_threshold=self.session.item_threshold,
1254 item_threshold=self.session.item_threshold,
1255 )
1255 )
1256
1256
1257 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1257 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1258 metadata=metadata, track=track)
1258 metadata=metadata, track=track)
1259
1259
1260 msg_id = msg['header']['msg_id']
1260 msg_id = msg['header']['msg_id']
1261 self.outstanding.add(msg_id)
1261 self.outstanding.add(msg_id)
1262 if ident:
1262 if ident:
1263 # possibly routed to a specific engine
1263 # possibly routed to a specific engine
1264 if isinstance(ident, list):
1264 if isinstance(ident, list):
1265 ident = ident[-1]
1265 ident = ident[-1]
1266 if ident in self._engines.values():
1266 if ident in self._engines.values():
1267 # save for later, in case of engine death
1267 # save for later, in case of engine death
1268 self._outstanding_dict[ident].add(msg_id)
1268 self._outstanding_dict[ident].add(msg_id)
1269 self.history.append(msg_id)
1269 self.history.append(msg_id)
1270 self.metadata[msg_id]['submitted'] = datetime.now()
1270 self.metadata[msg_id]['submitted'] = datetime.now()
1271
1271
1272 return msg
1272 return msg
1273
1273
1274 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1274 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1275 """construct and send an execute request via a socket.
1275 """construct and send an execute request via a socket.
1276
1276
1277 """
1277 """
1278
1278
1279 if self._closed:
1279 if self._closed:
1280 raise RuntimeError("Client cannot be used after its sockets have been closed")
1280 raise RuntimeError("Client cannot be used after its sockets have been closed")
1281
1281
1282 # defaults:
1282 # defaults:
1283 metadata = metadata if metadata is not None else {}
1283 metadata = metadata if metadata is not None else {}
1284
1284
1285 # validate arguments
1285 # validate arguments
1286 if not isinstance(code, string_types):
1286 if not isinstance(code, string_types):
1287 raise TypeError("code must be text, not %s" % type(code))
1287 raise TypeError("code must be text, not %s" % type(code))
1288 if not isinstance(metadata, dict):
1288 if not isinstance(metadata, dict):
1289 raise TypeError("metadata must be dict, not %s" % type(metadata))
1289 raise TypeError("metadata must be dict, not %s" % type(metadata))
1290
1290
1291 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1291 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1292
1292
1293
1293
1294 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1294 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1295 metadata=metadata)
1295 metadata=metadata)
1296
1296
1297 msg_id = msg['header']['msg_id']
1297 msg_id = msg['header']['msg_id']
1298 self.outstanding.add(msg_id)
1298 self.outstanding.add(msg_id)
1299 if ident:
1299 if ident:
1300 # possibly routed to a specific engine
1300 # possibly routed to a specific engine
1301 if isinstance(ident, list):
1301 if isinstance(ident, list):
1302 ident = ident[-1]
1302 ident = ident[-1]
1303 if ident in self._engines.values():
1303 if ident in self._engines.values():
1304 # save for later, in case of engine death
1304 # save for later, in case of engine death
1305 self._outstanding_dict[ident].add(msg_id)
1305 self._outstanding_dict[ident].add(msg_id)
1306 self.history.append(msg_id)
1306 self.history.append(msg_id)
1307 self.metadata[msg_id]['submitted'] = datetime.now()
1307 self.metadata[msg_id]['submitted'] = datetime.now()
1308
1308
1309 return msg
1309 return msg
1310
1310
1311 #--------------------------------------------------------------------------
1311 #--------------------------------------------------------------------------
1312 # construct a View object
1312 # construct a View object
1313 #--------------------------------------------------------------------------
1313 #--------------------------------------------------------------------------
1314
1314
1315 def load_balanced_view(self, targets=None):
1315 def load_balanced_view(self, targets=None):
1316 """construct a DirectView object.
1316 """construct a DirectView object.
1317
1317
1318 If no arguments are specified, create a LoadBalancedView
1318 If no arguments are specified, create a LoadBalancedView
1319 using all engines.
1319 using all engines.
1320
1320
1321 Parameters
1321 Parameters
1322 ----------
1322 ----------
1323
1323
1324 targets: list,slice,int,etc. [default: use all engines]
1324 targets: list,slice,int,etc. [default: use all engines]
1325 The subset of engines across which to load-balance
1325 The subset of engines across which to load-balance
1326 """
1326 """
1327 if targets == 'all':
1327 if targets == 'all':
1328 targets = None
1328 targets = None
1329 if targets is not None:
1329 if targets is not None:
1330 targets = self._build_targets(targets)[1]
1330 targets = self._build_targets(targets)[1]
1331 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1331 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1332
1332
1333 def direct_view(self, targets='all'):
1333 def direct_view(self, targets='all'):
1334 """construct a DirectView object.
1334 """construct a DirectView object.
1335
1335
1336 If no targets are specified, create a DirectView using all engines.
1336 If no targets are specified, create a DirectView using all engines.
1337
1337
1338 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1338 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1339 evaluate the target engines at each execution, whereas rc[:] will connect to
1339 evaluate the target engines at each execution, whereas rc[:] will connect to
1340 all *current* engines, and that list will not change.
1340 all *current* engines, and that list will not change.
1341
1341
1342 That is, 'all' will always use all engines, whereas rc[:] will not use
1342 That is, 'all' will always use all engines, whereas rc[:] will not use
1343 engines added after the DirectView is constructed.
1343 engines added after the DirectView is constructed.
1344
1344
1345 Parameters
1345 Parameters
1346 ----------
1346 ----------
1347
1347
1348 targets: list,slice,int,etc. [default: use all engines]
1348 targets: list,slice,int,etc. [default: use all engines]
1349 The engines to use for the View
1349 The engines to use for the View
1350 """
1350 """
1351 single = isinstance(targets, int)
1351 single = isinstance(targets, int)
1352 # allow 'all' to be lazily evaluated at each execution
1352 # allow 'all' to be lazily evaluated at each execution
1353 if targets != 'all':
1353 if targets != 'all':
1354 targets = self._build_targets(targets)[1]
1354 targets = self._build_targets(targets)[1]
1355 if single:
1355 if single:
1356 targets = targets[0]
1356 targets = targets[0]
1357 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1357 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1358
1358
1359 #--------------------------------------------------------------------------
1359 #--------------------------------------------------------------------------
1360 # Query methods
1360 # Query methods
1361 #--------------------------------------------------------------------------
1361 #--------------------------------------------------------------------------
1362
1362
1363 @spin_first
1363 @spin_first
1364 def get_result(self, indices_or_msg_ids=None, block=None):
1364 def get_result(self, indices_or_msg_ids=None, block=None):
1365 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1365 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1366
1366
1367 If the client already has the results, no request to the Hub will be made.
1367 If the client already has the results, no request to the Hub will be made.
1368
1368
1369 This is a convenient way to construct AsyncResult objects, which are wrappers
1369 This is a convenient way to construct AsyncResult objects, which are wrappers
1370 that include metadata about execution, and allow for awaiting results that
1370 that include metadata about execution, and allow for awaiting results that
1371 were not submitted by this Client.
1371 were not submitted by this Client.
1372
1372
1373 It can also be a convenient way to retrieve the metadata associated with
1373 It can also be a convenient way to retrieve the metadata associated with
1374 blocking execution, since it always retrieves
1374 blocking execution, since it always retrieves
1375
1375
1376 Examples
1376 Examples
1377 --------
1377 --------
1378 ::
1378 ::
1379
1379
1380 In [10]: r = client.apply()
1380 In [10]: r = client.apply()
1381
1381
1382 Parameters
1382 Parameters
1383 ----------
1383 ----------
1384
1384
1385 indices_or_msg_ids : integer history index, str msg_id, or list of either
1385 indices_or_msg_ids : integer history index, str msg_id, or list of either
1386 The indices or msg_ids of indices to be retrieved
1386 The indices or msg_ids of indices to be retrieved
1387
1387
1388 block : bool
1388 block : bool
1389 Whether to wait for the result to be done
1389 Whether to wait for the result to be done
1390
1390
1391 Returns
1391 Returns
1392 -------
1392 -------
1393
1393
1394 AsyncResult
1394 AsyncResult
1395 A single AsyncResult object will always be returned.
1395 A single AsyncResult object will always be returned.
1396
1396
1397 AsyncHubResult
1397 AsyncHubResult
1398 A subclass of AsyncResult that retrieves results from the Hub
1398 A subclass of AsyncResult that retrieves results from the Hub
1399
1399
1400 """
1400 """
1401 block = self.block if block is None else block
1401 block = self.block if block is None else block
1402 if indices_or_msg_ids is None:
1402 if indices_or_msg_ids is None:
1403 indices_or_msg_ids = -1
1403 indices_or_msg_ids = -1
1404
1404
1405 single_result = False
1405 single_result = False
1406 if not isinstance(indices_or_msg_ids, (list,tuple)):
1406 if not isinstance(indices_or_msg_ids, (list,tuple)):
1407 indices_or_msg_ids = [indices_or_msg_ids]
1407 indices_or_msg_ids = [indices_or_msg_ids]
1408 single_result = True
1408 single_result = True
1409
1409
1410 theids = []
1410 theids = []
1411 for id in indices_or_msg_ids:
1411 for id in indices_or_msg_ids:
1412 if isinstance(id, int):
1412 if isinstance(id, int):
1413 id = self.history[id]
1413 id = self.history[id]
1414 if not isinstance(id, string_types):
1414 if not isinstance(id, string_types):
1415 raise TypeError("indices must be str or int, not %r"%id)
1415 raise TypeError("indices must be str or int, not %r"%id)
1416 theids.append(id)
1416 theids.append(id)
1417
1417
1418 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1418 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1419 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1419 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1420
1420
1421 # given single msg_id initially, get_result shot get the result itself,
1421 # given single msg_id initially, get_result shot get the result itself,
1422 # not a length-one list
1422 # not a length-one list
1423 if single_result:
1423 if single_result:
1424 theids = theids[0]
1424 theids = theids[0]
1425
1425
1426 if remote_ids:
1426 if remote_ids:
1427 ar = AsyncHubResult(self, msg_ids=theids)
1427 ar = AsyncHubResult(self, msg_ids=theids)
1428 else:
1428 else:
1429 ar = AsyncResult(self, msg_ids=theids)
1429 ar = AsyncResult(self, msg_ids=theids)
1430
1430
1431 if block:
1431 if block:
1432 ar.wait()
1432 ar.wait()
1433
1433
1434 return ar
1434 return ar
1435
1435
1436 @spin_first
1436 @spin_first
1437 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1437 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1438 """Resubmit one or more tasks.
1438 """Resubmit one or more tasks.
1439
1439
1440 in-flight tasks may not be resubmitted.
1440 in-flight tasks may not be resubmitted.
1441
1441
1442 Parameters
1442 Parameters
1443 ----------
1443 ----------
1444
1444
1445 indices_or_msg_ids : integer history index, str msg_id, or list of either
1445 indices_or_msg_ids : integer history index, str msg_id, or list of either
1446 The indices or msg_ids of indices to be retrieved
1446 The indices or msg_ids of indices to be retrieved
1447
1447
1448 block : bool
1448 block : bool
1449 Whether to wait for the result to be done
1449 Whether to wait for the result to be done
1450
1450
1451 Returns
1451 Returns
1452 -------
1452 -------
1453
1453
1454 AsyncHubResult
1454 AsyncHubResult
1455 A subclass of AsyncResult that retrieves results from the Hub
1455 A subclass of AsyncResult that retrieves results from the Hub
1456
1456
1457 """
1457 """
1458 block = self.block if block is None else block
1458 block = self.block if block is None else block
1459 if indices_or_msg_ids is None:
1459 if indices_or_msg_ids is None:
1460 indices_or_msg_ids = -1
1460 indices_or_msg_ids = -1
1461
1461
1462 if not isinstance(indices_or_msg_ids, (list,tuple)):
1462 if not isinstance(indices_or_msg_ids, (list,tuple)):
1463 indices_or_msg_ids = [indices_or_msg_ids]
1463 indices_or_msg_ids = [indices_or_msg_ids]
1464
1464
1465 theids = []
1465 theids = []
1466 for id in indices_or_msg_ids:
1466 for id in indices_or_msg_ids:
1467 if isinstance(id, int):
1467 if isinstance(id, int):
1468 id = self.history[id]
1468 id = self.history[id]
1469 if not isinstance(id, string_types):
1469 if not isinstance(id, string_types):
1470 raise TypeError("indices must be str or int, not %r"%id)
1470 raise TypeError("indices must be str or int, not %r"%id)
1471 theids.append(id)
1471 theids.append(id)
1472
1472
1473 content = dict(msg_ids = theids)
1473 content = dict(msg_ids = theids)
1474
1474
1475 self.session.send(self._query_socket, 'resubmit_request', content)
1475 self.session.send(self._query_socket, 'resubmit_request', content)
1476
1476
1477 zmq.select([self._query_socket], [], [])
1477 zmq.select([self._query_socket], [], [])
1478 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1478 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1479 if self.debug:
1479 if self.debug:
1480 pprint(msg)
1480 pprint(msg)
1481 content = msg['content']
1481 content = msg['content']
1482 if content['status'] != 'ok':
1482 if content['status'] != 'ok':
1483 raise self._unwrap_exception(content)
1483 raise self._unwrap_exception(content)
1484 mapping = content['resubmitted']
1484 mapping = content['resubmitted']
1485 new_ids = [ mapping[msg_id] for msg_id in theids ]
1485 new_ids = [ mapping[msg_id] for msg_id in theids ]
1486
1486
1487 ar = AsyncHubResult(self, msg_ids=new_ids)
1487 ar = AsyncHubResult(self, msg_ids=new_ids)
1488
1488
1489 if block:
1489 if block:
1490 ar.wait()
1490 ar.wait()
1491
1491
1492 return ar
1492 return ar
1493
1493
1494 @spin_first
1494 @spin_first
1495 def result_status(self, msg_ids, status_only=True):
1495 def result_status(self, msg_ids, status_only=True):
1496 """Check on the status of the result(s) of the apply request with `msg_ids`.
1496 """Check on the status of the result(s) of the apply request with `msg_ids`.
1497
1497
1498 If status_only is False, then the actual results will be retrieved, else
1498 If status_only is False, then the actual results will be retrieved, else
1499 only the status of the results will be checked.
1499 only the status of the results will be checked.
1500
1500
1501 Parameters
1501 Parameters
1502 ----------
1502 ----------
1503
1503
1504 msg_ids : list of msg_ids
1504 msg_ids : list of msg_ids
1505 if int:
1505 if int:
1506 Passed as index to self.history for convenience.
1506 Passed as index to self.history for convenience.
1507 status_only : bool (default: True)
1507 status_only : bool (default: True)
1508 if False:
1508 if False:
1509 Retrieve the actual results of completed tasks.
1509 Retrieve the actual results of completed tasks.
1510
1510
1511 Returns
1511 Returns
1512 -------
1512 -------
1513
1513
1514 results : dict
1514 results : dict
1515 There will always be the keys 'pending' and 'completed', which will
1515 There will always be the keys 'pending' and 'completed', which will
1516 be lists of msg_ids that are incomplete or complete. If `status_only`
1516 be lists of msg_ids that are incomplete or complete. If `status_only`
1517 is False, then completed results will be keyed by their `msg_id`.
1517 is False, then completed results will be keyed by their `msg_id`.
1518 """
1518 """
1519 if not isinstance(msg_ids, (list,tuple)):
1519 if not isinstance(msg_ids, (list,tuple)):
1520 msg_ids = [msg_ids]
1520 msg_ids = [msg_ids]
1521
1521
1522 theids = []
1522 theids = []
1523 for msg_id in msg_ids:
1523 for msg_id in msg_ids:
1524 if isinstance(msg_id, int):
1524 if isinstance(msg_id, int):
1525 msg_id = self.history[msg_id]
1525 msg_id = self.history[msg_id]
1526 if not isinstance(msg_id, string_types):
1526 if not isinstance(msg_id, string_types):
1527 raise TypeError("msg_ids must be str, not %r"%msg_id)
1527 raise TypeError("msg_ids must be str, not %r"%msg_id)
1528 theids.append(msg_id)
1528 theids.append(msg_id)
1529
1529
1530 completed = []
1530 completed = []
1531 local_results = {}
1531 local_results = {}
1532
1532
1533 # comment this block out to temporarily disable local shortcut:
1533 # comment this block out to temporarily disable local shortcut:
1534 for msg_id in theids:
1534 for msg_id in theids:
1535 if msg_id in self.results:
1535 if msg_id in self.results:
1536 completed.append(msg_id)
1536 completed.append(msg_id)
1537 local_results[msg_id] = self.results[msg_id]
1537 local_results[msg_id] = self.results[msg_id]
1538 theids.remove(msg_id)
1538 theids.remove(msg_id)
1539
1539
1540 if theids: # some not locally cached
1540 if theids: # some not locally cached
1541 content = dict(msg_ids=theids, status_only=status_only)
1541 content = dict(msg_ids=theids, status_only=status_only)
1542 msg = self.session.send(self._query_socket, "result_request", content=content)
1542 msg = self.session.send(self._query_socket, "result_request", content=content)
1543 zmq.select([self._query_socket], [], [])
1543 zmq.select([self._query_socket], [], [])
1544 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1544 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1545 if self.debug:
1545 if self.debug:
1546 pprint(msg)
1546 pprint(msg)
1547 content = msg['content']
1547 content = msg['content']
1548 if content['status'] != 'ok':
1548 if content['status'] != 'ok':
1549 raise self._unwrap_exception(content)
1549 raise self._unwrap_exception(content)
1550 buffers = msg['buffers']
1550 buffers = msg['buffers']
1551 else:
1551 else:
1552 content = dict(completed=[],pending=[])
1552 content = dict(completed=[],pending=[])
1553
1553
1554 content['completed'].extend(completed)
1554 content['completed'].extend(completed)
1555
1555
1556 if status_only:
1556 if status_only:
1557 return content
1557 return content
1558
1558
1559 failures = []
1559 failures = []
1560 # load cached results into result:
1560 # load cached results into result:
1561 content.update(local_results)
1561 content.update(local_results)
1562
1562
1563 # update cache with results:
1563 # update cache with results:
1564 for msg_id in sorted(theids):
1564 for msg_id in sorted(theids):
1565 if msg_id in content['completed']:
1565 if msg_id in content['completed']:
1566 rec = content[msg_id]
1566 rec = content[msg_id]
1567 parent = rec['header']
1567 parent = rec['header']
1568 header = rec['result_header']
1568 header = rec['result_header']
1569 rcontent = rec['result_content']
1569 rcontent = rec['result_content']
1570 iodict = rec['io']
1570 iodict = rec['io']
1571 if isinstance(rcontent, str):
1571 if isinstance(rcontent, str):
1572 rcontent = self.session.unpack(rcontent)
1572 rcontent = self.session.unpack(rcontent)
1573
1573
1574 md = self.metadata[msg_id]
1574 md = self.metadata[msg_id]
1575 md_msg = dict(
1575 md_msg = dict(
1576 content=rcontent,
1576 content=rcontent,
1577 parent_header=parent,
1577 parent_header=parent,
1578 header=header,
1578 header=header,
1579 metadata=rec['result_metadata'],
1579 metadata=rec['result_metadata'],
1580 )
1580 )
1581 md.update(self._extract_metadata(md_msg))
1581 md.update(self._extract_metadata(md_msg))
1582 if rec.get('received'):
1582 if rec.get('received'):
1583 md['received'] = rec['received']
1583 md['received'] = rec['received']
1584 md.update(iodict)
1584 md.update(iodict)
1585
1585
1586 if rcontent['status'] == 'ok':
1586 if rcontent['status'] == 'ok':
1587 if header['msg_type'] == 'apply_reply':
1587 if header['msg_type'] == 'apply_reply':
1588 res,buffers = serialize.unserialize_object(buffers)
1588 res,buffers = serialize.unserialize_object(buffers)
1589 elif header['msg_type'] == 'execute_reply':
1589 elif header['msg_type'] == 'execute_reply':
1590 res = ExecuteReply(msg_id, rcontent, md)
1590 res = ExecuteReply(msg_id, rcontent, md)
1591 else:
1591 else:
1592 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1592 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1593 else:
1593 else:
1594 res = self._unwrap_exception(rcontent)
1594 res = self._unwrap_exception(rcontent)
1595 failures.append(res)
1595 failures.append(res)
1596
1596
1597 self.results[msg_id] = res
1597 self.results[msg_id] = res
1598 content[msg_id] = res
1598 content[msg_id] = res
1599
1599
1600 if len(theids) == 1 and failures:
1600 if len(theids) == 1 and failures:
1601 raise failures[0]
1601 raise failures[0]
1602
1602
1603 error.collect_exceptions(failures, "result_status")
1603 error.collect_exceptions(failures, "result_status")
1604 return content
1604 return content
1605
1605
1606 @spin_first
1606 @spin_first
1607 def queue_status(self, targets='all', verbose=False):
1607 def queue_status(self, targets='all', verbose=False):
1608 """Fetch the status of engine queues.
1608 """Fetch the status of engine queues.
1609
1609
1610 Parameters
1610 Parameters
1611 ----------
1611 ----------
1612
1612
1613 targets : int/str/list of ints/strs
1613 targets : int/str/list of ints/strs
1614 the engines whose states are to be queried.
1614 the engines whose states are to be queried.
1615 default : all
1615 default : all
1616 verbose : bool
1616 verbose : bool
1617 Whether to return lengths only, or lists of ids for each element
1617 Whether to return lengths only, or lists of ids for each element
1618 """
1618 """
1619 if targets == 'all':
1619 if targets == 'all':
1620 # allow 'all' to be evaluated on the engine
1620 # allow 'all' to be evaluated on the engine
1621 engine_ids = None
1621 engine_ids = None
1622 else:
1622 else:
1623 engine_ids = self._build_targets(targets)[1]
1623 engine_ids = self._build_targets(targets)[1]
1624 content = dict(targets=engine_ids, verbose=verbose)
1624 content = dict(targets=engine_ids, verbose=verbose)
1625 self.session.send(self._query_socket, "queue_request", content=content)
1625 self.session.send(self._query_socket, "queue_request", content=content)
1626 idents,msg = self.session.recv(self._query_socket, 0)
1626 idents,msg = self.session.recv(self._query_socket, 0)
1627 if self.debug:
1627 if self.debug:
1628 pprint(msg)
1628 pprint(msg)
1629 content = msg['content']
1629 content = msg['content']
1630 status = content.pop('status')
1630 status = content.pop('status')
1631 if status != 'ok':
1631 if status != 'ok':
1632 raise self._unwrap_exception(content)
1632 raise self._unwrap_exception(content)
1633 content = rekey(content)
1633 content = rekey(content)
1634 if isinstance(targets, int):
1634 if isinstance(targets, int):
1635 return content[targets]
1635 return content[targets]
1636 else:
1636 else:
1637 return content
1637 return content
1638
1638
1639 def _build_msgids_from_target(self, targets=None):
1639 def _build_msgids_from_target(self, targets=None):
1640 """Build a list of msg_ids from the list of engine targets"""
1640 """Build a list of msg_ids from the list of engine targets"""
1641 if not targets: # needed as _build_targets otherwise uses all engines
1641 if not targets: # needed as _build_targets otherwise uses all engines
1642 return []
1642 return []
1643 target_ids = self._build_targets(targets)[0]
1643 target_ids = self._build_targets(targets)[0]
1644 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1644 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1645
1645
1646 def _build_msgids_from_jobs(self, jobs=None):
1646 def _build_msgids_from_jobs(self, jobs=None):
1647 """Build a list of msg_ids from "jobs" """
1647 """Build a list of msg_ids from "jobs" """
1648 if not jobs:
1648 if not jobs:
1649 return []
1649 return []
1650 msg_ids = []
1650 msg_ids = []
1651 if isinstance(jobs, string_types + (AsyncResult,)):
1651 if isinstance(jobs, string_types + (AsyncResult,)):
1652 jobs = [jobs]
1652 jobs = [jobs]
1653 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs)
1653 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs)
1654 if bad_ids:
1654 if bad_ids:
1655 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1655 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1656 for j in jobs:
1656 for j in jobs:
1657 if isinstance(j, AsyncResult):
1657 if isinstance(j, AsyncResult):
1658 msg_ids.extend(j.msg_ids)
1658 msg_ids.extend(j.msg_ids)
1659 else:
1659 else:
1660 msg_ids.append(j)
1660 msg_ids.append(j)
1661 return msg_ids
1661 return msg_ids
1662
1662
1663 def purge_local_results(self, jobs=[], targets=[]):
1663 def purge_local_results(self, jobs=[], targets=[]):
1664 """Clears the client caches of results and frees such memory.
1664 """Clears the client caches of results and frees such memory.
1665
1665
1666 Individual results can be purged by msg_id, or the entire
1666 Individual results can be purged by msg_id, or the entire
1667 history of specific targets can be purged.
1667 history of specific targets can be purged.
1668
1668
1669 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1669 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1670
1670
1671 The client must have no outstanding tasks before purging the caches.
1671 The client must have no outstanding tasks before purging the caches.
1672 Raises `AssertionError` if there are still outstanding tasks.
1672 Raises `AssertionError` if there are still outstanding tasks.
1673
1673
1674 After this call all `AsyncResults` are invalid and should be discarded.
1674 After this call all `AsyncResults` are invalid and should be discarded.
1675
1675
1676 If you must "reget" the results, you can still do so by using
1676 If you must "reget" the results, you can still do so by using
1677 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1677 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1678 redownload the results from the hub if they are still available
1678 redownload the results from the hub if they are still available
1679 (i.e `client.purge_hub_results(...)` has not been called.
1679 (i.e `client.purge_hub_results(...)` has not been called.
1680
1680
1681 Parameters
1681 Parameters
1682 ----------
1682 ----------
1683
1683
1684 jobs : str or list of str or AsyncResult objects
1684 jobs : str or list of str or AsyncResult objects
1685 the msg_ids whose results should be purged.
1685 the msg_ids whose results should be purged.
1686 targets : int/str/list of ints/strs
1686 targets : int/str/list of ints/strs
1687 The targets, by int_id, whose entire results are to be purged.
1687 The targets, by int_id, whose entire results are to be purged.
1688
1688
1689 default : None
1689 default : None
1690 """
1690 """
1691 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1691 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1692
1692
1693 if not targets and not jobs:
1693 if not targets and not jobs:
1694 raise ValueError("Must specify at least one of `targets` and `jobs`")
1694 raise ValueError("Must specify at least one of `targets` and `jobs`")
1695
1695
1696 if jobs == 'all':
1696 if jobs == 'all':
1697 self.results.clear()
1697 self.results.clear()
1698 self.metadata.clear()
1698 self.metadata.clear()
1699 return
1699 return
1700 else:
1700 else:
1701 msg_ids = []
1701 msg_ids = []
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1704 map(self.results.pop, msg_ids)
1704 map(self.results.pop, msg_ids)
1705 map(self.metadata.pop, msg_ids)
1705 map(self.metadata.pop, msg_ids)
1706
1706
1707
1707
1708 @spin_first
1708 @spin_first
1709 def purge_hub_results(self, jobs=[], targets=[]):
1709 def purge_hub_results(self, jobs=[], targets=[]):
1710 """Tell the Hub to forget results.
1710 """Tell the Hub to forget results.
1711
1711
1712 Individual results can be purged by msg_id, or the entire
1712 Individual results can be purged by msg_id, or the entire
1713 history of specific targets can be purged.
1713 history of specific targets can be purged.
1714
1714
1715 Use `purge_results('all')` to scrub everything from the Hub's db.
1715 Use `purge_results('all')` to scrub everything from the Hub's db.
1716
1716
1717 Parameters
1717 Parameters
1718 ----------
1718 ----------
1719
1719
1720 jobs : str or list of str or AsyncResult objects
1720 jobs : str or list of str or AsyncResult objects
1721 the msg_ids whose results should be forgotten.
1721 the msg_ids whose results should be forgotten.
1722 targets : int/str/list of ints/strs
1722 targets : int/str/list of ints/strs
1723 The targets, by int_id, whose entire history is to be purged.
1723 The targets, by int_id, whose entire history is to be purged.
1724
1724
1725 default : None
1725 default : None
1726 """
1726 """
1727 if not targets and not jobs:
1727 if not targets and not jobs:
1728 raise ValueError("Must specify at least one of `targets` and `jobs`")
1728 raise ValueError("Must specify at least one of `targets` and `jobs`")
1729 if targets:
1729 if targets:
1730 targets = self._build_targets(targets)[1]
1730 targets = self._build_targets(targets)[1]
1731
1731
1732 # construct msg_ids from jobs
1732 # construct msg_ids from jobs
1733 if jobs == 'all':
1733 if jobs == 'all':
1734 msg_ids = jobs
1734 msg_ids = jobs
1735 else:
1735 else:
1736 msg_ids = self._build_msgids_from_jobs(jobs)
1736 msg_ids = self._build_msgids_from_jobs(jobs)
1737
1737
1738 content = dict(engine_ids=targets, msg_ids=msg_ids)
1738 content = dict(engine_ids=targets, msg_ids=msg_ids)
1739 self.session.send(self._query_socket, "purge_request", content=content)
1739 self.session.send(self._query_socket, "purge_request", content=content)
1740 idents, msg = self.session.recv(self._query_socket, 0)
1740 idents, msg = self.session.recv(self._query_socket, 0)
1741 if self.debug:
1741 if self.debug:
1742 pprint(msg)
1742 pprint(msg)
1743 content = msg['content']
1743 content = msg['content']
1744 if content['status'] != 'ok':
1744 if content['status'] != 'ok':
1745 raise self._unwrap_exception(content)
1745 raise self._unwrap_exception(content)
1746
1746
1747 def purge_results(self, jobs=[], targets=[]):
1747 def purge_results(self, jobs=[], targets=[]):
1748 """Clears the cached results from both the hub and the local client
1748 """Clears the cached results from both the hub and the local client
1749
1749
1750 Individual results can be purged by msg_id, or the entire
1750 Individual results can be purged by msg_id, or the entire
1751 history of specific targets can be purged.
1751 history of specific targets can be purged.
1752
1752
1753 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1753 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1754 the Client's db.
1754 the Client's db.
1755
1755
1756 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1756 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1757 the same arguments.
1757 the same arguments.
1758
1758
1759 Parameters
1759 Parameters
1760 ----------
1760 ----------
1761
1761
1762 jobs : str or list of str or AsyncResult objects
1762 jobs : str or list of str or AsyncResult objects
1763 the msg_ids whose results should be forgotten.
1763 the msg_ids whose results should be forgotten.
1764 targets : int/str/list of ints/strs
1764 targets : int/str/list of ints/strs
1765 The targets, by int_id, whose entire history is to be purged.
1765 The targets, by int_id, whose entire history is to be purged.
1766
1766
1767 default : None
1767 default : None
1768 """
1768 """
1769 self.purge_local_results(jobs=jobs, targets=targets)
1769 self.purge_local_results(jobs=jobs, targets=targets)
1770 self.purge_hub_results(jobs=jobs, targets=targets)
1770 self.purge_hub_results(jobs=jobs, targets=targets)
1771
1771
1772 def purge_everything(self):
1772 def purge_everything(self):
1773 """Clears all content from previous Tasks from both the hub and the local client
1773 """Clears all content from previous Tasks from both the hub and the local client
1774
1774
1775 In addition to calling `purge_results("all")` it also deletes the history and
1775 In addition to calling `purge_results("all")` it also deletes the history and
1776 other bookkeeping lists.
1776 other bookkeeping lists.
1777 """
1777 """
1778 self.purge_results("all")
1778 self.purge_results("all")
1779 self.history = []
1779 self.history = []
1780 self.session.digest_history.clear()
1780 self.session.digest_history.clear()
1781
1781
1782 @spin_first
1782 @spin_first
1783 def hub_history(self):
1783 def hub_history(self):
1784 """Get the Hub's history
1784 """Get the Hub's history
1785
1785
1786 Just like the Client, the Hub has a history, which is a list of msg_ids.
1786 Just like the Client, the Hub has a history, which is a list of msg_ids.
1787 This will contain the history of all clients, and, depending on configuration,
1787 This will contain the history of all clients, and, depending on configuration,
1788 may contain history across multiple cluster sessions.
1788 may contain history across multiple cluster sessions.
1789
1789
1790 Any msg_id returned here is a valid argument to `get_result`.
1790 Any msg_id returned here is a valid argument to `get_result`.
1791
1791
1792 Returns
1792 Returns
1793 -------
1793 -------
1794
1794
1795 msg_ids : list of strs
1795 msg_ids : list of strs
1796 list of all msg_ids, ordered by task submission time.
1796 list of all msg_ids, ordered by task submission time.
1797 """
1797 """
1798
1798
1799 self.session.send(self._query_socket, "history_request", content={})
1799 self.session.send(self._query_socket, "history_request", content={})
1800 idents, msg = self.session.recv(self._query_socket, 0)
1800 idents, msg = self.session.recv(self._query_socket, 0)
1801
1801
1802 if self.debug:
1802 if self.debug:
1803 pprint(msg)
1803 pprint(msg)
1804 content = msg['content']
1804 content = msg['content']
1805 if content['status'] != 'ok':
1805 if content['status'] != 'ok':
1806 raise self._unwrap_exception(content)
1806 raise self._unwrap_exception(content)
1807 else:
1807 else:
1808 return content['history']
1808 return content['history']
1809
1809
1810 @spin_first
1810 @spin_first
1811 def db_query(self, query, keys=None):
1811 def db_query(self, query, keys=None):
1812 """Query the Hub's TaskRecord database
1812 """Query the Hub's TaskRecord database
1813
1813
1814 This will return a list of task record dicts that match `query`
1814 This will return a list of task record dicts that match `query`
1815
1815
1816 Parameters
1816 Parameters
1817 ----------
1817 ----------
1818
1818
1819 query : mongodb query dict
1819 query : mongodb query dict
1820 The search dict. See mongodb query docs for details.
1820 The search dict. See mongodb query docs for details.
1821 keys : list of strs [optional]
1821 keys : list of strs [optional]
1822 The subset of keys to be returned. The default is to fetch everything but buffers.
1822 The subset of keys to be returned. The default is to fetch everything but buffers.
1823 'msg_id' will *always* be included.
1823 'msg_id' will *always* be included.
1824 """
1824 """
1825 if isinstance(keys, string_types):
1825 if isinstance(keys, string_types):
1826 keys = [keys]
1826 keys = [keys]
1827 content = dict(query=query, keys=keys)
1827 content = dict(query=query, keys=keys)
1828 self.session.send(self._query_socket, "db_request", content=content)
1828 self.session.send(self._query_socket, "db_request", content=content)
1829 idents, msg = self.session.recv(self._query_socket, 0)
1829 idents, msg = self.session.recv(self._query_socket, 0)
1830 if self.debug:
1830 if self.debug:
1831 pprint(msg)
1831 pprint(msg)
1832 content = msg['content']
1832 content = msg['content']
1833 if content['status'] != 'ok':
1833 if content['status'] != 'ok':
1834 raise self._unwrap_exception(content)
1834 raise self._unwrap_exception(content)
1835
1835
1836 records = content['records']
1836 records = content['records']
1837
1837
1838 buffer_lens = content['buffer_lens']
1838 buffer_lens = content['buffer_lens']
1839 result_buffer_lens = content['result_buffer_lens']
1839 result_buffer_lens = content['result_buffer_lens']
1840 buffers = msg['buffers']
1840 buffers = msg['buffers']
1841 has_bufs = buffer_lens is not None
1841 has_bufs = buffer_lens is not None
1842 has_rbufs = result_buffer_lens is not None
1842 has_rbufs = result_buffer_lens is not None
1843 for i,rec in enumerate(records):
1843 for i,rec in enumerate(records):
1844 # relink buffers
1844 # relink buffers
1845 if has_bufs:
1845 if has_bufs:
1846 blen = buffer_lens[i]
1846 blen = buffer_lens[i]
1847 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1847 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1848 if has_rbufs:
1848 if has_rbufs:
1849 blen = result_buffer_lens[i]
1849 blen = result_buffer_lens[i]
1850 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1850 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1851
1851
1852 return records
1852 return records
1853
1853
1854 __all__ = [ 'Client' ]
1854 __all__ = [ 'Client' ]
@@ -1,369 +1,369 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43
43
44 # IPython imports
44 # IPython imports
45 from IPython.config.application import Application
45 from IPython.config.application import Application
46 from IPython.utils.localinterfaces import localhost, is_public_ip, public_ips
46 from IPython.utils.localinterfaces import localhost, is_public_ip, public_ips
47 from IPython.utils.py3compat import string_types
47 from IPython.utils.py3compat import string_types
48 from IPython.kernel.zmq.log import EnginePUBHandler
48 from IPython.kernel.zmq.log import EnginePUBHandler
49 from IPython.kernel.zmq.serialize import (
49 from IPython.kernel.zmq.serialize import (
50 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
50 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
51 )
51 )
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Classes
54 # Classes
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 class Namespace(dict):
57 class Namespace(dict):
58 """Subclass of dict for attribute access to keys."""
58 """Subclass of dict for attribute access to keys."""
59
59
60 def __getattr__(self, key):
60 def __getattr__(self, key):
61 """getattr aliased to getitem"""
61 """getattr aliased to getitem"""
62 if key in self.iterkeys():
62 if key in self:
63 return self[key]
63 return self[key]
64 else:
64 else:
65 raise NameError(key)
65 raise NameError(key)
66
66
67 def __setattr__(self, key, value):
67 def __setattr__(self, key, value):
68 """setattr aliased to setitem, with strict"""
68 """setattr aliased to setitem, with strict"""
69 if hasattr(dict, key):
69 if hasattr(dict, key):
70 raise KeyError("Cannot override dict keys %r"%key)
70 raise KeyError("Cannot override dict keys %r"%key)
71 self[key] = value
71 self[key] = value
72
72
73
73
74 class ReverseDict(dict):
74 class ReverseDict(dict):
75 """simple double-keyed subset of dict methods."""
75 """simple double-keyed subset of dict methods."""
76
76
77 def __init__(self, *args, **kwargs):
77 def __init__(self, *args, **kwargs):
78 dict.__init__(self, *args, **kwargs)
78 dict.__init__(self, *args, **kwargs)
79 self._reverse = dict()
79 self._reverse = dict()
80 for key, value in self.iteritems():
80 for key, value in self.iteritems():
81 self._reverse[value] = key
81 self._reverse[value] = key
82
82
83 def __getitem__(self, key):
83 def __getitem__(self, key):
84 try:
84 try:
85 return dict.__getitem__(self, key)
85 return dict.__getitem__(self, key)
86 except KeyError:
86 except KeyError:
87 return self._reverse[key]
87 return self._reverse[key]
88
88
89 def __setitem__(self, key, value):
89 def __setitem__(self, key, value):
90 if key in self._reverse:
90 if key in self._reverse:
91 raise KeyError("Can't have key %r on both sides!"%key)
91 raise KeyError("Can't have key %r on both sides!"%key)
92 dict.__setitem__(self, key, value)
92 dict.__setitem__(self, key, value)
93 self._reverse[value] = key
93 self._reverse[value] = key
94
94
95 def pop(self, key):
95 def pop(self, key):
96 value = dict.pop(self, key)
96 value = dict.pop(self, key)
97 self._reverse.pop(value)
97 self._reverse.pop(value)
98 return value
98 return value
99
99
100 def get(self, key, default=None):
100 def get(self, key, default=None):
101 try:
101 try:
102 return self[key]
102 return self[key]
103 except KeyError:
103 except KeyError:
104 return default
104 return default
105
105
106 #-----------------------------------------------------------------------------
106 #-----------------------------------------------------------------------------
107 # Functions
107 # Functions
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109
109
110 @decorator
110 @decorator
111 def log_errors(f, self, *args, **kwargs):
111 def log_errors(f, self, *args, **kwargs):
112 """decorator to log unhandled exceptions raised in a method.
112 """decorator to log unhandled exceptions raised in a method.
113
113
114 For use wrapping on_recv callbacks, so that exceptions
114 For use wrapping on_recv callbacks, so that exceptions
115 do not cause the stream to be closed.
115 do not cause the stream to be closed.
116 """
116 """
117 try:
117 try:
118 return f(self, *args, **kwargs)
118 return f(self, *args, **kwargs)
119 except Exception:
119 except Exception:
120 self.log.error("Uncaught exception in %r" % f, exc_info=True)
120 self.log.error("Uncaught exception in %r" % f, exc_info=True)
121
121
122
122
123 def is_url(url):
123 def is_url(url):
124 """boolean check for whether a string is a zmq url"""
124 """boolean check for whether a string is a zmq url"""
125 if '://' not in url:
125 if '://' not in url:
126 return False
126 return False
127 proto, addr = url.split('://', 1)
127 proto, addr = url.split('://', 1)
128 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
128 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
129 return False
129 return False
130 return True
130 return True
131
131
132 def validate_url(url):
132 def validate_url(url):
133 """validate a url for zeromq"""
133 """validate a url for zeromq"""
134 if not isinstance(url, string_types):
134 if not isinstance(url, string_types):
135 raise TypeError("url must be a string, not %r"%type(url))
135 raise TypeError("url must be a string, not %r"%type(url))
136 url = url.lower()
136 url = url.lower()
137
137
138 proto_addr = url.split('://')
138 proto_addr = url.split('://')
139 assert len(proto_addr) == 2, 'Invalid url: %r'%url
139 assert len(proto_addr) == 2, 'Invalid url: %r'%url
140 proto, addr = proto_addr
140 proto, addr = proto_addr
141 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
141 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
142
142
143 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
143 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
144 # author: Remi Sabourin
144 # author: Remi Sabourin
145 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
145 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
146
146
147 if proto == 'tcp':
147 if proto == 'tcp':
148 lis = addr.split(':')
148 lis = addr.split(':')
149 assert len(lis) == 2, 'Invalid url: %r'%url
149 assert len(lis) == 2, 'Invalid url: %r'%url
150 addr,s_port = lis
150 addr,s_port = lis
151 try:
151 try:
152 port = int(s_port)
152 port = int(s_port)
153 except ValueError:
153 except ValueError:
154 raise AssertionError("Invalid port %r in url: %r"%(port, url))
154 raise AssertionError("Invalid port %r in url: %r"%(port, url))
155
155
156 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
156 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
157
157
158 else:
158 else:
159 # only validate tcp urls currently
159 # only validate tcp urls currently
160 pass
160 pass
161
161
162 return True
162 return True
163
163
164
164
165 def validate_url_container(container):
165 def validate_url_container(container):
166 """validate a potentially nested collection of urls."""
166 """validate a potentially nested collection of urls."""
167 if isinstance(container, string_types):
167 if isinstance(container, string_types):
168 url = container
168 url = container
169 return validate_url(url)
169 return validate_url(url)
170 elif isinstance(container, dict):
170 elif isinstance(container, dict):
171 container = container.itervalues()
171 container = container.itervalues()
172
172
173 for element in container:
173 for element in container:
174 validate_url_container(element)
174 validate_url_container(element)
175
175
176
176
177 def split_url(url):
177 def split_url(url):
178 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
178 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
179 proto_addr = url.split('://')
179 proto_addr = url.split('://')
180 assert len(proto_addr) == 2, 'Invalid url: %r'%url
180 assert len(proto_addr) == 2, 'Invalid url: %r'%url
181 proto, addr = proto_addr
181 proto, addr = proto_addr
182 lis = addr.split(':')
182 lis = addr.split(':')
183 assert len(lis) == 2, 'Invalid url: %r'%url
183 assert len(lis) == 2, 'Invalid url: %r'%url
184 addr,s_port = lis
184 addr,s_port = lis
185 return proto,addr,s_port
185 return proto,addr,s_port
186
186
187 def disambiguate_ip_address(ip, location=None):
187 def disambiguate_ip_address(ip, location=None):
188 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
188 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
189 ones, based on the location (default interpretation of location is localhost)."""
189 ones, based on the location (default interpretation of location is localhost)."""
190 if ip in ('0.0.0.0', '*'):
190 if ip in ('0.0.0.0', '*'):
191 if location is None or is_public_ip(location) or not public_ips():
191 if location is None or is_public_ip(location) or not public_ips():
192 # If location is unspecified or cannot be determined, assume local
192 # If location is unspecified or cannot be determined, assume local
193 ip = localhost()
193 ip = localhost()
194 elif location:
194 elif location:
195 return location
195 return location
196 return ip
196 return ip
197
197
198 def disambiguate_url(url, location=None):
198 def disambiguate_url(url, location=None):
199 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
199 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
200 ones, based on the location (default interpretation is localhost).
200 ones, based on the location (default interpretation is localhost).
201
201
202 This is for zeromq urls, such as tcp://*:10101."""
202 This is for zeromq urls, such as tcp://*:10101."""
203 try:
203 try:
204 proto,ip,port = split_url(url)
204 proto,ip,port = split_url(url)
205 except AssertionError:
205 except AssertionError:
206 # probably not tcp url; could be ipc, etc.
206 # probably not tcp url; could be ipc, etc.
207 return url
207 return url
208
208
209 ip = disambiguate_ip_address(ip,location)
209 ip = disambiguate_ip_address(ip,location)
210
210
211 return "%s://%s:%s"%(proto,ip,port)
211 return "%s://%s:%s"%(proto,ip,port)
212
212
213
213
214 #--------------------------------------------------------------------------
214 #--------------------------------------------------------------------------
215 # helpers for implementing old MEC API via view.apply
215 # helpers for implementing old MEC API via view.apply
216 #--------------------------------------------------------------------------
216 #--------------------------------------------------------------------------
217
217
218 def interactive(f):
218 def interactive(f):
219 """decorator for making functions appear as interactively defined.
219 """decorator for making functions appear as interactively defined.
220 This results in the function being linked to the user_ns as globals()
220 This results in the function being linked to the user_ns as globals()
221 instead of the module globals().
221 instead of the module globals().
222 """
222 """
223 f.__module__ = '__main__'
223 f.__module__ = '__main__'
224 return f
224 return f
225
225
226 @interactive
226 @interactive
227 def _push(**ns):
227 def _push(**ns):
228 """helper method for implementing `client.push` via `client.apply`"""
228 """helper method for implementing `client.push` via `client.apply`"""
229 user_ns = globals()
229 user_ns = globals()
230 tmp = '_IP_PUSH_TMP_'
230 tmp = '_IP_PUSH_TMP_'
231 while tmp in user_ns:
231 while tmp in user_ns:
232 tmp = tmp + '_'
232 tmp = tmp + '_'
233 try:
233 try:
234 for name, value in ns.iteritems():
234 for name, value in ns.iteritems():
235 user_ns[tmp] = value
235 user_ns[tmp] = value
236 exec("%s = %s" % (name, tmp), user_ns)
236 exec("%s = %s" % (name, tmp), user_ns)
237 finally:
237 finally:
238 user_ns.pop(tmp, None)
238 user_ns.pop(tmp, None)
239
239
240 @interactive
240 @interactive
241 def _pull(keys):
241 def _pull(keys):
242 """helper method for implementing `client.pull` via `client.apply`"""
242 """helper method for implementing `client.pull` via `client.apply`"""
243 if isinstance(keys, (list,tuple, set)):
243 if isinstance(keys, (list,tuple, set)):
244 return map(lambda key: eval(key, globals()), keys)
244 return map(lambda key: eval(key, globals()), keys)
245 else:
245 else:
246 return eval(keys, globals())
246 return eval(keys, globals())
247
247
248 @interactive
248 @interactive
249 def _execute(code):
249 def _execute(code):
250 """helper method for implementing `client.execute` via `client.apply`"""
250 """helper method for implementing `client.execute` via `client.apply`"""
251 exec(code, globals())
251 exec(code, globals())
252
252
253 #--------------------------------------------------------------------------
253 #--------------------------------------------------------------------------
254 # extra process management utilities
254 # extra process management utilities
255 #--------------------------------------------------------------------------
255 #--------------------------------------------------------------------------
256
256
257 _random_ports = set()
257 _random_ports = set()
258
258
259 def select_random_ports(n):
259 def select_random_ports(n):
260 """Selects and return n random ports that are available."""
260 """Selects and return n random ports that are available."""
261 ports = []
261 ports = []
262 for i in range(n):
262 for i in range(n):
263 sock = socket.socket()
263 sock = socket.socket()
264 sock.bind(('', 0))
264 sock.bind(('', 0))
265 while sock.getsockname()[1] in _random_ports:
265 while sock.getsockname()[1] in _random_ports:
266 sock.close()
266 sock.close()
267 sock = socket.socket()
267 sock = socket.socket()
268 sock.bind(('', 0))
268 sock.bind(('', 0))
269 ports.append(sock)
269 ports.append(sock)
270 for i, sock in enumerate(ports):
270 for i, sock in enumerate(ports):
271 port = sock.getsockname()[1]
271 port = sock.getsockname()[1]
272 sock.close()
272 sock.close()
273 ports[i] = port
273 ports[i] = port
274 _random_ports.add(port)
274 _random_ports.add(port)
275 return ports
275 return ports
276
276
277 def signal_children(children):
277 def signal_children(children):
278 """Relay interupt/term signals to children, for more solid process cleanup."""
278 """Relay interupt/term signals to children, for more solid process cleanup."""
279 def terminate_children(sig, frame):
279 def terminate_children(sig, frame):
280 log = Application.instance().log
280 log = Application.instance().log
281 log.critical("Got signal %i, terminating children..."%sig)
281 log.critical("Got signal %i, terminating children..."%sig)
282 for child in children:
282 for child in children:
283 child.terminate()
283 child.terminate()
284
284
285 sys.exit(sig != SIGINT)
285 sys.exit(sig != SIGINT)
286 # sys.exit(sig)
286 # sys.exit(sig)
287 for sig in (SIGINT, SIGABRT, SIGTERM):
287 for sig in (SIGINT, SIGABRT, SIGTERM):
288 signal(sig, terminate_children)
288 signal(sig, terminate_children)
289
289
290 def generate_exec_key(keyfile):
290 def generate_exec_key(keyfile):
291 import uuid
291 import uuid
292 newkey = str(uuid.uuid4())
292 newkey = str(uuid.uuid4())
293 with open(keyfile, 'w') as f:
293 with open(keyfile, 'w') as f:
294 # f.write('ipython-key ')
294 # f.write('ipython-key ')
295 f.write(newkey+'\n')
295 f.write(newkey+'\n')
296 # set user-only RW permissions (0600)
296 # set user-only RW permissions (0600)
297 # this will have no effect on Windows
297 # this will have no effect on Windows
298 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
298 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
299
299
300
300
301 def integer_loglevel(loglevel):
301 def integer_loglevel(loglevel):
302 try:
302 try:
303 loglevel = int(loglevel)
303 loglevel = int(loglevel)
304 except ValueError:
304 except ValueError:
305 if isinstance(loglevel, str):
305 if isinstance(loglevel, str):
306 loglevel = getattr(logging, loglevel)
306 loglevel = getattr(logging, loglevel)
307 return loglevel
307 return loglevel
308
308
309 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
309 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
310 logger = logging.getLogger(logname)
310 logger = logging.getLogger(logname)
311 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
311 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
312 # don't add a second PUBHandler
312 # don't add a second PUBHandler
313 return
313 return
314 loglevel = integer_loglevel(loglevel)
314 loglevel = integer_loglevel(loglevel)
315 lsock = context.socket(zmq.PUB)
315 lsock = context.socket(zmq.PUB)
316 lsock.connect(iface)
316 lsock.connect(iface)
317 handler = handlers.PUBHandler(lsock)
317 handler = handlers.PUBHandler(lsock)
318 handler.setLevel(loglevel)
318 handler.setLevel(loglevel)
319 handler.root_topic = root
319 handler.root_topic = root
320 logger.addHandler(handler)
320 logger.addHandler(handler)
321 logger.setLevel(loglevel)
321 logger.setLevel(loglevel)
322
322
323 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
323 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
324 logger = logging.getLogger()
324 logger = logging.getLogger()
325 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
325 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
326 # don't add a second PUBHandler
326 # don't add a second PUBHandler
327 return
327 return
328 loglevel = integer_loglevel(loglevel)
328 loglevel = integer_loglevel(loglevel)
329 lsock = context.socket(zmq.PUB)
329 lsock = context.socket(zmq.PUB)
330 lsock.connect(iface)
330 lsock.connect(iface)
331 handler = EnginePUBHandler(engine, lsock)
331 handler = EnginePUBHandler(engine, lsock)
332 handler.setLevel(loglevel)
332 handler.setLevel(loglevel)
333 logger.addHandler(handler)
333 logger.addHandler(handler)
334 logger.setLevel(loglevel)
334 logger.setLevel(loglevel)
335 return logger
335 return logger
336
336
337 def local_logger(logname, loglevel=logging.DEBUG):
337 def local_logger(logname, loglevel=logging.DEBUG):
338 loglevel = integer_loglevel(loglevel)
338 loglevel = integer_loglevel(loglevel)
339 logger = logging.getLogger(logname)
339 logger = logging.getLogger(logname)
340 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
340 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
341 # don't add a second StreamHandler
341 # don't add a second StreamHandler
342 return
342 return
343 handler = logging.StreamHandler()
343 handler = logging.StreamHandler()
344 handler.setLevel(loglevel)
344 handler.setLevel(loglevel)
345 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
345 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
346 datefmt="%Y-%m-%d %H:%M:%S")
346 datefmt="%Y-%m-%d %H:%M:%S")
347 handler.setFormatter(formatter)
347 handler.setFormatter(formatter)
348
348
349 logger.addHandler(handler)
349 logger.addHandler(handler)
350 logger.setLevel(loglevel)
350 logger.setLevel(loglevel)
351 return logger
351 return logger
352
352
353 def set_hwm(sock, hwm=0):
353 def set_hwm(sock, hwm=0):
354 """set zmq High Water Mark on a socket
354 """set zmq High Water Mark on a socket
355
355
356 in a way that always works for various pyzmq / libzmq versions.
356 in a way that always works for various pyzmq / libzmq versions.
357 """
357 """
358 import zmq
358 import zmq
359
359
360 for key in ('HWM', 'SNDHWM', 'RCVHWM'):
360 for key in ('HWM', 'SNDHWM', 'RCVHWM'):
361 opt = getattr(zmq, key, None)
361 opt = getattr(zmq, key, None)
362 if opt is None:
362 if opt is None:
363 continue
363 continue
364 try:
364 try:
365 sock.setsockopt(opt, hwm)
365 sock.setsockopt(opt, hwm)
366 except zmq.ZMQError:
366 except zmq.ZMQError:
367 pass
367 pass
368
368
369 No newline at end of file
369
@@ -1,229 +1,229 b''
1 """Utilities to manipulate JSON objects.
1 """Utilities to manipulate JSON objects.
2 """
2 """
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING.txt, distributed as part of this software.
7 # the file COPYING.txt, distributed as part of this software.
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # stdlib
13 # stdlib
14 import math
14 import math
15 import re
15 import re
16 import types
16 import types
17 from datetime import datetime
17 from datetime import datetime
18
18
19 try:
19 try:
20 # base64.encodestring is deprecated in Python 3.x
20 # base64.encodestring is deprecated in Python 3.x
21 from base64 import encodebytes
21 from base64 import encodebytes
22 except ImportError:
22 except ImportError:
23 # Python 2.x
23 # Python 2.x
24 from base64 import encodestring as encodebytes
24 from base64 import encodestring as encodebytes
25
25
26 from IPython.utils import py3compat
26 from IPython.utils import py3compat
27 from IPython.utils.py3compat import string_types, unicode_type
27 from IPython.utils.py3compat import string_types, unicode_type
28 from IPython.utils.encoding import DEFAULT_ENCODING
28 from IPython.utils.encoding import DEFAULT_ENCODING
29 next_attr_name = '__next__' if py3compat.PY3 else 'next'
29 next_attr_name = '__next__' if py3compat.PY3 else 'next'
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Globals and constants
32 # Globals and constants
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
35 # timestamp formats
35 # timestamp formats
36 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
36 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+)Z?([\+\-]\d{2}:?\d{2})?$")
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+)Z?([\+\-]\d{2}:?\d{2})?$")
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Classes and functions
40 # Classes and functions
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 def rekey(dikt):
43 def rekey(dikt):
44 """Rekey a dict that has been forced to use str keys where there should be
44 """Rekey a dict that has been forced to use str keys where there should be
45 ints by json."""
45 ints by json."""
46 for k in dikt.iterkeys():
46 for k in dikt:
47 if isinstance(k, string_types):
47 if isinstance(k, string_types):
48 ik=fk=None
48 ik=fk=None
49 try:
49 try:
50 ik = int(k)
50 ik = int(k)
51 except ValueError:
51 except ValueError:
52 try:
52 try:
53 fk = float(k)
53 fk = float(k)
54 except ValueError:
54 except ValueError:
55 continue
55 continue
56 if ik is not None:
56 if ik is not None:
57 nk = ik
57 nk = ik
58 else:
58 else:
59 nk = fk
59 nk = fk
60 if nk in dikt:
60 if nk in dikt:
61 raise KeyError("already have key %r"%nk)
61 raise KeyError("already have key %r"%nk)
62 dikt[nk] = dikt.pop(k)
62 dikt[nk] = dikt.pop(k)
63 return dikt
63 return dikt
64
64
65
65
66 def extract_dates(obj):
66 def extract_dates(obj):
67 """extract ISO8601 dates from unpacked JSON"""
67 """extract ISO8601 dates from unpacked JSON"""
68 if isinstance(obj, dict):
68 if isinstance(obj, dict):
69 obj = dict(obj) # don't clobber
69 obj = dict(obj) # don't clobber
70 for k,v in obj.iteritems():
70 for k,v in obj.iteritems():
71 obj[k] = extract_dates(v)
71 obj[k] = extract_dates(v)
72 elif isinstance(obj, (list, tuple)):
72 elif isinstance(obj, (list, tuple)):
73 obj = [ extract_dates(o) for o in obj ]
73 obj = [ extract_dates(o) for o in obj ]
74 elif isinstance(obj, string_types):
74 elif isinstance(obj, string_types):
75 m = ISO8601_PAT.match(obj)
75 m = ISO8601_PAT.match(obj)
76 if m:
76 if m:
77 # FIXME: add actual timezone support
77 # FIXME: add actual timezone support
78 # this just drops the timezone info
78 # this just drops the timezone info
79 notz = m.groups()[0]
79 notz = m.groups()[0]
80 obj = datetime.strptime(notz, ISO8601)
80 obj = datetime.strptime(notz, ISO8601)
81 return obj
81 return obj
82
82
83 def squash_dates(obj):
83 def squash_dates(obj):
84 """squash datetime objects into ISO8601 strings"""
84 """squash datetime objects into ISO8601 strings"""
85 if isinstance(obj, dict):
85 if isinstance(obj, dict):
86 obj = dict(obj) # don't clobber
86 obj = dict(obj) # don't clobber
87 for k,v in obj.iteritems():
87 for k,v in obj.iteritems():
88 obj[k] = squash_dates(v)
88 obj[k] = squash_dates(v)
89 elif isinstance(obj, (list, tuple)):
89 elif isinstance(obj, (list, tuple)):
90 obj = [ squash_dates(o) for o in obj ]
90 obj = [ squash_dates(o) for o in obj ]
91 elif isinstance(obj, datetime):
91 elif isinstance(obj, datetime):
92 obj = obj.isoformat()
92 obj = obj.isoformat()
93 return obj
93 return obj
94
94
95 def date_default(obj):
95 def date_default(obj):
96 """default function for packing datetime objects in JSON."""
96 """default function for packing datetime objects in JSON."""
97 if isinstance(obj, datetime):
97 if isinstance(obj, datetime):
98 return obj.isoformat()
98 return obj.isoformat()
99 else:
99 else:
100 raise TypeError("%r is not JSON serializable"%obj)
100 raise TypeError("%r is not JSON serializable"%obj)
101
101
102
102
103 # constants for identifying png/jpeg data
103 # constants for identifying png/jpeg data
104 PNG = b'\x89PNG\r\n\x1a\n'
104 PNG = b'\x89PNG\r\n\x1a\n'
105 # front of PNG base64-encoded
105 # front of PNG base64-encoded
106 PNG64 = b'iVBORw0KG'
106 PNG64 = b'iVBORw0KG'
107 JPEG = b'\xff\xd8'
107 JPEG = b'\xff\xd8'
108 # front of JPEG base64-encoded
108 # front of JPEG base64-encoded
109 JPEG64 = b'/9'
109 JPEG64 = b'/9'
110
110
111 def encode_images(format_dict):
111 def encode_images(format_dict):
112 """b64-encodes images in a displaypub format dict
112 """b64-encodes images in a displaypub format dict
113
113
114 Perhaps this should be handled in json_clean itself?
114 Perhaps this should be handled in json_clean itself?
115
115
116 Parameters
116 Parameters
117 ----------
117 ----------
118
118
119 format_dict : dict
119 format_dict : dict
120 A dictionary of display data keyed by mime-type
120 A dictionary of display data keyed by mime-type
121
121
122 Returns
122 Returns
123 -------
123 -------
124
124
125 format_dict : dict
125 format_dict : dict
126 A copy of the same dictionary,
126 A copy of the same dictionary,
127 but binary image data ('image/png' or 'image/jpeg')
127 but binary image data ('image/png' or 'image/jpeg')
128 is base64-encoded.
128 is base64-encoded.
129
129
130 """
130 """
131 encoded = format_dict.copy()
131 encoded = format_dict.copy()
132
132
133 pngdata = format_dict.get('image/png')
133 pngdata = format_dict.get('image/png')
134 if isinstance(pngdata, bytes):
134 if isinstance(pngdata, bytes):
135 # make sure we don't double-encode
135 # make sure we don't double-encode
136 if not pngdata.startswith(PNG64):
136 if not pngdata.startswith(PNG64):
137 pngdata = encodebytes(pngdata)
137 pngdata = encodebytes(pngdata)
138 encoded['image/png'] = pngdata.decode('ascii')
138 encoded['image/png'] = pngdata.decode('ascii')
139
139
140 jpegdata = format_dict.get('image/jpeg')
140 jpegdata = format_dict.get('image/jpeg')
141 if isinstance(jpegdata, bytes):
141 if isinstance(jpegdata, bytes):
142 # make sure we don't double-encode
142 # make sure we don't double-encode
143 if not jpegdata.startswith(JPEG64):
143 if not jpegdata.startswith(JPEG64):
144 jpegdata = encodebytes(jpegdata)
144 jpegdata = encodebytes(jpegdata)
145 encoded['image/jpeg'] = jpegdata.decode('ascii')
145 encoded['image/jpeg'] = jpegdata.decode('ascii')
146
146
147 return encoded
147 return encoded
148
148
149
149
150 def json_clean(obj):
150 def json_clean(obj):
151 """Clean an object to ensure it's safe to encode in JSON.
151 """Clean an object to ensure it's safe to encode in JSON.
152
152
153 Atomic, immutable objects are returned unmodified. Sets and tuples are
153 Atomic, immutable objects are returned unmodified. Sets and tuples are
154 converted to lists, lists are copied and dicts are also copied.
154 converted to lists, lists are copied and dicts are also copied.
155
155
156 Note: dicts whose keys could cause collisions upon encoding (such as a dict
156 Note: dicts whose keys could cause collisions upon encoding (such as a dict
157 with both the number 1 and the string '1' as keys) will cause a ValueError
157 with both the number 1 and the string '1' as keys) will cause a ValueError
158 to be raised.
158 to be raised.
159
159
160 Parameters
160 Parameters
161 ----------
161 ----------
162 obj : any python object
162 obj : any python object
163
163
164 Returns
164 Returns
165 -------
165 -------
166 out : object
166 out : object
167
167
168 A version of the input which will not cause an encoding error when
168 A version of the input which will not cause an encoding error when
169 encoded as JSON. Note that this function does not *encode* its inputs,
169 encoded as JSON. Note that this function does not *encode* its inputs,
170 it simply sanitizes it so that there will be no encoding errors later.
170 it simply sanitizes it so that there will be no encoding errors later.
171
171
172 Examples
172 Examples
173 --------
173 --------
174 >>> json_clean(4)
174 >>> json_clean(4)
175 4
175 4
176 >>> json_clean(range(10))
176 >>> json_clean(range(10))
177 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
177 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
178 >>> sorted(json_clean(dict(x=1, y=2)).items())
178 >>> sorted(json_clean(dict(x=1, y=2)).items())
179 [('x', 1), ('y', 2)]
179 [('x', 1), ('y', 2)]
180 >>> sorted(json_clean(dict(x=1, y=2, z=[1,2,3])).items())
180 >>> sorted(json_clean(dict(x=1, y=2, z=[1,2,3])).items())
181 [('x', 1), ('y', 2), ('z', [1, 2, 3])]
181 [('x', 1), ('y', 2), ('z', [1, 2, 3])]
182 >>> json_clean(True)
182 >>> json_clean(True)
183 True
183 True
184 """
184 """
185 # types that are 'atomic' and ok in json as-is. bool doesn't need to be
185 # types that are 'atomic' and ok in json as-is. bool doesn't need to be
186 # listed explicitly because bools pass as int instances
186 # listed explicitly because bools pass as int instances
187 atomic_ok = (unicode_type, int, types.NoneType)
187 atomic_ok = (unicode_type, int, types.NoneType)
188
188
189 # containers that we need to convert into lists
189 # containers that we need to convert into lists
190 container_to_list = (tuple, set, types.GeneratorType)
190 container_to_list = (tuple, set, types.GeneratorType)
191
191
192 if isinstance(obj, float):
192 if isinstance(obj, float):
193 # cast out-of-range floats to their reprs
193 # cast out-of-range floats to their reprs
194 if math.isnan(obj) or math.isinf(obj):
194 if math.isnan(obj) or math.isinf(obj):
195 return repr(obj)
195 return repr(obj)
196 return obj
196 return obj
197
197
198 if isinstance(obj, atomic_ok):
198 if isinstance(obj, atomic_ok):
199 return obj
199 return obj
200
200
201 if isinstance(obj, bytes):
201 if isinstance(obj, bytes):
202 return obj.decode(DEFAULT_ENCODING, 'replace')
202 return obj.decode(DEFAULT_ENCODING, 'replace')
203
203
204 if isinstance(obj, container_to_list) or (
204 if isinstance(obj, container_to_list) or (
205 hasattr(obj, '__iter__') and hasattr(obj, next_attr_name)):
205 hasattr(obj, '__iter__') and hasattr(obj, next_attr_name)):
206 obj = list(obj)
206 obj = list(obj)
207
207
208 if isinstance(obj, list):
208 if isinstance(obj, list):
209 return [json_clean(x) for x in obj]
209 return [json_clean(x) for x in obj]
210
210
211 if isinstance(obj, dict):
211 if isinstance(obj, dict):
212 # First, validate that the dict won't lose data in conversion due to
212 # First, validate that the dict won't lose data in conversion due to
213 # key collisions after stringification. This can happen with keys like
213 # key collisions after stringification. This can happen with keys like
214 # True and 'true' or 1 and '1', which collide in JSON.
214 # True and 'true' or 1 and '1', which collide in JSON.
215 nkeys = len(obj)
215 nkeys = len(obj)
216 nkeys_collapsed = len(set(map(str, obj)))
216 nkeys_collapsed = len(set(map(str, obj)))
217 if nkeys != nkeys_collapsed:
217 if nkeys != nkeys_collapsed:
218 raise ValueError('dict can not be safely converted to JSON: '
218 raise ValueError('dict can not be safely converted to JSON: '
219 'key collision would lead to dropped values')
219 'key collision would lead to dropped values')
220 # If all OK, proceed by making the new dict that will be json-safe
220 # If all OK, proceed by making the new dict that will be json-safe
221 out = {}
221 out = {}
222 for k,v in obj.iteritems():
222 for k,v in obj.iteritems():
223 out[str(k)] = json_clean(v)
223 out[str(k)] = json_clean(v)
224 return out
224 return out
225
225
226 # If we get here, we don't know how to handle the object, so we just get
226 # If we get here, we don't know how to handle the object, so we just get
227 # its repr and return that. This will catch lambdas, open sockets, class
227 # its repr and return that. This will catch lambdas, open sockets, class
228 # objects, and any other complicated contraption that json can't encode
228 # objects, and any other complicated contraption that json can't encode
229 return repr(obj)
229 return repr(obj)
General Comments 0
You need to be logged in to leave comments. Login now