##// END OF EJS Templates
merge from trunk
Barry Wark -
r1441:d4c5ae87 merge
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,233 b''
1 # encoding: utf-8
2
3 """A parallelized version of Python's builtin map."""
4
5 __docformat__ = "restructuredtext en"
6
7 #----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #----------------------------------------------------------------------------
13
14 #----------------------------------------------------------------------------
15 # Imports
16 #----------------------------------------------------------------------------
17
18 from types import FunctionType
19 from zope.interface import Interface, implements
20 from IPython.kernel.task import MapTask
21 from IPython.kernel.twistedutil import DeferredList, gatherBoth
22 from IPython.kernel.util import printer
23 from IPython.kernel.error import collect_exceptions
24
25 #----------------------------------------------------------------------------
26 # Code
27 #----------------------------------------------------------------------------
28
29 class IMapper(Interface):
30 """The basic interface for a Mapper.
31
32 This defines a generic interface for mapping. The idea of this is
33 similar to that of Python's builtin `map` function, which applies a function
34 elementwise to a sequence.
35 """
36
37 def map(func, *seqs):
38 """Do map in parallel.
39
40 Equivalent to map(func, *seqs) or:
41
42 [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
43
44 :Parameters:
45 func : FunctionType
46 The function to apply to the sequence
47 sequences : tuple of iterables
48 A sequence of iterables that are used for sucessive function
49 arguments. This work just like map
50 """
51
52 class IMultiEngineMapperFactory(Interface):
53 """
54 An interface for something that creates `IMapper` instances.
55 """
56
57 def mapper(dist='b', targets='all', block=True):
58 """
59 Create an `IMapper` implementer with a given set of arguments.
60
61 The `IMapper` created using a multiengine controller is
62 not load balanced.
63 """
64
65 class ITaskMapperFactory(Interface):
66 """
67 An interface for something that creates `IMapper` instances.
68 """
69
70 def mapper(clear_before=False, clear_after=False, retries=0,
71 recovery_task=None, depend=None, block=True):
72 """
73 Create an `IMapper` implementer with a given set of arguments.
74
75 The `IMapper` created using a task controller is load balanced.
76
77 See the documentation for `IPython.kernel.task.BaseTask` for
78 documentation on the arguments to this method.
79 """
80
81
82 class MultiEngineMapper(object):
83 """
84 A Mapper for `IMultiEngine` implementers.
85 """
86
87 implements(IMapper)
88
89 def __init__(self, multiengine, dist='b', targets='all', block=True):
90 """
91 Create a Mapper for a multiengine.
92
93 The value of all arguments are used for all calls to `map`. This
94 class allows these arguemnts to be set for a series of map calls.
95
96 :Parameters:
97 multiengine : `IMultiEngine` implementer
98 The multiengine to use for running the map commands
99 dist : str
100 The type of decomposition to use. Only block ('b') is
101 supported currently
102 targets : (str, int, tuple of ints)
103 The engines to use in the map
104 block : boolean
105 Whether to block when the map is applied
106 """
107 self.multiengine = multiengine
108 self.dist = dist
109 self.targets = targets
110 self.block = block
111
112 def map(self, func, *sequences):
113 """
114 Apply func to *sequences elementwise. Like Python's builtin map.
115
116 This version is not load balanced.
117 """
118 max_len = max(len(s) for s in sequences)
119 for s in sequences:
120 if len(s)!=max_len:
121 raise ValueError('all sequences must have equal length')
122 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
123 return self.multiengine.raw_map(func, sequences, dist=self.dist,
124 targets=self.targets, block=self.block)
125
126 class TaskMapper(object):
127 """
128 Make an `ITaskController` look like an `IMapper`.
129
130 This class provides a load balanced version of `map`.
131 """
132
133 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
134 recovery_task=None, depend=None, block=True):
135 """
136 Create a `IMapper` given a `TaskController` and arguments.
137
138 The additional arguments are those that are common to all types of
139 tasks and are described in the documentation for
140 `IPython.kernel.task.BaseTask`.
141
142 :Parameters:
143 task_controller : an `IBlockingTaskClient` implementer
144 The `TaskController` to use for calls to `map`
145 """
146 self.task_controller = task_controller
147 self.clear_before = clear_before
148 self.clear_after = clear_after
149 self.retries = retries
150 self.recovery_task = recovery_task
151 self.depend = depend
152 self.block = block
153
154 def map(self, func, *sequences):
155 """
156 Apply func to *sequences elementwise. Like Python's builtin map.
157
158 This version is load balanced.
159 """
160 max_len = max(len(s) for s in sequences)
161 for s in sequences:
162 if len(s)!=max_len:
163 raise ValueError('all sequences must have equal length')
164 task_args = zip(*sequences)
165 task_ids = []
166 dlist = []
167 for ta in task_args:
168 task = MapTask(func, ta, clear_before=self.clear_before,
169 clear_after=self.clear_after, retries=self.retries,
170 recovery_task=self.recovery_task, depend=self.depend)
171 dlist.append(self.task_controller.run(task))
172 dlist = gatherBoth(dlist, consumeErrors=1)
173 dlist.addCallback(collect_exceptions,'map')
174 if self.block:
175 def get_results(task_ids):
176 d = self.task_controller.barrier(task_ids)
177 d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
178 d.addCallback(collect_exceptions, 'map')
179 return d
180 dlist.addCallback(get_results)
181 return dlist
182
183 class SynchronousTaskMapper(object):
184 """
185 Make an `IBlockingTaskClient` look like an `IMapper`.
186
187 This class provides a load balanced version of `map`.
188 """
189
190 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
191 recovery_task=None, depend=None, block=True):
192 """
193 Create a `IMapper` given a `IBlockingTaskClient` and arguments.
194
195 The additional arguments are those that are common to all types of
196 tasks and are described in the documentation for
197 `IPython.kernel.task.BaseTask`.
198
199 :Parameters:
200 task_controller : an `IBlockingTaskClient` implementer
201 The `TaskController` to use for calls to `map`
202 """
203 self.task_controller = task_controller
204 self.clear_before = clear_before
205 self.clear_after = clear_after
206 self.retries = retries
207 self.recovery_task = recovery_task
208 self.depend = depend
209 self.block = block
210
211 def map(self, func, *sequences):
212 """
213 Apply func to *sequences elementwise. Like Python's builtin map.
214
215 This version is load balanced.
216 """
217 max_len = max(len(s) for s in sequences)
218 for s in sequences:
219 if len(s)!=max_len:
220 raise ValueError('all sequences must have equal length')
221 task_args = zip(*sequences)
222 task_ids = []
223 for ta in task_args:
224 task = MapTask(func, ta, clear_before=self.clear_before,
225 clear_after=self.clear_after, retries=self.retries,
226 recovery_task=self.recovery_task, depend=self.depend)
227 task_ids.append(self.task_controller.run(task))
228 if self.block:
229 self.task_controller.barrier(task_ids)
230 task_results = [self.task_controller.get_task_result(tid) for tid in task_ids]
231 return task_results
232 else:
233 return task_ids No newline at end of file
@@ -0,0 +1,20 b''
1 # Set this prefix to where you want to install the plugin
2 PREFIX=~/usr/local
3 PREFIX=~/tmp/local
4
5 plugin: IPython_doctest_plugin.egg-info
6
7 test: plugin dtexample.py
8 nosetests -s --with-ipdoctest --doctest-tests --doctest-extension=txt \
9 dtexample.py test*.txt
10
11 deb: plugin dtexample.py
12 nosetests -vs --with-ipdoctest --doctest-tests --doctest-extension=txt \
13 test_combo.txt
14
15 IPython_doctest_plugin.egg-info: ipdoctest.py setup.py
16 python setup.py install --prefix=$(PREFIX)
17 touch $@
18
19 clean:
20 rm -rf IPython_doctest_plugin.egg-info *~ *pyc build/ dist/
@@ -0,0 +1,39 b''
1 =======================================================
2 Nose plugin with IPython and extension module support
3 =======================================================
4
5 This directory provides the key functionality for test support that IPython
6 needs as a nose plugin, which can be installed for use in projects other than
7 IPython.
8
9 The presence of a Makefile here is mostly for development and debugging
10 purposes as it only provides a few shorthand commands. You can manually
11 install the plugin by using standard Python procedures (``setup.py install``
12 with appropriate arguments).
13
14 To install the plugin using the Makefile, edit its first line to reflect where
15 you'd like the installation. If you want it system-wide, you may want to edit
16 the install line in the plugin target to use sudo and no prefix::
17
18 sudo python setup.py install
19
20 instead of the code using `--prefix` that's in there.
21
22 Once you've set the prefix, simply build/install the plugin with::
23
24 make
25
26 and run the tests with::
27
28 make test
29
30 You should see output similar to::
31
32 maqroll[plugin]> make test
33 nosetests -s --with-ipdoctest --doctest-tests dtexample.py
34 ..
35 ----------------------------------------------------------------------
36 Ran 2 tests in 0.016s
37
38 OK
39
@@ -0,0 +1,72 b''
1 """Simple example using doctests.
2
3 This file just contains doctests both using plain python and IPython prompts.
4 All tests should be loaded by nose.
5 """
6
7 def pyfunc():
8 """Some pure python tests...
9
10 >>> pyfunc()
11 'pyfunc'
12
13 >>> import os
14
15 >>> 2+3
16 5
17
18 >>> for i in range(3):
19 ... print i,
20 ... print i+1,
21 ...
22 0 1 1 2 2 3
23 """
24
25 return 'pyfunc'
26
27 def ipfunc():
28 """Some ipython tests...
29
30 In [1]: import os
31
32 In [2]: cd /
33 /
34
35 In [3]: 2+3
36 Out[3]: 5
37
38 In [26]: for i in range(3):
39 ....: print i,
40 ....: print i+1,
41 ....:
42 0 1 1 2 2 3
43
44
45 Examples that access the operating system work:
46
47 In [1]: !echo hello
48 hello
49
50 In [2]: !echo hello > /tmp/foo
51
52 In [3]: !cat /tmp/foo
53 hello
54
55 In [4]: rm -f /tmp/foo
56
57 It's OK to use '_' for the last result, but do NOT try to use IPython's
58 numbered history of _NN outputs, since those won't exist under the
59 doctest environment:
60
61 In [7]: 3+4
62 Out[7]: 7
63
64 In [8]: _+3
65 Out[8]: 10
66
67 In [9]: ipfunc()
68 Out[9]: 'ipfunc'
69 """
70
71 return 'ipfunc'
72
This diff has been collapsed as it changes many lines, (587 lines changed) Show them Hide them
@@ -0,0 +1,587 b''
1 """Nose Plugin that supports IPython doctests.
2
3 Limitations:
4
5 - When generating examples for use as doctests, make sure that you have
6 pretty-printing OFF. This can be done either by starting ipython with the
7 flag '--nopprint', by setting pprint to 0 in your ipythonrc file, or by
8 interactively disabling it with %Pprint. This is required so that IPython
9 output matches that of normal Python, which is used by doctest for internal
10 execution.
11
12 - Do not rely on specific prompt numbers for results (such as using
13 '_34==True', for example). For IPython tests run via an external process the
14 prompt numbers may be different, and IPython tests run as normal python code
15 won't even have these special _NN variables set at all.
16
17 - IPython functions that produce output as a side-effect of calling a system
18 process (e.g. 'ls') can be doc-tested, but they must be handled in an
19 external IPython process. Such doctests must be tagged with:
20
21 # ipdoctest: EXTERNAL
22
23 so that the testing machinery handles them differently. Since these are run
24 via pexpect in an external process, they can't deal with exceptions or other
25 fancy featurs of regular doctests. You must limit such tests to simple
26 matching of the output. For this reason, I recommend you limit these kinds
27 of doctests to features that truly require a separate process, and use the
28 normal IPython ones (which have all the features of normal doctests) for
29 everything else. See the examples at the bottom of this file for a
30 comparison of what can be done with both types.
31 """
32
33
34 #-----------------------------------------------------------------------------
35 # Module imports
36
37 # From the standard library
38 import __builtin__
39 import commands
40 import doctest
41 import inspect
42 import logging
43 import os
44 import re
45 import sys
46 import unittest
47
48 from inspect import getmodule
49
50 # Third-party modules
51 import nose.core
52
53 from nose.plugins import doctests, Plugin
54 from nose.util import anyp, getpackage, test_address, resolve_name, tolist
55
56 # Our own imports
57 #from extdoctest import ExtensionDoctest, DocTestFinder
58 #from dttools import DocTestFinder, DocTestCase
59 #-----------------------------------------------------------------------------
60 # Module globals and other constants
61
62 log = logging.getLogger(__name__)
63
64 ###########################################################################
65 # *** HACK ***
66 # We must start our own ipython object and heavily muck with it so that all the
67 # modifications IPython makes to system behavior don't send the doctest
68 # machinery into a fit. This code should be considered a gross hack, but it
69 # gets the job done.
70
71 def start_ipython():
72 """Start a global IPython shell, which we need for IPython-specific syntax.
73 """
74 import IPython
75
76 def xsys(cmd):
77 """Execute a command and print its output.
78
79 This is just a convenience function to replace the IPython system call
80 with one that is more doctest-friendly.
81 """
82 cmd = _ip.IP.var_expand(cmd,depth=1)
83 sys.stdout.write(commands.getoutput(cmd))
84 sys.stdout.flush()
85
86 # Store certain global objects that IPython modifies
87 _displayhook = sys.displayhook
88 _excepthook = sys.excepthook
89 _main = sys.modules.get('__main__')
90
91 # Start IPython instance
92 IPython.Shell.IPShell(['--classic','--noterm_title'])
93
94 # Deactivate the various python system hooks added by ipython for
95 # interactive convenience so we don't confuse the doctest system
96 sys.modules['__main__'] = _main
97 sys.displayhook = _displayhook
98 sys.excepthook = _excepthook
99
100 # So that ipython magics and aliases can be doctested (they work by making
101 # a call into a global _ip object)
102 _ip = IPython.ipapi.get()
103 __builtin__._ip = _ip
104
105 # Modify the IPython system call with one that uses getoutput, so that we
106 # can capture subcommands and print them to Python's stdout, otherwise the
107 # doctest machinery would miss them.
108 _ip.system = xsys
109
110 # The start call MUST be made here. I'm not sure yet why it doesn't work if
111 # it is made later, at plugin initialization time, but in all my tests, that's
112 # the case.
113 start_ipython()
114
115 # *** END HACK ***
116 ###########################################################################
117
118 #-----------------------------------------------------------------------------
119 # Modified version of the one in the stdlib, that fixes a python bug (doctests
120 # not found in extension modules, http://bugs.python.org/issue3158)
121 class DocTestFinder(doctest.DocTestFinder):
122
123 def _from_module(self, module, object):
124 """
125 Return true if the given object is defined in the given
126 module.
127 """
128 if module is None:
129 #print '_fm C1' # dbg
130 return True
131 elif inspect.isfunction(object):
132 #print '_fm C2' # dbg
133 return module.__dict__ is object.func_globals
134 elif inspect.isbuiltin(object):
135 #print '_fm C2-1' # dbg
136 return module.__name__ == object.__module__
137 elif inspect.isclass(object):
138 #print '_fm C3' # dbg
139 return module.__name__ == object.__module__
140 elif inspect.ismethod(object):
141 # This one may be a bug in cython that fails to correctly set the
142 # __module__ attribute of methods, but since the same error is easy
143 # to make by extension code writers, having this safety in place
144 # isn't such a bad idea
145 #print '_fm C3-1' # dbg
146 return module.__name__ == object.im_class.__module__
147 elif inspect.getmodule(object) is not None:
148 #print '_fm C4' # dbg
149 #print 'C4 mod',module,'obj',object # dbg
150 return module is inspect.getmodule(object)
151 elif hasattr(object, '__module__'):
152 #print '_fm C5' # dbg
153 return module.__name__ == object.__module__
154 elif isinstance(object, property):
155 #print '_fm C6' # dbg
156 return True # [XX] no way not be sure.
157 else:
158 raise ValueError("object must be a class or function")
159
160
161
162 def _find(self, tests, obj, name, module, source_lines, globs, seen):
163 """
164 Find tests for the given object and any contained objects, and
165 add them to `tests`.
166 """
167
168 doctest.DocTestFinder._find(self,tests, obj, name, module,
169 source_lines, globs, seen)
170
171 # Below we re-run pieces of the above method with manual modifications,
172 # because the original code is buggy and fails to correctly identify
173 # doctests in extension modules.
174
175 # Local shorthands
176 from inspect import isroutine, isclass, ismodule
177
178 # Look for tests in a module's contained objects.
179 if inspect.ismodule(obj) and self._recurse:
180 for valname, val in obj.__dict__.items():
181 valname1 = '%s.%s' % (name, valname)
182 if ( (isroutine(val) or isclass(val))
183 and self._from_module(module, val) ):
184
185 self._find(tests, val, valname1, module, source_lines,
186 globs, seen)
187
188
189 # Look for tests in a class's contained objects.
190 if inspect.isclass(obj) and self._recurse:
191 #print 'RECURSE into class:',obj # dbg
192 for valname, val in obj.__dict__.items():
193 #valname1 = '%s.%s' % (name, valname) # dbg
194 #print 'N',name,'VN:',valname,'val:',str(val)[:77] # dbg
195 # Special handling for staticmethod/classmethod.
196 if isinstance(val, staticmethod):
197 val = getattr(obj, valname)
198 if isinstance(val, classmethod):
199 val = getattr(obj, valname).im_func
200
201 # Recurse to methods, properties, and nested classes.
202 if ((inspect.isfunction(val) or inspect.isclass(val) or
203 inspect.ismethod(val) or
204 isinstance(val, property)) and
205 self._from_module(module, val)):
206 valname = '%s.%s' % (name, valname)
207 self._find(tests, val, valname, module, source_lines,
208 globs, seen)
209
210
211 class DocTestCase(doctests.DocTestCase):
212 """Proxy for DocTestCase: provides an address() method that
213 returns the correct address for the doctest case. Otherwise
214 acts as a proxy to the test case. To provide hints for address(),
215 an obj may also be passed -- this will be used as the test object
216 for purposes of determining the test address, if it is provided.
217 """
218
219 # doctests loaded via find(obj) omit the module name
220 # so we need to override id, __repr__ and shortDescription
221 # bonus: this will squash a 2.3 vs 2.4 incompatiblity
222 def id(self):
223 name = self._dt_test.name
224 filename = self._dt_test.filename
225 if filename is not None:
226 pk = getpackage(filename)
227 if pk is not None and not name.startswith(pk):
228 name = "%s.%s" % (pk, name)
229 return name
230
231
232 # Classes and functions
233
234 def is_extension_module(filename):
235 """Return whether the given filename is an extension module.
236
237 This simply checks that the extension is either .so or .pyd.
238 """
239 return os.path.splitext(filename)[1].lower() in ('.so','.pyd')
240
241
242 # A simple subclassing of the original with a different class name, so we can
243 # distinguish and treat differently IPython examples from pure python ones.
244 class IPExample(doctest.Example): pass
245
246 class IPExternalExample(doctest.Example):
247 """Doctest examples to be run in an external process."""
248
249 def __init__(self, source, want, exc_msg=None, lineno=0, indent=0,
250 options=None):
251 # Parent constructor
252 doctest.Example.__init__(self,source,want,exc_msg,lineno,indent,options)
253
254 # An EXTRA newline is needed to prevent pexpect hangs
255 self.source += '\n'
256
257 class IPDocTestParser(doctest.DocTestParser):
258 """
259 A class used to parse strings containing doctest examples.
260
261 Note: This is a version modified to properly recognize IPython input and
262 convert any IPython examples into valid Python ones.
263 """
264 # This regular expression is used to find doctest examples in a
265 # string. It defines three groups: `source` is the source code
266 # (including leading indentation and prompts); `indent` is the
267 # indentation of the first (PS1) line of the source code; and
268 # `want` is the expected output (including leading indentation).
269
270 # Classic Python prompts or default IPython ones
271 _PS1_PY = r'>>>'
272 _PS2_PY = r'\.\.\.'
273
274 _PS1_IP = r'In\ \[\d+\]:'
275 _PS2_IP = r'\ \ \ \.\.\.+:'
276
277 _RE_TPL = r'''
278 # Source consists of a PS1 line followed by zero or more PS2 lines.
279 (?P<source>
280 (?:^(?P<indent> [ ]*) (?P<ps1> %s) .*) # PS1 line
281 (?:\n [ ]* (?P<ps2> %s) .*)*) # PS2 lines
282 \n? # a newline
283 # Want consists of any non-blank lines that do not start with PS1.
284 (?P<want> (?:(?![ ]*$) # Not a blank line
285 (?![ ]*%s) # Not a line starting with PS1
286 (?![ ]*%s) # Not a line starting with PS2
287 .*$\n? # But any other line
288 )*)
289 '''
290
291 _EXAMPLE_RE_PY = re.compile( _RE_TPL % (_PS1_PY,_PS2_PY,_PS1_PY,_PS2_PY),
292 re.MULTILINE | re.VERBOSE)
293
294 _EXAMPLE_RE_IP = re.compile( _RE_TPL % (_PS1_IP,_PS2_IP,_PS1_IP,_PS2_IP),
295 re.MULTILINE | re.VERBOSE)
296
297 def ip2py(self,source):
298 """Convert input IPython source into valid Python."""
299 out = []
300 newline = out.append
301 for line in source.splitlines():
302 #newline(_ip.IPipython.prefilter(line,True))
303 newline(_ip.IP.prefilter(line,True))
304 newline('') # ensure a closing newline, needed by doctest
305 return '\n'.join(out)
306
307 def parse(self, string, name='<string>'):
308 """
309 Divide the given string into examples and intervening text,
310 and return them as a list of alternating Examples and strings.
311 Line numbers for the Examples are 0-based. The optional
312 argument `name` is a name identifying this string, and is only
313 used for error messages.
314 """
315
316 #print 'Parse string:\n',string # dbg
317
318 string = string.expandtabs()
319 # If all lines begin with the same indentation, then strip it.
320 min_indent = self._min_indent(string)
321 if min_indent > 0:
322 string = '\n'.join([l[min_indent:] for l in string.split('\n')])
323
324 output = []
325 charno, lineno = 0, 0
326
327 # Whether to convert the input from ipython to python syntax
328 ip2py = False
329 # Find all doctest examples in the string. First, try them as Python
330 # examples, then as IPython ones
331 terms = list(self._EXAMPLE_RE_PY.finditer(string))
332 if terms:
333 # Normal Python example
334 #print '-'*70 # dbg
335 #print 'PyExample, Source:\n',string # dbg
336 #print '-'*70 # dbg
337 Example = doctest.Example
338 else:
339 # It's an ipython example. Note that IPExamples are run
340 # in-process, so their syntax must be turned into valid python.
341 # IPExternalExamples are run out-of-process (via pexpect) so they
342 # don't need any filtering (a real ipython will be executing them).
343 terms = list(self._EXAMPLE_RE_IP.finditer(string))
344 if re.search(r'#\s*ipdoctest:\s*EXTERNAL',string):
345 #print '-'*70 # dbg
346 #print 'IPExternalExample, Source:\n',string # dbg
347 #print '-'*70 # dbg
348 Example = IPExternalExample
349 else:
350 #print '-'*70 # dbg
351 #print 'IPExample, Source:\n',string # dbg
352 #print '-'*70 # dbg
353 Example = IPExample
354 ip2py = True
355
356 for m in terms:
357 # Add the pre-example text to `output`.
358 output.append(string[charno:m.start()])
359 # Update lineno (lines before this example)
360 lineno += string.count('\n', charno, m.start())
361 # Extract info from the regexp match.
362 (source, options, want, exc_msg) = \
363 self._parse_example(m, name, lineno,ip2py)
364 if Example is IPExternalExample:
365 options[doctest.NORMALIZE_WHITESPACE] = True
366 want += '\n'
367 # Create an Example, and add it to the list.
368 if not self._IS_BLANK_OR_COMMENT(source):
369 #print 'Example source:', source # dbg
370 output.append(Example(source, want, exc_msg,
371 lineno=lineno,
372 indent=min_indent+len(m.group('indent')),
373 options=options))
374 # Update lineno (lines inside this example)
375 lineno += string.count('\n', m.start(), m.end())
376 # Update charno.
377 charno = m.end()
378 # Add any remaining post-example text to `output`.
379 output.append(string[charno:])
380
381 return output
382
383 def _parse_example(self, m, name, lineno,ip2py=False):
384 """
385 Given a regular expression match from `_EXAMPLE_RE` (`m`),
386 return a pair `(source, want)`, where `source` is the matched
387 example's source code (with prompts and indentation stripped);
388 and `want` is the example's expected output (with indentation
389 stripped).
390
391 `name` is the string's name, and `lineno` is the line number
392 where the example starts; both are used for error messages.
393
394 Optional:
395 `ip2py`: if true, filter the input via IPython to convert the syntax
396 into valid python.
397 """
398
399 # Get the example's indentation level.
400 indent = len(m.group('indent'))
401
402 # Divide source into lines; check that they're properly
403 # indented; and then strip their indentation & prompts.
404 source_lines = m.group('source').split('\n')
405
406 # We're using variable-length input prompts
407 ps1 = m.group('ps1')
408 ps2 = m.group('ps2')
409 ps1_len = len(ps1)
410
411 self._check_prompt_blank(source_lines, indent, name, lineno,ps1_len)
412 if ps2:
413 self._check_prefix(source_lines[1:], ' '*indent + ps2, name, lineno)
414
415 source = '\n'.join([sl[indent+ps1_len+1:] for sl in source_lines])
416
417 if ip2py:
418 # Convert source input from IPython into valid Python syntax
419 source = self.ip2py(source)
420
421 # Divide want into lines; check that it's properly indented; and
422 # then strip the indentation. Spaces before the last newline should
423 # be preserved, so plain rstrip() isn't good enough.
424 want = m.group('want')
425 want_lines = want.split('\n')
426 if len(want_lines) > 1 and re.match(r' *$', want_lines[-1]):
427 del want_lines[-1] # forget final newline & spaces after it
428 self._check_prefix(want_lines, ' '*indent, name,
429 lineno + len(source_lines))
430
431 # Remove ipython output prompt that might be present in the first line
432 want_lines[0] = re.sub(r'Out\[\d+\]: \s*?\n?','',want_lines[0])
433
434 want = '\n'.join([wl[indent:] for wl in want_lines])
435
436 # If `want` contains a traceback message, then extract it.
437 m = self._EXCEPTION_RE.match(want)
438 if m:
439 exc_msg = m.group('msg')
440 else:
441 exc_msg = None
442
443 # Extract options from the source.
444 options = self._find_options(source, name, lineno)
445
446 return source, options, want, exc_msg
447
448 def _check_prompt_blank(self, lines, indent, name, lineno, ps1_len):
449 """
450 Given the lines of a source string (including prompts and
451 leading indentation), check to make sure that every prompt is
452 followed by a space character. If any line is not followed by
453 a space character, then raise ValueError.
454
455 Note: IPython-modified version which takes the input prompt length as a
456 parameter, so that prompts of variable length can be dealt with.
457 """
458 space_idx = indent+ps1_len
459 min_len = space_idx+1
460 for i, line in enumerate(lines):
461 if len(line) >= min_len and line[space_idx] != ' ':
462 raise ValueError('line %r of the docstring for %s '
463 'lacks blank after %s: %r' %
464 (lineno+i+1, name,
465 line[indent:space_idx], line))
466
467 SKIP = doctest.register_optionflag('SKIP')
468
469 ###########################################################################
470
471 class DocFileCase(doctest.DocFileCase):
472 """Overrides to provide filename
473 """
474 def address(self):
475 return (self._dt_test.filename, None, None)
476
477
478 class ExtensionDoctest(doctests.Doctest):
479 """Nose Plugin that supports doctests in extension modules.
480 """
481 name = 'extdoctest' # call nosetests with --with-extdoctest
482 enabled = True
483
484 def options(self, parser, env=os.environ):
485 Plugin.options(self, parser, env)
486
487 def configure(self, options, config):
488 Plugin.configure(self, options, config)
489 self.doctest_tests = options.doctest_tests
490 self.extension = tolist(options.doctestExtension)
491 self.finder = DocTestFinder()
492 self.parser = doctest.DocTestParser()
493
494
495 def loadTestsFromExtensionModule(self,filename):
496 bpath,mod = os.path.split(filename)
497 modname = os.path.splitext(mod)[0]
498 try:
499 sys.path.append(bpath)
500 module = __import__(modname)
501 tests = list(self.loadTestsFromModule(module))
502 finally:
503 sys.path.pop()
504 return tests
505
506 def loadTestsFromFile(self, filename):
507 if is_extension_module(filename):
508 for t in self.loadTestsFromExtensionModule(filename):
509 yield t
510 else:
511 ## for t in list(doctests.Doctest.loadTestsFromFile(self,filename)):
512 ## yield t
513 pass
514
515 if self.extension and anyp(filename.endswith, self.extension):
516 #print 'lTF',filename # dbg
517 name = os.path.basename(filename)
518 dh = open(filename)
519 try:
520 doc = dh.read()
521 finally:
522 dh.close()
523 test = self.parser.get_doctest(
524 doc, globs={'__file__': filename}, name=name,
525 filename=filename, lineno=0)
526 if test.examples:
527 #print 'FileCase:',test.examples # dbg
528 yield DocFileCase(test)
529 else:
530 yield False # no tests to load
531
532
533 def wantFile(self,filename):
534 """Return whether the given filename should be scanned for tests.
535
536 Modified version that accepts extension modules as valid containers for
537 doctests.
538 """
539 #print 'Filename:',filename # dbg
540
541 if is_extension_module(filename):
542 return True
543 else:
544 return doctests.Doctest.wantFile(self,filename)
545
546 # NOTE: the method below is a *copy* of the one in the nose doctests
547 # plugin, but we have to replicate it here in order to have it resolve the
548 # DocTestCase (last line) to our local copy, since the nose plugin doesn't
549 # provide a public hook for what TestCase class to use. The alternative
550 # would be to monkeypatch doctest in the stdlib, but that's ugly and
551 # brittle, since a change in plugin load order can break it. So for now,
552 # we just paste this in here, inelegant as this may be.
553
554 def loadTestsFromModule(self, module):
555 #print 'lTM',module # dbg
556
557 if not self.matches(module.__name__):
558 log.debug("Doctest doesn't want module %s", module)
559 return
560 tests = self.finder.find(module)
561 if not tests:
562 return
563 tests.sort()
564 module_file = module.__file__
565 if module_file[-4:] in ('.pyc', '.pyo'):
566 module_file = module_file[:-1]
567 for test in tests:
568 if not test.examples:
569 continue
570 if not test.filename:
571 test.filename = module_file
572 yield DocTestCase(test)
573
574 class IPythonDoctest(ExtensionDoctest):
575 """Nose Plugin that supports doctests in extension modules.
576 """
577 name = 'ipdoctest' # call nosetests with --with-ipdoctest
578 enabled = True
579
580 def configure(self, options, config):
581
582 Plugin.configure(self, options, config)
583 self.doctest_tests = options.doctest_tests
584 self.extension = tolist(options.doctestExtension)
585 self.parser = IPDocTestParser()
586 #self.finder = DocTestFinder(parser=IPDocTestParser())
587 self.finder = DocTestFinder(parser=self.parser)
@@ -0,0 +1,18 b''
1 #!/usr/bin/env python
2 """Nose-based test runner.
3 """
4
5 from nose.core import main
6 from nose.plugins.builtin import plugins
7 from nose.plugins.doctests import Doctest
8
9 import ipdoctest
10 from ipdoctest import IPDocTestRunner
11
12 if __name__ == '__main__':
13 print 'WARNING: this code is incomplete!'
14 print
15
16 pp = [x() for x in plugins] # activate all builtin plugins first
17 main(testRunner=IPDocTestRunner(),
18 plugins=pp+[ipdoctest.IPythonDoctest(),Doctest()])
@@ -0,0 +1,18 b''
1 #!/usr/bin/env python
2 """A Nose plugin to support IPython doctests.
3 """
4
5 from setuptools import setup
6
7 setup(name='IPython doctest plugin',
8 version='0.1',
9 author='The IPython Team',
10 description = 'Nose plugin to load IPython-extended doctests',
11 license = 'LGPL',
12 py_modules = ['ipdoctest'],
13 entry_points = {
14 'nose.plugins.0.10': ['ipdoctest = ipdoctest:IPythonDoctest',
15 'extdoctest = ipdoctest:ExtensionDoctest',
16 ],
17 },
18 )
@@ -0,0 +1,36 b''
1 =======================
2 Combo testing example
3 =======================
4
5 This is a simple example that mixes ipython doctests::
6
7 In [1]: import code
8
9 In [2]: 2**12
10 Out[2]: 4096
11
12 with command-line example information that does *not* get executed::
13
14 $ mpirun -n 4 ipengine --controller-port=10000 --controller-ip=host0
15
16 and with literal examples of Python source code::
17
18 controller = dict(host='myhost',
19 engine_port=None, # default is 10105
20 control_port=None,
21 )
22
23 # keys are hostnames, values are the number of engine on that host
24 engines = dict(node1=2,
25 node2=2,
26 node3=2,
27 node3=2,
28 )
29
30 # Force failure to detect that this test is being run.
31 1/0
32
33 These source code examples are executed but no output is compared at all. An
34 error or failure is reported only if an exception is raised.
35
36 NOTE: the execution of pure python blocks is not yet working!
@@ -0,0 +1,24 b''
1 =====================================
2 Tests in example form - pure python
3 =====================================
4
5 This file contains doctest examples embedded as code blocks, using normal
6 Python prompts. See the accompanying file for similar examples using IPython
7 prompts (you can't mix both types within one file). The following will be run
8 as a test::
9
10 >>> 1+1
11 2
12 >>> print "hello"
13 hello
14
15 More than one example works::
16
17 >>> s="Hello World"
18
19 >>> s.upper()
20 'HELLO WORLD'
21
22 but you should note that the *entire* test file is considered to be a single
23 test. Individual code blocks that fail are printed separately as ``example
24 failures``, but the whole file is still counted and reported as one test.
@@ -0,0 +1,30 b''
1 =================================
2 Tests in example form - IPython
3 =================================
4
5 You can write text files with examples that use IPython prompts (as long as you
6 use the nose ipython doctest plugin), but you can not mix and match prompt
7 styles in a single file. That is, you either use all ``>>>`` prompts or all
8 IPython-style prompts. Your test suite *can* have both types, you just need to
9 put each type of example in a separate. Using IPython prompts, you can paste
10 directly from your session::
11
12 In [5]: s="Hello World"
13
14 In [6]: s.upper()
15 Out[6]: 'HELLO WORLD'
16
17 Another example::
18
19 In [8]: 1+3
20 Out[8]: 4
21
22 Just like in IPython docstrings, you can use all IPython syntax and features::
23
24 In [9]: !echo "hello"
25 hello
26
27 In [10]: a='hi'
28
29 In [11]: !echo $a
30 hi
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -0,0 +1,23 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 # A super simple example showing how to use all of this in a fully
5 # asynchronous manner. The TaskClient also works in this mode.
6
7 from twisted.internet import reactor, defer
8 from IPython.kernel import asyncclient
9
10 def printer(r):
11 print r
12 return r
13
14 def submit(client):
15 d = client.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0,block=True)
16 d.addCallback(lambda _: client.pull(('a','b','c'),targets=0,block=True))
17 d.addBoth(printer)
18 d.addCallback(lambda _: reactor.stop())
19
20 d = asyncclient.get_multiengine_client()
21 d.addCallback(submit)
22
23 reactor.run() No newline at end of file
@@ -0,0 +1,32 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 # This example shows how the AsynTaskClient can be used
5 # This example is currently broken
6
7 from twisted.internet import reactor, defer
8 from IPython.kernel import asyncclient
9
10 mec = asyncclient.AsyncMultiEngineClient(('localhost', 10105))
11 tc = asyncclient.AsyncTaskClient(('localhost',10113))
12
13 cmd1 = """\
14 a = 5
15 b = 10*d
16 c = a*b*d
17 """
18
19 t1 = asyncclient.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
20
21 d = mec.push(dict(d=30))
22
23 def raise_and_print(tr):
24 tr.raiseException()
25 print "a, b: ", tr.ns.a, tr.ns.b
26 return tr
27
28 d.addCallback(lambda _: tc.run(t1))
29 d.addCallback(lambda tid: tc.get_task_result(tid,block=True))
30 d.addCallback(raise_and_print)
31 d.addCallback(lambda _: reactor.stop())
32 reactor.run()
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
@@ -2728,8 +2728,7 b' Defaulting color scheme to \'NoColor\'"""'
2728 os.chdir(os.path.expanduser(ps))
2728 os.chdir(os.path.expanduser(ps))
2729 if self.shell.rc.term_title:
2729 if self.shell.rc.term_title:
2730 #print 'set term title:',self.shell.rc.term_title # dbg
2730 #print 'set term title:',self.shell.rc.term_title # dbg
2731 ttitle = 'IPy ' + abbrev_cwd()
2731 platutils.set_term_title('IPy ' + abbrev_cwd())
2732 platutils.set_term_title(ttitle)
2733 except OSError:
2732 except OSError:
2734 print sys.exc_info()[1]
2733 print sys.exc_info()[1]
2735 else:
2734 else:
@@ -3195,7 +3194,7 b' Defaulting color scheme to \'NoColor\'"""'
3195 exec b in self.user_ns
3194 exec b in self.user_ns
3196 self.user_ns['pasted_block'] = b
3195 self.user_ns['pasted_block'] = b
3197 else:
3196 else:
3198 self.user_ns[par] = block
3197 self.user_ns[par] = SList(block.splitlines())
3199 print "Block assigned to '%s'" % par
3198 print "Block assigned to '%s'" % par
3200
3199
3201 def magic_quickref(self,arg):
3200 def magic_quickref(self,arg):
@@ -22,15 +22,19 b" name = 'ipython'"
22 # because bdist_rpm does not accept dashes (an RPM) convention, and
22 # because bdist_rpm does not accept dashes (an RPM) convention, and
23 # bdist_deb does not accept underscores (a Debian convention).
23 # bdist_deb does not accept underscores (a Debian convention).
24
24
25 revision = '1016'
25 development = True # change this to False to do a release
26 version_base = '0.9.0'
26 branch = 'ipython'
27 branch = 'ipython'
28 revision = '1016'
27
29
30 if development:
28 if branch == 'ipython':
31 if branch == 'ipython':
29 version = '0.9.0.bzr.r' + revision
32 version = '%s.bzr.r%s' % (version_base, revision)
33 else:
34 version = '%s.bzr.r%s.%s' % (version_base, revision, branch)
30 else:
35 else:
31 version = '0.9.0.bzr.r%s.%s' % (revision,branch)
36 version = version_base
32
37
33 # version = '0.8.4'
34
38
35 description = "Tools for interactive development in Python."
39 description = "Tools for interactive development in Python."
36
40
@@ -53,7 +53,7 b' globsyntax = """\\'
53 - readme*, exclude files ending with .bak
53 - readme*, exclude files ending with .bak
54 !.svn/ !.hg/ !*_Data/ rec:.
54 !.svn/ !.hg/ !*_Data/ rec:.
55 - Skip .svn, .hg, foo_Data dirs (and their subdirs) in recurse.
55 - Skip .svn, .hg, foo_Data dirs (and their subdirs) in recurse.
56 Trailing / is the key, \ does not work!
56 Trailing / is the key, \ does not work! Use !.*/ for all hidden.
57 dir:foo
57 dir:foo
58 - the directory foo if it exists (not files in foo)
58 - the directory foo if it exists (not files in foo)
59 dir:*
59 dir:*
@@ -63,13 +63,16 b' globsyntax = """\\'
63 foo.py is *not* included twice.
63 foo.py is *not* included twice.
64 @filelist.txt
64 @filelist.txt
65 - All files listed in 'filelist.txt' file, on separate lines.
65 - All files listed in 'filelist.txt' file, on separate lines.
66 "cont:class \wak:" rec:*.py
67 - Match files containing regexp. Applies to subsequent files.
68 note quotes because of whitespace.
66 """
69 """
67
70
68
71
69 __version__ = "0.2"
72 __version__ = "0.2"
70
73
71
74
72 import os,glob,fnmatch,sys
75 import os,glob,fnmatch,sys,re
73 from sets import Set as set
76 from sets import Set as set
74
77
75
78
@@ -84,21 +87,34 b' def expand(flist,exp_dirs = False):'
84
87
85 """
88 """
86 if isinstance(flist, basestring):
89 if isinstance(flist, basestring):
87 flist = flist.split()
90 import shlex
91 flist = shlex.split(flist)
88 done_set = set()
92 done_set = set()
89 denied_set = set()
93 denied_set = set()
94 cont_set = set()
95 cur_rejected_dirs = set()
90
96
91 def recfind(p, pats = ["*"]):
97 def recfind(p, pats = ["*"]):
92 denied_dirs = ["*" + d+"*" for d in denied_set if d.endswith("/")]
98 denied_dirs = [os.path.dirname(d) for d in denied_set if d.endswith("/")]
93 #print "de", denied_dirs
94 for (dp,dnames,fnames) in os.walk(p):
99 for (dp,dnames,fnames) in os.walk(p):
95 # see if we should ignore the whole directory
100 # see if we should ignore the whole directory
96 dp_norm = dp.replace("\\","/") + "/"
101 dp_norm = dp.replace("\\","/") + "/"
97 deny = False
102 deny = False
103 # do not traverse under already rejected dirs
104 for d in cur_rejected_dirs:
105 if dp.startswith(d):
106 deny = True
107 break
108 if deny:
109 continue
110
111
98 #print "dp",dp
112 #print "dp",dp
113 bname = os.path.basename(dp)
99 for deny_pat in denied_dirs:
114 for deny_pat in denied_dirs:
100 if fnmatch.fnmatch( dp_norm, deny_pat):
115 if fnmatch.fnmatch( bname, deny_pat):
101 deny = True
116 deny = True
117 cur_rejected_dirs.add(dp)
102 break
118 break
103 if deny:
119 if deny:
104 continue
120 continue
@@ -124,6 +140,17 b' def expand(flist,exp_dirs = False):'
124 if fnmatch.fnmatch(os.path.basename(p), deny_pat):
140 if fnmatch.fnmatch(os.path.basename(p), deny_pat):
125 deny = True
141 deny = True
126 break
142 break
143 if cont_set:
144 try:
145 cont = open(p).read()
146 except IOError:
147 # deny
148 continue
149 for pat in cont_set:
150 if not re.search(pat,cont, re.IGNORECASE):
151 deny = True
152 break
153
127 if not deny:
154 if not deny:
128 yield it
155 yield it
129 return
156 return
@@ -158,7 +185,8 b' def expand(flist,exp_dirs = False):'
158 # glob only dirs
185 # glob only dirs
159 elif ent.lower().startswith('dir:'):
186 elif ent.lower().startswith('dir:'):
160 res.extend(once_filter(filter(os.path.isdir,glob.glob(ent[4:]))))
187 res.extend(once_filter(filter(os.path.isdir,glob.glob(ent[4:]))))
161
188 elif ent.lower().startswith('cont:'):
189 cont_set.add(ent[5:])
162 # get all files in the specified dir
190 # get all files in the specified dir
163 elif os.path.isdir(ent) and exp_dirs:
191 elif os.path.isdir(ent) and exp_dirs:
164 res.extend(once_filter(filter(os.path.isfile,glob.glob(ent + os.sep+"*"))))
192 res.extend(once_filter(filter(os.path.isfile,glob.glob(ent + os.sep+"*"))))
1 NO CONTENT: modified file
NO CONTENT: modified file
@@ -1137,14 +1137,41 b' class SList(list):'
1137 res.append(" ".join(lineparts))
1137 res.append(" ".join(lineparts))
1138
1138
1139 return res
1139 return res
1140 def sort(self,field= None, nums = False):
1141 """ sort by specified fields (see fields())
1140
1142
1143 Example::
1144 a.sort(1, nums = True)
1141
1145
1146 Sorts a by second field, in numerical order (so that 21 > 3)
1147
1148 """
1149
1150 #decorate, sort, undecorate
1151 if field is not None:
1152 dsu = [[SList([line]).fields(field), line] for line in self]
1153 else:
1154 dsu = [[line, line] for line in self]
1155 if nums:
1156 for i in range(len(dsu)):
1157 numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()])
1158 try:
1159 n = int(numstr)
1160 except ValueError:
1161 n = 0;
1162 dsu[i][0] = n
1142
1163
1143
1164
1165 dsu.sort()
1166 return SList([t[1] for t in dsu])
1144
1167
1145 def print_slist(arg):
1168 def print_slist(arg):
1146 """ Prettier (non-repr-like) and more informative printer for SList """
1169 """ Prettier (non-repr-like) and more informative printer for SList """
1147 print "SList (.p, .n, .l, .s, .grep(), .fields() available). Value:"
1170 print "SList (.p, .n, .l, .s, .grep(), .fields(), sort() available):"
1171 if hasattr(arg, 'hideonce') and arg.hideonce:
1172 arg.hideonce = False
1173 return
1174
1148 nlprint(arg)
1175 nlprint(arg)
1149
1176
1150 print_slist = result_display.when_type(SList)(print_slist)
1177 print_slist = result_display.when_type(SList)(print_slist)
@@ -27,7 +27,7 b' from IPython.kernel import codeutil'
27 from IPython.kernel.clientconnector import ClientConnector
27 from IPython.kernel.clientconnector import ClientConnector
28
28
29 # Other things that the user will need
29 # Other things that the user will need
30 from IPython.kernel.task import Task
30 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.error import CompositeError
31 from IPython.kernel.error import CompositeError
32
32
33 #-------------------------------------------------------------------------------
33 #-------------------------------------------------------------------------------
@@ -44,7 +44,7 b' from IPython.kernel import codeutil'
44 import IPython.kernel.magic
44 import IPython.kernel.magic
45
45
46 # Other things that the user will need
46 # Other things that the user will need
47 from IPython.kernel.task import Task
47 from IPython.kernel.task import MapTask, StringTask
48 from IPython.kernel.error import CompositeError
48 from IPython.kernel.error import CompositeError
49
49
50 #-------------------------------------------------------------------------------
50 #-------------------------------------------------------------------------------
@@ -144,35 +144,37 b' class RemoteMultiEngine(RemoteContextBase):'
144
144
145
145
146 # XXX - Temporary hackish testing, we'll move this into proper tests right
146 # XXX - Temporary hackish testing, we'll move this into proper tests right
147 # away
147 # away. This has been commented out as it doesn't run under Python 2.4
148
148 # because of the usage of the with statement below. We need to protect
149 if __name__ == '__main__':
149 # such things with a try:except.
150
150
151 # XXX - for now, we need a running cluster to be started separately. The
151 # if __name__ == '__main__':
152 # daemon work is almost finished, and will make much of this unnecessary.
152 #
153 from IPython.kernel import client
153 # # XXX - for now, we need a running cluster to be started separately. The
154 mec = client.MultiEngineClient(('127.0.0.1',10105))
154 # # daemon work is almost finished, and will make much of this unnecessary.
155
155 # from IPython.kernel import client
156 try:
156 # mec = client.MultiEngineClient(('127.0.0.1',10105))
157 mec.get_ids()
157 #
158 except ConnectionRefusedError:
158 # try:
159 import os, time
159 # mec.get_ids()
160 os.system('ipcluster -n 2 &')
160 # except ConnectionRefusedError:
161 time.sleep(2)
161 # import os, time
162 mec = client.MultiEngineClient(('127.0.0.1',10105))
162 # os.system('ipcluster -n 2 &')
163
163 # time.sleep(2)
164 mec.block = False
164 # mec = client.MultiEngineClient(('127.0.0.1',10105))
165
165 #
166 import itertools
166 # mec.block = False
167 c = itertools.count()
167 #
168
168 # import itertools
169 parallel = RemoteMultiEngine(mec)
169 # c = itertools.count()
170
170 #
171 with parallel as pr:
171 # parallel = RemoteMultiEngine(mec)
172 # A comment
172 #
173 remote() # this means the code below only runs remotely
173 # with parallel as pr:
174 print 'Hello remote world'
174 # # A comment
175 x = 3.14
175 # remote() # this means the code below only runs remotely
176 # Comments are OK
176 # print 'Hello remote world'
177 # Even misindented.
177 # x = 3.14
178 y = x+1
178 # # Comments are OK
179 # # Even misindented.
180 # y = x+1
@@ -79,7 +79,7 b" def magic_px(self,parameter_s=''):"
79 except AttributeError:
79 except AttributeError:
80 print NO_ACTIVE_CONTROLLER
80 print NO_ACTIVE_CONTROLLER
81 else:
81 else:
82 print "Executing command on Controller"
82 print "Parallel execution on engines: %s" % activeController.targets
83 result = activeController.execute(parameter_s)
83 result = activeController.execute(parameter_s)
84 return result
84 return result
85
85
@@ -115,7 +115,7 b' class RoundRobinMap(Map):'
115 # result.append(concat[i:totalLength:maxPartitionLength])
115 # result.append(concat[i:totalLength:maxPartitionLength])
116 return self.concatenate(listOfPartitions)
116 return self.concatenate(listOfPartitions)
117
117
118 styles = {'basic':Map}
118 dists = {'b':Map}
119
119
120
120
121
121
@@ -653,67 +653,55 b' components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMul'
653 class IMultiEngineCoordinator(Interface):
653 class IMultiEngineCoordinator(Interface):
654 """Methods that work on multiple engines explicitly."""
654 """Methods that work on multiple engines explicitly."""
655
655
656 def scatter(key, seq, style='basic', flatten=False, targets='all'):
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets.
657 """Partition and distribute a sequence to targets."""
658
658
659 :Parameters:
659 def gather(key, dist='b', targets='all'):
660 key : str
660 """Gather object key from targets."""
661 The variable name to call the scattered sequence.
662 seq : list, tuple, array
663 The sequence to scatter. The type should be preserved.
664 style : string
665 A specification of how the sequence is partitioned. Currently
666 only 'basic' is implemented.
667 flatten : boolean
668 Should single element sequences be converted to scalars.
669 """
670
661
671 def gather(key, style='basic', targets='all'):
662 def raw_map(func, seqs, dist='b', targets='all'):
672 """Gather object key from targets.
673
674 :Parameters:
675 key : string
676 The name of a sequence on the targets to gather.
677 style : string
678 A specification of how the sequence is partitioned. Currently
679 only 'basic' is implemented.
680 """
663 """
664 A parallelized version of Python's builtin `map` function.
681
665
682 def map(func, seq, style='basic', targets='all'):
666 This has a slightly different syntax than the builtin `map`.
683 """A parallelized version of Python's builtin map.
667 This is needed because we need to have keyword arguments and thus
668 can't use *args to capture all the sequences. Instead, they must
669 be passed in a list or tuple.
684
670
685 This function implements the following pattern:
671 The equivalence is:
686
672
687 1. The sequence seq is scattered to the given targets.
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
688 2. map(functionSource, seq) is called on each engine.
689 3. The resulting sequences are gathered back to the local machine.
690
674
691 :Parameters:
675 Most users will want to use parallel functions or the `mapper`
692 targets : int, list or 'all'
676 and `map` methods for an API that follows that of the builtin
693 The engine ids the action will apply to. Call `get_ids` to see
677 `map`.
694 a list of currently available engines.
695 func : str, function
696 An actual function object or a Python string that names a
697 callable defined on the engines.
698 seq : list, tuple or numpy array
699 The local sequence to be scattered.
700 style : str
701 Only 'basic' is supported for now.
702
703 :Returns: A list of len(seq) with functionSource called on each element
704 of seq.
705
706 Example
707 =======
708
709 >>> rc.mapAll('lambda x: x*x', range(10000))
710 [0,2,4,9,25,36,...]
711 """
678 """
712
679
713
680
714 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
715 """Methods that work on multiple engines explicitly."""
682 """Methods that work on multiple engines explicitly."""
716 pass
683
684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 """Partition and distribute a sequence to targets."""
686
687 def gather(key, dist='b', targets='all', block=True):
688 """Gather object key from targets"""
689
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 """
692 A parallelized version of Python's builtin map.
693
694 This has a slightly different syntax than the builtin `map`.
695 This is needed because we need to have keyword arguments and thus
696 can't use *args to capture all the sequences. Instead, they must
697 be passed in a list or tuple.
698
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700
701 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
703 `map`.
704 """
717
705
718
706
719 #-------------------------------------------------------------------------------
707 #-------------------------------------------------------------------------------
@@ -722,46 +710,31 b' class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):'
722
710
723 class IMultiEngineExtras(Interface):
711 class IMultiEngineExtras(Interface):
724
712
725 def zip_pull(targets, *keys):
713 def zip_pull(targets, keys):
726 """Pull, but return results in a different format from `pull`.
714 """
715 Pull, but return results in a different format from `pull`.
727
716
728 This method basically returns zip(pull(targets, *keys)), with a few
717 This method basically returns zip(pull(targets, *keys)), with a few
729 edge cases handled differently. Users of chainsaw will find this format
718 edge cases handled differently. Users of chainsaw will find this format
730 familiar.
719 familiar.
731
732 :Parameters:
733 targets : int, list or 'all'
734 The engine ids the action will apply to. Call `get_ids` to see
735 a list of currently available engines.
736 keys: list or tuple of str
737 A list of variable names as string of the Python objects to be pulled
738 back to the client.
739
740 :Returns: A list of pulled Python objects for each target.
741 """
720 """
742
721
743 def run(targets, fname):
722 def run(targets, fname):
744 """Run a .py file on targets.
723 """Run a .py file on targets."""
745
746 :Parameters:
747 targets : int, list or 'all'
748 The engine ids the action will apply to. Call `get_ids` to see
749 a list of currently available engines.
750 fname : str
751 The filename of a .py file on the local system to be sent to and run
752 on the engines.
753 block : boolean
754 Should I block or not. If block=True, wait for the action to
755 complete and return the result. If block=False, return a
756 `PendingResult` object that can be used to later get the
757 result. If block is not specified, the block attribute
758 will be used instead.
759 """
760
724
761
725
762 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
763 pass
727 def zip_pull(targets, keys, block=True):
728 """
729 Pull, but return results in a different format from `pull`.
730
731 This method basically returns zip(pull(targets, *keys)), with a few
732 edge cases handled differently. Users of chainsaw will find this format
733 familiar.
734 """
764
735
736 def run(targets, fname, block=True):
737 """Run a .py file on targets."""
765
738
766 #-------------------------------------------------------------------------------
739 #-------------------------------------------------------------------------------
767 # The full MultiEngine interface
740 # The full MultiEngine interface
@@ -31,6 +31,11 b' from IPython.ColorANSI import TermColors'
31 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel import error
32 from IPython.kernel import error
33 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import (
35 MultiEngineMapper,
36 IMultiEngineMapperFactory,
37 IMapper
38 )
34 from IPython.kernel import map as Map
39 from IPython.kernel import map as Map
35 from IPython.kernel import multiengine as me
40 from IPython.kernel import multiengine as me
36 from IPython.kernel.multiengine import (IFullMultiEngine,
41 from IPython.kernel.multiengine import (IFullMultiEngine,
@@ -186,6 +191,10 b' class ResultList(list):'
186
191
187 def __repr__(self):
192 def __repr__(self):
188 output = []
193 output = []
194 # These colored prompts were not working on Windows
195 if sys.platform == 'win32':
196 blue = normal = red = green = ''
197 else:
189 blue = TermColors.Blue
198 blue = TermColors.Blue
190 normal = TermColors.Normal
199 normal = TermColors.Normal
191 red = TermColors.Red
200 red = TermColors.Red
@@ -295,34 +304,6 b' class InteractiveMultiEngineClient(object):'
295 """Return the number of available engines."""
304 """Return the number of available engines."""
296 return len(self.get_ids())
305 return len(self.get_ids())
297
306
298 def parallelize(self, func, targets=None, block=None):
299 """Build a `ParallelFunction` object for functionName on engines.
300
301 The returned object will implement a parallel version of functionName
302 that takes a local sequence as its only argument and calls (in
303 parallel) functionName on each element of that sequence. The
304 `ParallelFunction` object has a `targets` attribute that controls
305 which engines the function is run on.
306
307 :Parameters:
308 targets : int, list or 'all'
309 The engine ids the action will apply to. Call `get_ids` to see
310 a list of currently available engines.
311 functionName : str
312 A Python string that names a callable defined on the engines.
313
314 :Returns: A `ParallelFunction` object.
315
316 Examples
317 ========
318
319 >>> psin = rc.parallelize('all','lambda x:sin(x)')
320 >>> psin(range(10000))
321 [0,2,4,9,25,36,...]
322 """
323 targets, block = self._findTargetsAndBlock(targets, block)
324 return ParallelFunction(func, self, targets, block)
325
326 #---------------------------------------------------------------------------
307 #---------------------------------------------------------------------------
327 # Make this a context manager for with
308 # Make this a context manager for with
328 #---------------------------------------------------------------------------
309 #---------------------------------------------------------------------------
@@ -422,7 +403,11 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
422 engine, run code on it, etc.
403 engine, run code on it, etc.
423 """
404 """
424
405
425 implements(IFullBlockingMultiEngineClient)
406 implements(
407 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
409 IMapper
410 )
426
411
427 def __init__(self, smultiengine):
412 def __init__(self, smultiengine):
428 self.smultiengine = smultiengine
413 self.smultiengine = smultiengine
@@ -779,29 +764,100 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
779 # IMultiEngineCoordinator
764 # IMultiEngineCoordinator
780 #---------------------------------------------------------------------------
765 #---------------------------------------------------------------------------
781
766
782 def scatter(self, key, seq, style='basic', flatten=False, targets=None, block=None):
767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
783 """
768 """
784 Partition a Python sequence and send the partitions to a set of engines.
769 Partition a Python sequence and send the partitions to a set of engines.
785 """
770 """
786 targets, block = self._findTargetsAndBlock(targets, block)
771 targets, block = self._findTargetsAndBlock(targets, block)
787 return self._blockFromThread(self.smultiengine.scatter, key, seq,
772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
788 style, flatten, targets=targets, block=block)
773 dist, flatten, targets=targets, block=block)
789
774
790 def gather(self, key, style='basic', targets=None, block=None):
775 def gather(self, key, dist='b', targets=None, block=None):
791 """
776 """
792 Gather a partitioned sequence on a set of engines as a single local seq.
777 Gather a partitioned sequence on a set of engines as a single local seq.
793 """
778 """
794 targets, block = self._findTargetsAndBlock(targets, block)
779 targets, block = self._findTargetsAndBlock(targets, block)
795 return self._blockFromThread(self.smultiengine.gather, key, style,
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
796 targets=targets, block=block)
781 targets=targets, block=block)
797
782
798 def map(self, func, seq, style='basic', targets=None, block=None):
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 """
785 A parallelized version of Python's builtin map.
786
787 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
791
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
794 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
796 `map`.
797 """
798 targets, block = self._findTargetsAndBlock(targets, block)
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 dist, targets=targets, block=block)
801
802 def map(self, func, *sequences):
803 """
804 A parallel version of Python's builtin `map` function.
805
806 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
808
809 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
812 """
813 return self.mapper().map(func, *sequences)
814
815 def mapper(self, dist='b', targets='all', block=None):
816 """
817 Create a mapper object that has a `map` method.
818
819 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
821 the map happens.
822
823 :Parameters:
824 dist : str
825 What decomposition to use, 'b' is the only one supported
826 currently
827 targets : str, int, sequence of ints
828 Which engines to use for the map
829 block : boolean
830 Should calls to `map` block or not
831 """
832 return MultiEngineMapper(self, dist, targets, block)
833
834 def parallel(self, dist='b', targets=None, block=None):
799 """
835 """
800 A parallelized version of Python's builtin map
836 A decorator that turns a function into a parallel function.
837
838 This can be used as:
839
840 @parallel()
841 def f(x, y)
842 ...
843
844 f(range(10), range(10))
845
846 This causes f(0,0), f(1,1), ... to be called in parallel.
847
848 :Parameters:
849 dist : str
850 What decomposition to use, 'b' is the only one supported
851 currently
852 targets : str, int, sequence of ints
853 Which engines to use for the map
854 block : boolean
855 Should calls to `map` block or not
801 """
856 """
802 targets, block = self._findTargetsAndBlock(targets, block)
857 targets, block = self._findTargetsAndBlock(targets, block)
803 return self._blockFromThread(self.smultiengine.map, func, seq,
858 mapper = self.mapper(dist, targets, block)
804 style, targets=targets, block=block)
859 pf = ParallelFunction(mapper)
860 return pf
805
861
806 #---------------------------------------------------------------------------
862 #---------------------------------------------------------------------------
807 # IMultiEngineExtras
863 # IMultiEngineExtras
@@ -29,6 +29,12 b' from foolscap import Referenceable'
29 from IPython.kernel import error
29 from IPython.kernel import error
30 from IPython.kernel.util import printer
30 from IPython.kernel.util import printer
31 from IPython.kernel import map as Map
31 from IPython.kernel import map as Map
32 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import (
34 MultiEngineMapper,
35 IMultiEngineMapperFactory,
36 IMapper
37 )
32 from IPython.kernel.twistedutil import gatherBoth
38 from IPython.kernel.twistedutil import gatherBoth
33 from IPython.kernel.multiengine import (MultiEngine,
39 from IPython.kernel.multiengine import (MultiEngine,
34 IMultiEngine,
40 IMultiEngine,
@@ -280,7 +286,12 b' components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,'
280
286
281 class FCFullSynchronousMultiEngineClient(object):
287 class FCFullSynchronousMultiEngineClient(object):
282
288
283 implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
289 implements(
290 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
293 IMapper
294 )
284
295
285 def __init__(self, remote_reference):
296 def __init__(self, remote_reference):
286 self.remote_reference = remote_reference
297 self.remote_reference = remote_reference
@@ -475,7 +486,7 b' class FCFullSynchronousMultiEngineClient(object):'
475 d.addCallback(create_targets)
486 d.addCallback(create_targets)
476 return d
487 return d
477
488
478 def scatter(self, key, seq, style='basic', flatten=False, targets='all', block=True):
489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
479
490
480 # Note: scatter and gather handle pending deferreds locally through self.pdm.
491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
481 # This enables us to collect a bunch fo deferred ids and make a secondary
492 # This enables us to collect a bunch fo deferred ids and make a secondary
@@ -483,7 +494,7 b' class FCFullSynchronousMultiEngineClient(object):'
483 # difficult to get right though.
494 # difficult to get right though.
484 def do_scatter(engines):
495 def do_scatter(engines):
485 nEngines = len(engines)
496 nEngines = len(engines)
486 mapClass = Map.styles[style]
497 mapClass = Map.dists[dist]
487 mapObject = mapClass()
498 mapObject = mapClass()
488 d_list = []
499 d_list = []
489 # Loop through and push to each engine in non-blocking mode.
500 # Loop through and push to each engine in non-blocking mode.
@@ -541,7 +552,7 b' class FCFullSynchronousMultiEngineClient(object):'
541 d.addCallback(do_scatter)
552 d.addCallback(do_scatter)
542 return d
553 return d
543
554
544 def gather(self, key, style='basic', targets='all', block=True):
555 def gather(self, key, dist='b', targets='all', block=True):
545
556
546 # Note: scatter and gather handle pending deferreds locally through self.pdm.
557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
547 # This enables us to collect a bunch fo deferred ids and make a secondary
558 # This enables us to collect a bunch fo deferred ids and make a secondary
@@ -549,7 +560,7 b' class FCFullSynchronousMultiEngineClient(object):'
549 # difficult to get right though.
560 # difficult to get right though.
550 def do_gather(engines):
561 def do_gather(engines):
551 nEngines = len(engines)
562 nEngines = len(engines)
552 mapClass = Map.styles[style]
563 mapClass = Map.dists[dist]
553 mapObject = mapClass()
564 mapObject = mapClass()
554 d_list = []
565 d_list = []
555 # Loop through and push to each engine in non-blocking mode.
566 # Loop through and push to each engine in non-blocking mode.
@@ -604,25 +615,103 b' class FCFullSynchronousMultiEngineClient(object):'
604 d.addCallback(do_gather)
615 d.addCallback(do_gather)
605 return d
616 return d
606
617
607 def map(self, func, seq, style='basic', targets='all', block=True):
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
608 d_list = []
619 """
620 A parallelized version of Python's builtin map.
621
622 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
626
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
629 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
631 `map`.
632 """
633 if not isinstance(sequences, (list, tuple)):
634 raise TypeError('sequences must be a list or tuple')
635 max_len = max(len(s) for s in sequences)
636 for s in sequences:
637 if len(s)!=max_len:
638 raise ValueError('all sequences must have equal length')
609 if isinstance(func, FunctionType):
639 if isinstance(func, FunctionType):
610 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
611 d.addCallback(lambda did: self.get_pending_deferred(did, True))
641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
612 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)'
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
613 elif isinstance(func, str):
643 elif isinstance(func, str):
614 d = defer.succeed(None)
644 d = defer.succeed(None)
615 sourceToRun = \
645 sourceToRun = \
616 '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
617 else:
647 else:
618 raise TypeError("func must be a function or str")
648 raise TypeError("func must be a function or str")
619
649
620 d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets))
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
621 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
622 d.addCallback(lambda did: self.get_pending_deferred(did, True))
652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
623 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets, block=block))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
624 return d
654 return d
625
655
656 def map(self, func, *sequences):
657 """
658 A parallel version of Python's builtin `map` function.
659
660 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
662
663 This method creates a mapper objects by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
666 """
667 return self.mapper().map(func, *sequences)
668
669 def mapper(self, dist='b', targets='all', block=True):
670 """
671 Create a mapper object that has a `map` method.
672
673 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
675 the map happens.
676
677 :Parameters:
678 dist : str
679 What decomposition to use, 'b' is the only one supported
680 currently
681 targets : str, int, sequence of ints
682 Which engines to use for the map
683 block : boolean
684 Should calls to `map` block or not
685 """
686 return MultiEngineMapper(self, dist, targets, block)
687
688 def parallel(self, dist='b', targets='all', block=True):
689 """
690 A decorator that turns a function into a parallel function.
691
692 This can be used as:
693
694 @parallel()
695 def f(x, y)
696 ...
697
698 f(range(10), range(10))
699
700 This causes f(0,0), f(1,1), ... to be called in parallel.
701
702 :Parameters:
703 dist : str
704 What decomposition to use, 'b' is the only one supported
705 currently
706 targets : str, int, sequence of ints
707 Which engines to use for the map
708 block : boolean
709 Should calls to `map` block or not
710 """
711 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
713 return pf
714
626 #---------------------------------------------------------------------------
715 #---------------------------------------------------------------------------
627 # ISynchronousMultiEngineExtras related methods
716 # ISynchronousMultiEngineExtras related methods
628 #---------------------------------------------------------------------------
717 #---------------------------------------------------------------------------
@@ -16,17 +16,92 b' __docformat__ = "restructuredtext en"'
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from types import FunctionType
18 from types import FunctionType
19 from zope.interface import Interface, implements
19
20
20 class ParallelFunction:
21
21 """A function that operates in parallel on sequences."""
22 class IMultiEngineParallelDecorator(Interface):
22 def __init__(self, func, multiengine, targets, block):
23 """A decorator that creates a parallel function."""
23 """Create a `ParallelFunction`.
24
25 def parallel(dist='b', targets=None, block=None):
26 """
27 A decorator that turns a function into a parallel function.
28
29 This can be used as:
30
31 @parallel()
32 def f(x, y)
33 ...
34
35 f(range(10), range(10))
36
37 This causes f(0,0), f(1,1), ... to be called in parallel.
38
39 :Parameters:
40 dist : str
41 What decomposition to use, 'b' is the only one supported
42 currently
43 targets : str, int, sequence of ints
44 Which engines to use for the map
45 block : boolean
46 Should calls to `map` block or not
47 """
48
49 class ITaskParallelDecorator(Interface):
50 """A decorator that creates a parallel function."""
51
52 def parallel(clear_before=False, clear_after=False, retries=0,
53 recovery_task=None, depend=None, block=True):
54 """
55 A decorator that turns a function into a parallel function.
56
57 This can be used as:
58
59 @parallel()
60 def f(x, y)
61 ...
62
63 f(range(10), range(10))
64
65 This causes f(0,0), f(1,1), ... to be called in parallel.
66
67 See the documentation for `IPython.kernel.task.BaseTask` for
68 documentation on the arguments to this method.
69 """
70
71 class IParallelFunction(Interface):
72 pass
73
74 class ParallelFunction(object):
75 """
76 The implementation of a parallel function.
77
78 A parallel function is similar to Python's map function:
79
80 map(func, *sequences) -> pfunc(*sequences)
81
82 Parallel functions should be created by using the @parallel decorator.
83 """
84
85 implements(IParallelFunction)
86
87 def __init__(self, mapper):
88 """
89 Create a parallel function from an `IMapper`.
90
91 :Parameters:
92 mapper : an `IMapper` implementer.
93 The mapper to use for the parallel function
94 """
95 self.mapper = mapper
96
97 def __call__(self, func):
98 """
99 Decorate a function to make it run in parallel.
24 """
100 """
25 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
101 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
26 self.func = func
102 self.func = func
27 self.multiengine = multiengine
103 def call_function(*sequences):
28 self.targets = targets
104 return self.mapper.map(self.func, *sequences)
29 self.block = block
105 return call_function
106
30
107
No newline at end of file
31 def __call__(self, sequence):
32 return self.multiengine.map(self.func, sequence, targets=self.targets, block=self.block) No newline at end of file
@@ -168,8 +168,7 b' def startMsg(control_host,control_port=10105):'
168 print 'For interactive use, you can make a MultiEngineClient with:'
168 print 'For interactive use, you can make a MultiEngineClient with:'
169 print
169 print
170 print 'from IPython.kernel import client'
170 print 'from IPython.kernel import client'
171 print "mec = client.MultiEngineClient((%r,%s))" % \
171 print "mec = client.MultiEngineClient()"
172 (control_host,control_port)
173 print
172 print
174 print 'You can then cleanly stop the cluster from IPython using:'
173 print 'You can then cleanly stop the cluster from IPython using:'
175 print
174 print
@@ -191,16 +190,18 b' def clusterLocal(opt,arg):'
191 logfile = pjoin(logdir_base,'ipcluster-')
190 logfile = pjoin(logdir_base,'ipcluster-')
192
191
193 print 'Starting controller:',
192 print 'Starting controller:',
194 controller = Popen(['ipcontroller','--logfile',logfile])
193 controller = Popen(['ipcontroller','--logfile',logfile,'-x','-y'])
195 print 'Controller PID:',controller.pid
194 print 'Controller PID:',controller.pid
196
195
197 print 'Starting engines: ',
196 print 'Starting engines: ',
198 time.sleep(3)
197 time.sleep(5)
199
198
200 englogfile = '%s%s-' % (logfile,controller.pid)
199 englogfile = '%s%s-' % (logfile,controller.pid)
201 mpi = opt.mpi
200 mpi = opt.mpi
202 if mpi: # start with mpi - killing the engines with sigterm will not work if you do this
201 if mpi: # start with mpi - killing the engines with sigterm will not work if you do this
203 engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', mpi, '--logfile',englogfile])]
202 engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi',
203 mpi, '--logfile',englogfile])]
204 # engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', mpi])]
204 else: # do what we would normally do
205 else: # do what we would normally do
205 engines = [ Popen(['ipengine','--logfile',englogfile])
206 engines = [ Popen(['ipengine','--logfile',englogfile])
206 for i in range(opt.n) ]
207 for i in range(opt.n) ]
@@ -58,12 +58,14 b' def start_engine():'
58 kernel_config = kernel_config_manager.get_config_obj()
58 kernel_config = kernel_config_manager.get_config_obj()
59 core_config = core_config_manager.get_config_obj()
59 core_config = core_config_manager.get_config_obj()
60
60
61
61 # Execute the mpi import statement that needs to call MPI_Init
62 # Execute the mpi import statement that needs to call MPI_Init
63 global mpi
62 mpikey = kernel_config['mpi']['default']
64 mpikey = kernel_config['mpi']['default']
63 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
65 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
64 if mpi_import_statement is not None:
66 if mpi_import_statement is not None:
65 try:
67 try:
66 exec mpi_import_statement in locals(), globals()
68 exec mpi_import_statement in globals()
67 except:
69 except:
68 mpi = None
70 mpi = None
69 else:
71 else:
This diff has been collapsed as it changes many lines, (654 lines changed) Show them Hide them
@@ -5,116 +5,404 b''
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import copy, time
19 import copy, time
20 from types import FunctionType as function
20 from types import FunctionType
21
21
22 import zope.interface as zi, string
22 import zope.interface as zi, string
23 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
24 from twisted.python import components, log, failure
24 from twisted.python import components, log, failure
25
25
26 # from IPython.genutils import time
26 from IPython.kernel.util import printer
27
28 from IPython.kernel import engineservice as es, error
27 from IPython.kernel import engineservice as es, error
29 from IPython.kernel import controllerservice as cs
28 from IPython.kernel import controllerservice as cs
30 from IPython.kernel.twistedutil import gatherBoth, DeferredList
29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
31
30
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
33
32
34 def canTask(task):
33 #-----------------------------------------------------------------------------
35 t = copy.copy(task)
34 # Definition of the Task objects
36 t.depend = can(t.depend)
35 #-----------------------------------------------------------------------------
37 if t.recovery_task:
38 t.recovery_task = canTask(t.recovery_task)
39 return t
40
41 def uncanTask(task):
42 t = copy.copy(task)
43 t.depend = uncan(t.depend)
44 if t.recovery_task and t.recovery_task is not task:
45 t.recovery_task = uncanTask(t.recovery_task)
46 return t
47
36
48 time_format = '%Y/%m/%d %H:%M:%S'
37 time_format = '%Y/%m/%d %H:%M:%S'
49
38
50 class Task(object):
39 class ITask(zi.Interface):
51 """Our representation of a task for the `TaskController` interface.
40 """
41 This interface provides a generic definition of what constitutes a task.
42
43 There are two sides to a task. First a task needs to take input from
44 a user to determine what work is performed by the task. Second, the
45 task needs to have the logic that knows how to turn that information
46 info specific calls to a worker, through the `IQueuedEngine` interface.
47
48 Many method in this class get two things passed to them: a Deferred
49 and an IQueuedEngine implementer. Such methods should register callbacks
50 on the Deferred that use the IQueuedEngine to accomplish something. See
51 the existing task objects for examples.
52 """
53
54 zi.Attribute('retries','How many times to retry the task')
55 zi.Attribute('recovery_task','A task to try if the initial one fails')
56 zi.Attribute('taskid','the id of the task')
57
58 def start_time(result):
59 """
60 Do anything needed to start the timing of the task.
61
62 Must simply return the result after starting the timers.
63 """
64
65 def stop_time(result):
66 """
67 Do anything needed to stop the timing of the task.
68
69 Must simply return the result after stopping the timers. This
70 method will usually set attributes that are used by `process_result`
71 in building result of the task.
72 """
73
74 def pre_task(d, queued_engine):
75 """Do something with the queued_engine before the task is run.
76
77 This method should simply add callbacks to the input Deferred
78 that do something with the `queued_engine` before the task is run.
79
80 :Parameters:
81 d : Deferred
82 The deferred that actions should be attached to
83 queued_engine : IQueuedEngine implementer
84 The worker that has been allocated to perform the task
85 """
86
87 def post_task(d, queued_engine):
88 """Do something with the queued_engine after the task is run.
89
90 This method should simply add callbacks to the input Deferred
91 that do something with the `queued_engine` before the task is run.
92
93 :Parameters:
94 d : Deferred
95 The deferred that actions should be attached to
96 queued_engine : IQueuedEngine implementer
97 The worker that has been allocated to perform the task
98 """
99
100 def submit_task(d, queued_engine):
101 """Submit a task using the `queued_engine` we have been allocated.
102
103 When a task is ready to run, this method is called. This method
104 must take the internal information of the task and make suitable
105 calls on the queued_engine to have the actual work done.
106
107 This method should simply add callbacks to the input Deferred
108 that do something with the `queued_engine` before the task is run.
109
110 :Parameters:
111 d : Deferred
112 The deferred that actions should be attached to
113 queued_engine : IQueuedEngine implementer
114 The worker that has been allocated to perform the task
115 """
116
117 def process_result(d, result, engine_id):
118 """Take a raw task result.
119
120 Objects that implement `ITask` can choose how the result of running
121 the task is presented. This method takes the raw result and
122 does this logic. Two example are the `MapTask` which simply returns
123 the raw result or a `Failure` object and the `StringTask` which
124 returns a `TaskResult` object.
125
126 :Parameters:
127 d : Deferred
128 The deferred that actions should be attached to
129 result : object
130 The raw task result that needs to be wrapped
131 engine_id : int
132 The id of the engine that did the task
133
134 :Returns:
135 The result, as a tuple of the form: (success, result).
136 Here, success is a boolean indicating if the task
137 succeeded or failed and result is the result.
138 """
52
139
53 The user should create instances of this class to represent a task that
140 def check_depend(properties):
54 needs to be done.
141 """Check properties to see if the task should be run.
142
143 :Parameters:
144 properties : dict
145 A dictionary of properties that an engine has set
146
147 :Returns:
148 True if the task should be run, False otherwise
149 """
150
151 def can_task(self):
152 """Serialize (can) any functions in the task for pickling.
153
154 Subclasses must override this method and make sure that all
155 functions in the task are canned by calling `can` on the
156 function.
157 """
158
159 def uncan_task(self):
160 """Unserialize (uncan) any canned function in the task."""
161
162 class BaseTask(object):
163 """
164 Common fuctionality for all objects implementing `ITask`.
165 """
166
167 zi.implements(ITask)
168
169 def __init__(self, clear_before=False, clear_after=False, retries=0,
170 recovery_task=None, depend=None):
171 """
172 Make a generic task.
55
173
56 :Parameters:
174 :Parameters:
57 expression : str
58 A str that is valid python code that is the task.
59 pull : str or list of str
60 The names of objects to be pulled as results. If not specified,
61 will return {'result', None}
62 push : dict
63 A dict of objects to be pushed into the engines namespace before
64 execution of the expression.
65 clear_before : boolean
175 clear_before : boolean
66 Should the engine's namespace be cleared before the task is run.
176 Should the engines namespace be cleared before the task
67 Default=False.
177 is run
68 clear_after : boolean
178 clear_after : boolean
69 Should the engine's namespace be cleared after the task is run.
179 Should the engines namespace be clear after the task is run
70 Default=False.
71 retries : int
180 retries : int
72 The number of times to resumbit the task if it fails. Default=0.
181 The number of times a task should be retries upon failure
73 recovery_task : Task
182 recovery_task : any task object
74 This is the Task to be run when the task has exhausted its retries
183 If a task fails and it has a recovery_task, that is run
75 Default=None.
184 upon a retry
76 depend : bool function(properties)
185 depend : FunctionType
77 This is the dependency function for the Task, which determines
186 A function that is called to test for properties. This function
78 whether a task can be run on a Worker. `depend` is called with
187 must take one argument, the properties dict and return a boolean
79 one argument, the worker's properties dict, and should return
188 """
80 True if the worker meets the dependencies or False if it does
189 self.clear_before = clear_before
81 not.
190 self.clear_after = clear_after
82 Default=None - run on any worker
191 self.retries = retries
83 options : dict
192 self.recovery_task = recovery_task
84 Any other keyword options for more elaborate uses of tasks
193 self.depend = depend
85
194 self.taskid = None
86 Examples
195
87 --------
196 def start_time(self, result):
197 """
198 Start the basic timers.
199 """
200 self.start = time.time()
201 self.start_struct = time.localtime()
202 return result
203
204 def stop_time(self, result):
205 """
206 Stop the basic timers.
207 """
208 self.stop = time.time()
209 self.stop_struct = time.localtime()
210 self.duration = self.stop - self.start
211 self.submitted = time.strftime(time_format, self.start_struct)
212 self.completed = time.strftime(time_format)
213 return result
214
215 def pre_task(self, d, queued_engine):
216 """
217 Clear the engine before running the task if clear_before is set.
218 """
219 if self.clear_before:
220 d.addCallback(lambda r: queued_engine.reset())
221
222 def post_task(self, d, queued_engine):
223 """
224 Clear the engine after running the task if clear_after is set.
225 """
226 def reseter(result):
227 queued_engine.reset()
228 return result
229 if self.clear_after:
230 d.addBoth(reseter)
231
232 def submit_task(self, d, queued_engine):
233 raise NotImplementedError('submit_task must be implemented in a subclass')
234
235 def process_result(self, result, engine_id):
236 """
237 Process a task result.
238
239 This is the default `process_result` that just returns the raw
240 result or a `Failure`.
241 """
242 if isinstance(result, failure.Failure):
243 return (False, result)
244 else:
245 return (True, result)
246
247 def check_depend(self, properties):
248 """
249 Calls self.depend(properties) to see if a task should be run.
250 """
251 if self.depend is not None:
252 return self.depend(properties)
253 else:
254 return True
255
256 def can_task(self):
257 self.depend = can(self.depend)
258 if isinstance(self.recovery_task, BaseTask):
259 self.recovery_task.can_task()
260
261 def uncan_task(self):
262 self.depend = uncan(self.depend)
263 if isinstance(self.recovery_task, BaseTask):
264 self.recovery_task.uncan_task()
265
266 class MapTask(BaseTask):
267 """
268 A task that consists of a function and arguments.
269 """
88
270
89 >>> t = Task('dostuff(args)')
271 zi.implements(ITask)
90 >>> t = Task('a=5', pull='a')
272
91 >>> t = Task('a=5\nb=4', pull=['a','b'])
273 def __init__(self, function, args=None, kwargs=None, clear_before=False,
92 >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
274 clear_after=False, retries=0, recovery_task=None, depend=None):
93 # A dependency case:
275 """
94 >>> def hasMPI(props):
276 Create a task based on a function, args and kwargs.
95 ... return props.get('mpi') is not None
277
96 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
278 This is a simple type of task that consists of calling:
279 function(*args, **kwargs) and wrapping the result in a `TaskResult`.
280
281 The return value of the function, or a `Failure` wrapping an
282 exception is the task result for this type of task.
283 """
284 BaseTask.__init__(self, clear_before, clear_after, retries,
285 recovery_task, depend)
286 if not isinstance(function, FunctionType):
287 raise TypeError('a task function must be a FunctionType')
288 self.function = function
289 if args is None:
290 self.args = ()
291 else:
292 self.args = args
293 if not isinstance(self.args, (list, tuple)):
294 raise TypeError('a task args must be a list or tuple')
295 if kwargs is None:
296 self.kwargs = {}
297 else:
298 self.kwargs = kwargs
299 if not isinstance(self.kwargs, dict):
300 raise TypeError('a task kwargs must be a dict')
301
302 def submit_task(self, d, queued_engine):
303 d.addCallback(lambda r: queued_engine.push_function(
304 dict(_ipython_task_function=self.function))
305 )
306 d.addCallback(lambda r: queued_engine.push(
307 dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
308 )
309 d.addCallback(lambda r: queued_engine.execute(
310 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
311 )
312 d.addCallback(lambda r: queued_engine.pull('_ipython_task_result'))
313
314 def can_task(self):
315 self.function = can(self.function)
316 BaseTask.can_task(self)
317
318 def uncan_task(self):
319 self.function = uncan(self.function)
320 BaseTask.uncan_task(self)
321
322
323 class StringTask(BaseTask):
324 """
325 A task that consists of a string of Python code to run.
97 """
326 """
98
327
99 def __init__(self, expression, pull=None, push=None,
328 def __init__(self, expression, pull=None, push=None,
100 clear_before=False, clear_after=False, retries=0,
329 clear_before=False, clear_after=False, retries=0,
101 recovery_task=None, depend=None, **options):
330 recovery_task=None, depend=None):
331 """
332 Create a task based on a Python expression and variables
333
334 This type of task lets you push a set of variables to the engines
335 namespace, run a Python string in that namespace and then bring back
336 a different set of Python variables as the result.
337
338 Because this type of task can return many results (through the
339 `pull` keyword argument) it returns a special `TaskResult` object
340 that wraps the pulled variables, statistics about the run and
341 any exceptions raised.
342 """
343 if not isinstance(expression, str):
344 raise TypeError('a task expression must be a string')
102 self.expression = expression
345 self.expression = expression
103 if isinstance(pull, str):
346
104 self.pull = [pull]
347 if pull==None:
105 else:
348 self.pull = ()
349 elif isinstance(pull, str):
350 self.pull = (pull,)
351 elif isinstance(pull, (list, tuple)):
106 self.pull = pull
352 self.pull = pull
353 else:
354 raise TypeError('pull must be str or a sequence of strs')
355
356 if push==None:
357 self.push = {}
358 elif isinstance(push, dict):
107 self.push = push
359 self.push = push
108 self.clear_before = clear_before
360 else:
109 self.clear_after = clear_after
361 raise TypeError('push must be a dict')
110 self.retries=retries
362
111 self.recovery_task = recovery_task
363 BaseTask.__init__(self, clear_before, clear_after, retries,
112 self.depend = depend
364 recovery_task, depend)
113 self.options = options
114 self.taskid = None
115
365
116 class ResultNS:
366 def submit_task(self, d, queued_engine):
117 """The result namespace object for use in TaskResult objects as tr.ns.
367 if self.push is not None:
368 d.addCallback(lambda r: queued_engine.push(self.push))
369
370 d.addCallback(lambda r: queued_engine.execute(self.expression))
371
372 if self.pull is not None:
373 d.addCallback(lambda r: queued_engine.pull(self.pull))
374 else:
375 d.addCallback(lambda r: None)
376
377 def process_result(self, result, engine_id):
378 if isinstance(result, failure.Failure):
379 tr = TaskResult(result, engine_id)
380 else:
381 if self.pull is None:
382 resultDict = {}
383 elif len(self.pull) == 1:
384 resultDict = {self.pull[0]:result}
385 else:
386 resultDict = dict(zip(self.pull, result))
387 tr = TaskResult(resultDict, engine_id)
388 # Assign task attributes
389 tr.submitted = self.submitted
390 tr.completed = self.completed
391 tr.duration = self.duration
392 if hasattr(self,'taskid'):
393 tr.taskid = self.taskid
394 else:
395 tr.taskid = None
396 if isinstance(result, failure.Failure):
397 return (False, tr)
398 else:
399 return (True, tr)
400
401 class ResultNS(object):
402 """
403 A dict like object for holding the results of a task.
404
405 The result namespace object for use in `TaskResult` objects as tr.ns.
118 It builds an object from a dictionary, such that it has attributes
406 It builds an object from a dictionary, such that it has attributes
119 according to the key,value pairs of the dictionary.
407 according to the key,value pairs of the dictionary.
120
408
@@ -152,7 +440,7 b' class ResultNS:'
152
440
153 class TaskResult(object):
441 class TaskResult(object):
154 """
442 """
155 An object for returning task results.
443 An object for returning task results for certain types of tasks.
156
444
157 This object encapsulates the results of a task. On task
445 This object encapsulates the results of a task. On task
158 success it will have a keys attribute that will have a list
446 success it will have a keys attribute that will have a list
@@ -162,21 +450,21 b' class TaskResult(object):'
162
450
163 In task failure, keys will be empty, but failure will contain
451 In task failure, keys will be empty, but failure will contain
164 the failure object that encapsulates the remote exception.
452 the failure object that encapsulates the remote exception.
165 One can also simply call the raiseException() method of
453 One can also simply call the `raise_exception` method of
166 this class to re-raise any remote exception in the local
454 this class to re-raise any remote exception in the local
167 session.
455 session.
168
456
169 The TaskResult has a .ns member, which is a property for access
457 The `TaskResult` has a `.ns` member, which is a property for access
170 to the results. If the Task had pull=['a', 'b'], then the
458 to the results. If the Task had pull=['a', 'b'], then the
171 Task Result will have attributes tr.ns.a, tr.ns.b for those values.
459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
172 Accessing tr.ns will raise the remote failure if the task failed.
460 Accessing `tr.ns` will raise the remote failure if the task failed.
173
461
174 The engineid attribute should have the engineid of the engine
462 The `engineid` attribute should have the `engineid` of the engine
175 that ran the task. But, because engines can come and go in
463 that ran the task. But, because engines can come and go,
176 the ipython task system, the engineid may not continue to be
464 the `engineid` may not continue to be
177 valid or accurate.
465 valid or accurate.
178
466
179 The taskid attribute simply gives the taskid that the task
467 The `taskid` attribute simply gives the `taskid` that the task
180 is tracked under.
468 is tracked under.
181 """
469 """
182 taskid = None
470 taskid = None
@@ -188,7 +476,7 b' class TaskResult(object):'
188 return self._ns
476 return self._ns
189
477
190 def _setNS(self, v):
478 def _setNS(self, v):
191 raise Exception("I am protected!")
479 raise Exception("the ns attribute cannot be changed")
192
480
193 ns = property(_getNS, _setNS)
481 ns = property(_getNS, _setNS)
194
482
@@ -214,15 +502,19 b' class TaskResult(object):'
214
502
215 def __getitem__(self, key):
503 def __getitem__(self, key):
216 if self.failure is not None:
504 if self.failure is not None:
217 self.raiseException()
505 self.raise_exception()
218 return self.results[key]
506 return self.results[key]
219
507
220 def raiseException(self):
508 def raise_exception(self):
221 """Re-raise any remote exceptions in the local python session."""
509 """Re-raise any remote exceptions in the local python session."""
222 if self.failure is not None:
510 if self.failure is not None:
223 self.failure.raiseException()
511 self.failure.raiseException()
224
512
225
513
514 #-----------------------------------------------------------------------------
515 # The controller side of things
516 #-----------------------------------------------------------------------------
517
226 class IWorker(zi.Interface):
518 class IWorker(zi.Interface):
227 """The Basic Worker Interface.
519 """The Basic Worker Interface.
228
520
@@ -237,12 +529,15 b' class IWorker(zi.Interface):'
237 :Parameters:
529 :Parameters:
238 task : a `Task` object
530 task : a `Task` object
239
531
240 :Returns: `Deferred` to a `TaskResult` object.
532 :Returns: `Deferred` to a tuple of (success, result) where
533 success if a boolean that signifies success or failure
534 and result is the task result.
241 """
535 """
242
536
243
537
244 class WorkerFromQueuedEngine(object):
538 class WorkerFromQueuedEngine(object):
245 """Adapt an `IQueuedEngine` to an `IWorker` object"""
539 """Adapt an `IQueuedEngine` to an `IWorker` object"""
540
246 zi.implements(IWorker)
541 zi.implements(IWorker)
247
542
248 def __init__(self, qe):
543 def __init__(self, qe):
@@ -257,52 +552,26 b' class WorkerFromQueuedEngine(object):'
257 def run(self, task):
552 def run(self, task):
258 """Run task in worker's namespace.
553 """Run task in worker's namespace.
259
554
555 This takes a task and calls methods on the task that actually
556 cause `self.queuedEngine` to do the task. See the methods of
557 `ITask` for more information about how these methods are called.
558
260 :Parameters:
559 :Parameters:
261 task : a `Task` object
560 task : a `Task` object
262
561
263 :Returns: `Deferred` to a `TaskResult` object.
562 :Returns: `Deferred` to a tuple of (success, result) where
563 success if a boolean that signifies success or failure
564 and result is the task result.
264 """
565 """
265 if task.clear_before:
266 d = self.queuedEngine.reset()
267 else:
268 d = defer.succeed(None)
566 d = defer.succeed(None)
269
567 d.addCallback(task.start_time)
270 if task.push is not None:
568 task.pre_task(d, self.queuedEngine)
271 d.addCallback(lambda r: self.queuedEngine.push(task.push))
569 task.submit_task(d, self.queuedEngine)
272
570 task.post_task(d, self.queuedEngine)
273 d.addCallback(lambda r: self.queuedEngine.execute(task.expression))
571 d.addBoth(task.stop_time)
274
572 d.addBoth(task.process_result, self.queuedEngine.id)
275 if task.pull is not None:
573 # At this point, there will be (success, result) coming down the line
276 d.addCallback(lambda r: self.queuedEngine.pull(task.pull))
574 return d
277 else:
278 d.addCallback(lambda r: None)
279
280 def reseter(result):
281 self.queuedEngine.reset()
282 return result
283
284 if task.clear_after:
285 d.addBoth(reseter)
286
287 return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime())
288
289 def _zipResults(self, result, names, start, start_struct):
290 """Callback for construting the TaskResult object."""
291 if isinstance(result, failure.Failure):
292 tr = TaskResult(result, self.queuedEngine.id)
293 else:
294 if names is None:
295 resultDict = {}
296 elif len(names) == 1:
297 resultDict = {names[0]:result}
298 else:
299 resultDict = dict(zip(names, result))
300 tr = TaskResult(resultDict, self.queuedEngine.id)
301 # the time info
302 tr.submitted = time.strftime(time_format, start_struct)
303 tr.completed = time.strftime(time_format)
304 tr.duration = time.time()-start
305 return tr
306
575
307
576
308 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
577 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
@@ -319,14 +588,14 b' class IScheduler(zi.Interface):'
319 """Add a task to the queue of the Scheduler.
588 """Add a task to the queue of the Scheduler.
320
589
321 :Parameters:
590 :Parameters:
322 task : a `Task` object
591 task : an `ITask` implementer
323 The task to be queued.
592 The task to be queued.
324 flags : dict
593 flags : dict
325 General keywords for more sophisticated scheduling
594 General keywords for more sophisticated scheduling
326 """
595 """
327
596
328 def pop_task(id=None):
597 def pop_task(id=None):
329 """Pops a Task object.
598 """Pops a task object from the queue.
330
599
331 This gets the next task to be run. If no `id` is requested, the highest priority
600 This gets the next task to be run. If no `id` is requested, the highest priority
332 task is returned.
601 task is returned.
@@ -336,7 +605,7 b' class IScheduler(zi.Interface):'
336 The id of the task to be popped. The default (None) is to return
605 The id of the task to be popped. The default (None) is to return
337 the highest priority task.
606 the highest priority task.
338
607
339 :Returns: a `Task` object
608 :Returns: an `ITask` implementer
340
609
341 :Exceptions:
610 :Exceptions:
342 IndexError : raised if no taskid in queue
611 IndexError : raised if no taskid in queue
@@ -346,8 +615,9 b' class IScheduler(zi.Interface):'
346 """Add a worker to the worker queue.
615 """Add a worker to the worker queue.
347
616
348 :Parameters:
617 :Parameters:
349 worker : an IWorker implementing object
618 worker : an `IWorker` implementer
350 flags : General keywords for more sophisticated scheduling
619 flags : dict
620 General keywords for more sophisticated scheduling
351 """
621 """
352
622
353 def pop_worker(id=None):
623 def pop_worker(id=None):
@@ -370,15 +640,15 b' class IScheduler(zi.Interface):'
370 """Returns True if there is something to do, False otherwise"""
640 """Returns True if there is something to do, False otherwise"""
371
641
372 def schedule():
642 def schedule():
373 """Returns a tuple of the worker and task pair for the next
643 """Returns (worker,task) pair for the next task to be run."""
374 task to be run.
375 """
376
644
377
645
378 class FIFOScheduler(object):
646 class FIFOScheduler(object):
379 """A basic First-In-First-Out (Queue) Scheduler.
647 """
380 This is the default Scheduler for the TaskController.
648 A basic First-In-First-Out (Queue) Scheduler.
381 See the docstrings for IScheduler for interface details.
649
650 This is the default Scheduler for the `TaskController`.
651 See the docstrings for `IScheduler` for interface details.
382 """
652 """
383
653
384 zi.implements(IScheduler)
654 zi.implements(IScheduler)
@@ -435,7 +705,9 b' class FIFOScheduler(object):'
435 for t in self.tasks:
705 for t in self.tasks:
436 for w in self.workers:
706 for w in self.workers:
437 try:# do not allow exceptions to break this
707 try:# do not allow exceptions to break this
438 cando = t.depend is None or t.depend(w.properties)
708 # Allow the task to check itself using its
709 # check_depend method.
710 cando = t.check_depend(w.properties)
439 except:
711 except:
440 cando = False
712 cando = False
441 if cando:
713 if cando:
@@ -445,9 +717,12 b' class FIFOScheduler(object):'
445
717
446
718
447 class LIFOScheduler(FIFOScheduler):
719 class LIFOScheduler(FIFOScheduler):
448 """A Last-In-First-Out (Stack) Scheduler. This scheduler should naively
720 """
449 reward fast engines by giving them more jobs. This risks starvation, but
721 A Last-In-First-Out (Stack) Scheduler.
450 only in cases with low load, where starvation does not really matter.
722
723 This scheduler should naively reward fast engines by giving
724 them more jobs. This risks starvation, but only in cases with
725 low load, where starvation does not really matter.
451 """
726 """
452
727
453 def add_task(self, task, **flags):
728 def add_task(self, task, **flags):
@@ -462,13 +737,15 b' class LIFOScheduler(FIFOScheduler):'
462
737
463
738
464 class ITaskController(cs.IControllerBase):
739 class ITaskController(cs.IControllerBase):
465 """The Task based interface to a `ControllerService` object
740 """
741 The Task based interface to a `ControllerService` object
466
742
467 This adapts a `ControllerService` to the ITaskController interface.
743 This adapts a `ControllerService` to the ITaskController interface.
468 """
744 """
469
745
470 def run(task):
746 def run(task):
471 """Run a task.
747 """
748 Run a task.
472
749
473 :Parameters:
750 :Parameters:
474 task : an IPython `Task` object
751 task : an IPython `Task` object
@@ -477,13 +754,14 b' class ITaskController(cs.IControllerBase):'
477 """
754 """
478
755
479 def get_task_result(taskid, block=False):
756 def get_task_result(taskid, block=False):
480 """Get the result of a task by its ID.
757 """
758 Get the result of a task by its ID.
481
759
482 :Parameters:
760 :Parameters:
483 taskid : int
761 taskid : int
484 the id of the task whose result is requested
762 the id of the task whose result is requested
485
763
486 :Returns: `Deferred` to (taskid, actualResult) if the task is done, and None
764 :Returns: `Deferred` to the task result if the task is done, and None
487 if not.
765 if not.
488
766
489 :Exceptions:
767 :Exceptions:
@@ -508,23 +786,35 b' class ITaskController(cs.IControllerBase):'
508 """
786 """
509
787
510 def barrier(taskids):
788 def barrier(taskids):
511 """Block until the list of taskids are completed.
789 """
790 Block until the list of taskids are completed.
512
791
513 Returns None on success.
792 Returns None on success.
514 """
793 """
515
794
516 def spin():
795 def spin():
517 """touch the scheduler, to resume scheduling without submitting
796 """
518 a task.
797 Touch the scheduler, to resume scheduling without submitting a task.
519 """
798 """
520
799
521 def queue_status(self, verbose=False):
800 def queue_status(verbose=False):
522 """Get a dictionary with the current state of the task queue.
801 """
802 Get a dictionary with the current state of the task queue.
523
803
524 If verbose is True, then return lists of taskids, otherwise,
804 If verbose is True, then return lists of taskids, otherwise,
525 return the number of tasks with each status.
805 return the number of tasks with each status.
526 """
806 """
527
807
808 def clear():
809 """
810 Clear all previously run tasks from the task controller.
811
812 This is needed because the task controller keep all task results
813 in memory. This can be a problem is there are many completed
814 tasks. Users should call this periodically to clean out these
815 cached task results.
816 """
817
528
818
529 class TaskController(cs.ControllerAdapterBase):
819 class TaskController(cs.ControllerAdapterBase):
530 """The Task based interface to a Controller object.
820 """The Task based interface to a Controller object.
@@ -561,7 +851,7 b' class TaskController(cs.ControllerAdapterBase):'
561 def registerWorker(self, id):
851 def registerWorker(self, id):
562 """Called by controller.register_engine."""
852 """Called by controller.register_engine."""
563 if self.workers.get(id):
853 if self.workers.get(id):
564 raise "We already have one! This should not happen."
854 raise ValueError("worker with id %s already exists. This should not happen." % id)
565 self.workers[id] = IWorker(self.controller.engines[id])
855 self.workers[id] = IWorker(self.controller.engines[id])
566 self.workers[id].workerid = id
856 self.workers[id].workerid = id
567 if not self.pendingTasks.has_key(id):# if not working
857 if not self.pendingTasks.has_key(id):# if not working
@@ -586,21 +876,25 b' class TaskController(cs.ControllerAdapterBase):'
586 #---------------------------------------------------------------------------
876 #---------------------------------------------------------------------------
587
877
588 def run(self, task):
878 def run(self, task):
589 """Run a task and return `Deferred` to its taskid."""
879 """
880 Run a task and return `Deferred` to its taskid.
881 """
590 task.taskid = self.taskid
882 task.taskid = self.taskid
591 task.start = time.localtime()
883 task.start = time.localtime()
592 self.taskid += 1
884 self.taskid += 1
593 d = defer.Deferred()
885 d = defer.Deferred()
594 self.scheduler.add_task(task)
886 self.scheduler.add_task(task)
595 # log.msg('Queuing task: %i' % task.taskid)
887 log.msg('Queuing task: %i' % task.taskid)
596
888
597 self.deferredResults[task.taskid] = []
889 self.deferredResults[task.taskid] = []
598 self.distributeTasks()
890 self.distributeTasks()
599 return defer.succeed(task.taskid)
891 return defer.succeed(task.taskid)
600
892
601 def get_task_result(self, taskid, block=False):
893 def get_task_result(self, taskid, block=False):
602 """Returns a `Deferred` to a TaskResult tuple or None."""
894 """
603 # log.msg("Getting task result: %i" % taskid)
895 Returns a `Deferred` to the task result, or None.
896 """
897 log.msg("Getting task result: %i" % taskid)
604 if self.finishedResults.has_key(taskid):
898 if self.finishedResults.has_key(taskid):
605 tr = self.finishedResults[taskid]
899 tr = self.finishedResults[taskid]
606 return defer.succeed(tr)
900 return defer.succeed(tr)
@@ -615,7 +909,9 b' class TaskController(cs.ControllerAdapterBase):'
615 return defer.fail(IndexError("task ID not registered: %r" % taskid))
909 return defer.fail(IndexError("task ID not registered: %r" % taskid))
616
910
617 def abort(self, taskid):
911 def abort(self, taskid):
618 """Remove a task from the queue if it has not been run already."""
912 """
913 Remove a task from the queue if it has not been run already.
914 """
619 if not isinstance(taskid, int):
915 if not isinstance(taskid, int):
620 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
916 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
621 try:
917 try:
@@ -674,8 +970,10 b' class TaskController(cs.ControllerAdapterBase):'
674 #---------------------------------------------------------------------------
970 #---------------------------------------------------------------------------
675
971
676 def _doAbort(self, taskid):
972 def _doAbort(self, taskid):
677 """Helper function for aborting a pending task."""
973 """
678 # log.msg("Task aborted: %i" % taskid)
974 Helper function for aborting a pending task.
975 """
976 log.msg("Task aborted: %i" % taskid)
679 result = failure.Failure(error.TaskAborted())
977 result = failure.Failure(error.TaskAborted())
680 self._finishTask(taskid, result)
978 self._finishTask(taskid, result)
681 if taskid in self.abortPending:
979 if taskid in self.abortPending:
@@ -683,14 +981,16 b' class TaskController(cs.ControllerAdapterBase):'
683
981
684 def _finishTask(self, taskid, result):
982 def _finishTask(self, taskid, result):
685 dlist = self.deferredResults.pop(taskid)
983 dlist = self.deferredResults.pop(taskid)
686 result.taskid = taskid # The TaskResult should save the taskid
984 # result.taskid = taskid # The TaskResult should save the taskid
687 self.finishedResults[taskid] = result
985 self.finishedResults[taskid] = result
688 for d in dlist:
986 for d in dlist:
689 d.callback(result)
987 d.callback(result)
690
988
691 def distributeTasks(self):
989 def distributeTasks(self):
692 """Distribute tasks while self.scheduler has things to do."""
990 """
693 # log.msg("distributing Tasks")
991 Distribute tasks while self.scheduler has things to do.
992 """
993 log.msg("distributing Tasks")
694 worker, task = self.scheduler.schedule()
994 worker, task = self.scheduler.schedule()
695 if not worker and not task:
995 if not worker and not task:
696 if self.idleLater and self.idleLater.called:# we are inside failIdle
996 if self.idleLater and self.idleLater.called:# we are inside failIdle
@@ -705,7 +1005,7 b' class TaskController(cs.ControllerAdapterBase):'
705 self.pendingTasks[worker.workerid] = task
1005 self.pendingTasks[worker.workerid] = task
706 # run/link callbacks
1006 # run/link callbacks
707 d = worker.run(task)
1007 d = worker.run(task)
708 # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
1008 log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
709 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
1009 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
710 worker, task = self.scheduler.schedule()
1010 worker, task = self.scheduler.schedule()
711 # check for idle timeout:
1011 # check for idle timeout:
@@ -727,14 +1027,15 b' class TaskController(cs.ControllerAdapterBase):'
727 t = self.scheduler.pop_task()
1027 t = self.scheduler.pop_task()
728 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
1028 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
729 msg += " for %i seconds"%self.timeout
1029 msg += " for %i seconds"%self.timeout
730 # log.msg("Task aborted by timeout: %i" % t.taskid)
1030 log.msg("Task aborted by timeout: %i" % t.taskid)
731 f = failure.Failure(error.TaskTimeout(msg))
1031 f = failure.Failure(error.TaskTimeout(msg))
732 self._finishTask(t.taskid, f)
1032 self._finishTask(t.taskid, f)
733 self.idleLater = None
1033 self.idleLater = None
734
1034
735
1035
736 def taskCompleted(self, result, taskid, workerid):
1036 def taskCompleted(self, success_and_result, taskid, workerid):
737 """This is the err/callback for a completed task."""
1037 """This is the err/callback for a completed task."""
1038 success, result = success_and_result
738 try:
1039 try:
739 task = self.pendingTasks.pop(workerid)
1040 task = self.pendingTasks.pop(workerid)
740 except:
1041 except:
@@ -751,7 +1052,7 b' class TaskController(cs.ControllerAdapterBase):'
751 aborted = True
1052 aborted = True
752
1053
753 if not aborted:
1054 if not aborted:
754 if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed
1055 if not success:
755 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1056 log.msg("Task %i failed on worker %i"% (taskid, workerid))
756 if task.retries > 0: # resubmit
1057 if task.retries > 0: # resubmit
757 task.retries -= 1
1058 task.retries -= 1
@@ -759,7 +1060,7 b' class TaskController(cs.ControllerAdapterBase):'
759 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
1060 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
760 log.msg(s)
1061 log.msg(s)
761 self.distributeTasks()
1062 self.distributeTasks()
762 elif isinstance(task.recovery_task, Task) and \
1063 elif isinstance(task.recovery_task, BaseTask) and \
763 task.recovery_task.retries > -1:
1064 task.recovery_task.retries > -1:
764 # retries = -1 is to prevent infinite recovery_task loop
1065 # retries = -1 is to prevent infinite recovery_task loop
765 task.retries = -1
1066 task.retries = -1
@@ -775,17 +1076,18 b' class TaskController(cs.ControllerAdapterBase):'
775 # it may have died, and not yet been unregistered
1076 # it may have died, and not yet been unregistered
776 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1077 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
777 else: # we succeeded
1078 else: # we succeeded
778 # log.msg("Task completed: %i"% taskid)
1079 log.msg("Task completed: %i"% taskid)
779 self._finishTask(taskid, result)
1080 self._finishTask(taskid, result)
780 self.readmitWorker(workerid)
1081 self.readmitWorker(workerid)
781 else:# we aborted the task
1082 else: # we aborted the task
782 if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker
1083 if not success:
783 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1084 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
784 else:
1085 else:
785 self.readmitWorker(workerid)
1086 self.readmitWorker(workerid)
786
1087
787 def readmitWorker(self, workerid):
1088 def readmitWorker(self, workerid):
788 """Readmit a worker to the scheduler.
1089 """
1090 Readmit a worker to the scheduler.
789
1091
790 This is outside `taskCompleted` because of the `failurePenalty` being
1092 This is outside `taskCompleted` because of the `failurePenalty` being
791 implemented through `reactor.callLater`.
1093 implemented through `reactor.callLater`.
@@ -795,5 +1097,17 b' class TaskController(cs.ControllerAdapterBase):'
795 self.scheduler.add_worker(self.workers[workerid])
1097 self.scheduler.add_worker(self.workers[workerid])
796 self.distributeTasks()
1098 self.distributeTasks()
797
1099
1100 def clear(self):
1101 """
1102 Clear all previously run tasks from the task controller.
1103
1104 This is needed because the task controller keep all task results
1105 in memory. This can be a problem is there are many completed
1106 tasks. Users should call this periodically to clean out these
1107 cached task results.
1108 """
1109 self.finishedResults = {}
1110 return defer.succeed(None)
1111
798
1112
799 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
1113 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -1,9 +1,8 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """The Generic Task Client object.
4 """
5
5 A blocking version of the task client.
6 This must be subclassed based on your connection method.
7 """
6 """
8
7
9 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
@@ -24,116 +23,97 b' from twisted.python import components, log'
24
23
25 from IPython.kernel.twistedutil import blockingCallFromThread
24 from IPython.kernel.twistedutil import blockingCallFromThread
26 from IPython.kernel import task, error
25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 IMapper
30 )
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
33 ITaskParallelDecorator
34 )
27
35
28 #-------------------------------------------------------------------------------
36 #-------------------------------------------------------------------------------
29 # Connecting Task Client
37 # The task client
30 #-------------------------------------------------------------------------------
38 #-------------------------------------------------------------------------------
31
39
32 class InteractiveTaskClient(object):
33
34 def irun(self, *args, **kwargs):
35 """Run a task on the `TaskController`.
36
37 This method is a shorthand for run(task) and its arguments are simply
38 passed onto a `Task` object:
39
40 irun(*args, **kwargs) -> run(Task(*args, **kwargs))
41
42 :Parameters:
43 expression : str
44 A str that is valid python code that is the task.
45 pull : str or list of str
46 The names of objects to be pulled as results.
47 push : dict
48 A dict of objects to be pushed into the engines namespace before
49 execution of the expression.
50 clear_before : boolean
51 Should the engine's namespace be cleared before the task is run.
52 Default=False.
53 clear_after : boolean
54 Should the engine's namespace be cleared after the task is run.
55 Default=False.
56 retries : int
57 The number of times to resumbit the task if it fails. Default=0.
58 options : dict
59 Any other keyword options for more elaborate uses of tasks
60
61 :Returns: A `TaskResult` object.
62 """
63 block = kwargs.pop('block', False)
64 if len(args) == 1 and isinstance(args[0], task.Task):
65 t = args[0]
66 else:
67 t = task.Task(*args, **kwargs)
68 taskid = self.run(t)
69 print "TaskID = %i"%taskid
70 if block:
71 return self.get_task_result(taskid, block)
72 else:
73 return taskid
74
75 class IBlockingTaskClient(Interface):
40 class IBlockingTaskClient(Interface):
76 """
41 """
77 An interface for blocking task clients.
42 A vague interface of the blocking task client
78 """
43 """
79 pass
44 pass
80
45
81
46 class BlockingTaskClient(object):
82 class BlockingTaskClient(InteractiveTaskClient):
83 """
47 """
84 This class provides a blocking task client.
48 A blocking task client that adapts a non-blocking one.
85 """
49 """
86
50
87 implements(IBlockingTaskClient)
51 implements(
52 IBlockingTaskClient,
53 ITaskMapperFactory,
54 IMapper,
55 ITaskParallelDecorator
56 )
88
57
89 def __init__(self, task_controller):
58 def __init__(self, task_controller):
90 self.task_controller = task_controller
59 self.task_controller = task_controller
91 self.block = True
60 self.block = True
92
61
93 def run(self, task):
62 def run(self, task, block=False):
94 """
63 """Run a task on the `TaskController`.
95 Run a task and return a task id that can be used to get the task result.
64
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
96
67
97 :Parameters:
68 :Parameters:
98 task : `Task`
69 task : an `ITask` implementer
99 The `Task` object to run
70
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
100 """
73 """
101 return blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
76 return self.get_task_result(tid, block=True)
77 else:
78 return tid
102
79
103 def get_task_result(self, taskid, block=False):
80 def get_task_result(self, taskid, block=False):
104 """
81 """
105 Get or poll for a task result.
82 Get a task result by taskid.
106
83
107 :Parameters:
84 :Parameters:
108 taskid : int
85 taskid : int
109 The id of the task whose result to get
86 The taskid of the task to be retrieved.
110 block : boolean
87 block : boolean
111 If True, wait until the task is done and then result the
88 Should I block until the task is done?
112 `TaskResult` object. If False, just poll for the result and
89
113 return None if the task is not done.
90 :Returns: A `TaskResult` object that encapsulates the task result.
114 """
91 """
115 return blockingCallFromThread(self.task_controller.get_task_result,
92 return blockingCallFromThread(self.task_controller.get_task_result,
116 taskid, block)
93 taskid, block)
117
94
118 def abort(self, taskid):
95 def abort(self, taskid):
119 """
96 """
120 Abort a task by task id if it has not been started.
97 Abort a task by taskid.
98
99 :Parameters:
100 taskid : int
101 The taskid of the task to be aborted.
121 """
102 """
122 return blockingCallFromThread(self.task_controller.abort, taskid)
103 return blockingCallFromThread(self.task_controller.abort, taskid)
123
104
124 def barrier(self, taskids):
105 def barrier(self, taskids):
125 """
106 """Block until a set of tasks are completed.
126 Wait for a set of tasks to finish.
127
107
128 :Parameters:
108 :Parameters:
129 taskids : list of ints
109 taskids : list, tuple
130 A list of task ids to wait for.
110 A sequence of taskids to block on.
131 """
111 """
132 return blockingCallFromThread(self.task_controller.barrier, taskids)
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
133
113
134 def spin(self):
114 def spin(self):
135 """
115 """
136 Cause the scheduler to schedule tasks.
116 Touch the scheduler, to resume scheduling without submitting a task.
137
117
138 This method only needs to be called in unusual situations where the
118 This method only needs to be called in unusual situations where the
139 scheduler is idle for some reason.
119 scheduler is idle for some reason.
@@ -154,6 +134,45 b' class BlockingTaskClient(InteractiveTaskClient):'
154 """
134 """
155 return blockingCallFromThread(self.task_controller.queue_status, verbose)
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
156
136
137 def clear(self):
138 """
139 Clear all previously run tasks from the task controller.
140
141 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
144 cached task results.
145 """
146 return blockingCallFromThread(self.task_controller.clear)
147
148 def map(self, func, *sequences):
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
152 This version is load balanced.
153 """
154 return self.mapper().map(func, *sequences)
155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
158 """
159 Create an `IMapper` implementer with a given set of arguments.
160
161 The `IMapper` created using a task controller is load balanced.
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
175 return pf
157
176
158 components.registerAdapter(BlockingTaskClient,
177 components.registerAdapter(BlockingTaskClient,
159 task.ITaskController, IBlockingTaskClient)
178 task.ITaskController, IBlockingTaskClient)
@@ -34,6 +34,15 b' from IPython.kernel.clientinterfaces import ('
34 IFCClientInterfaceProvider,
34 IFCClientInterfaceProvider,
35 IBlockingClientAdaptor
35 IBlockingClientAdaptor
36 )
36 )
37 from IPython.kernel.mapper import (
38 TaskMapper,
39 ITaskMapperFactory,
40 IMapper
41 )
42 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
44 ITaskParallelDecorator
45 )
37
46
38 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
39 # The Controller side of things
48 # The Controller side of things
@@ -43,32 +52,38 b' from IPython.kernel.clientinterfaces import ('
43 class IFCTaskController(Interface):
52 class IFCTaskController(Interface):
44 """Foolscap interface to task controller.
53 """Foolscap interface to task controller.
45
54
46 See the documentation of ITaskController for documentation about the methods.
55 See the documentation of `ITaskController` for more information.
47 """
56 """
48 def remote_run(request, binTask):
57 def remote_run(binTask):
49 """"""
58 """"""
50
59
51 def remote_abort(request, taskid):
60 def remote_abort(taskid):
52 """"""
61 """"""
53
62
54 def remote_get_task_result(request, taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
55 """"""
64 """"""
56
65
57 def remote_barrier(request, taskids):
66 def remote_barrier(taskids):
58 """"""
67 """"""
59
68
60 def remote_spin(request):
69 def remote_spin():
61 """"""
70 """"""
62
71
63 def remote_queue_status(request, verbose):
72 def remote_queue_status(verbose):
73 """"""
74
75 def remote_clear():
64 """"""
76 """"""
65
77
66
78
67 class FCTaskControllerFromTaskController(Referenceable):
79 class FCTaskControllerFromTaskController(Referenceable):
68 """XML-RPC attachmeot for controller.
80 """
81 Adapt a `TaskController` to an `IFCTaskController`
69
82
70 See IXMLRPCTaskController and ITaskController (and its children) for documentation.
83 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
71 """
85 """
86
72 implements(IFCTaskController, IFCClientInterfaceProvider)
87 implements(IFCTaskController, IFCClientInterfaceProvider)
73
88
74 def __init__(self, taskController):
89 def __init__(self, taskController):
@@ -92,8 +107,8 b' class FCTaskControllerFromTaskController(Referenceable):'
92
107
93 def remote_run(self, ptask):
108 def remote_run(self, ptask):
94 try:
109 try:
95 ctask = pickle.loads(ptask)
110 task = pickle.loads(ptask)
96 task = taskmodule.uncanTask(ctask)
111 task.uncan_task()
97 except:
112 except:
98 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
99 else:
114 else:
@@ -132,6 +147,9 b' class FCTaskControllerFromTaskController(Referenceable):'
132 d.addErrback(self.packageFailure)
147 d.addErrback(self.packageFailure)
133 return d
148 return d
134
149
150 def remote_clear(self):
151 return self.taskController.clear()
152
135 def remote_get_client_name(self):
153 def remote_get_client_name(self):
136 return 'IPython.kernel.taskfc.FCTaskClient'
154 return 'IPython.kernel.taskfc.FCTaskClient'
137
155
@@ -144,13 +162,23 b' components.registerAdapter(FCTaskControllerFromTaskController,'
144 #-------------------------------------------------------------------------------
162 #-------------------------------------------------------------------------------
145
163
146 class FCTaskClient(object):
164 class FCTaskClient(object):
147 """XML-RPC based TaskController client that implements ITaskController.
165 """
166 Client class for Foolscap exposed `TaskController`.
148
167
149 :Parameters:
168 This class is an adapter that makes a `RemoteReference` to a
150 addr : (ip, port)
169 `TaskController` look like an actual `ITaskController` on the client side.
151 The ip (str) and port (int) tuple of the `TaskController`.
170
171 This class also implements `IBlockingClientAdaptor` so that clients can
172 automatically get a blocking version of this class.
152 """
173 """
153 implements(taskmodule.ITaskController, IBlockingClientAdaptor)
174
175 implements(
176 taskmodule.ITaskController,
177 IBlockingClientAdaptor,
178 ITaskMapperFactory,
179 IMapper,
180 ITaskParallelDecorator
181 )
154
182
155 def __init__(self, remote_reference):
183 def __init__(self, remote_reference):
156 self.remote_reference = remote_reference
184 self.remote_reference = remote_reference
@@ -168,48 +196,26 b' class FCTaskClient(object):'
168 def run(self, task):
196 def run(self, task):
169 """Run a task on the `TaskController`.
197 """Run a task on the `TaskController`.
170
198
199 See the documentation of the `MapTask` and `StringTask` classes for
200 details on how to build a task of different types.
201
171 :Parameters:
202 :Parameters:
172 task : a `Task` object
203 task : an `ITask` implementer
173
174 The Task object is created using the following signature:
175
176 Task(expression, pull=None, push={}, clear_before=False,
177 clear_after=False, retries=0, **options):)
178
179 The meaning of the arguments is as follows:
180
181 :Task Parameters:
182 expression : str
183 A str that is valid python code that is the task.
184 pull : str or list of str
185 The names of objects to be pulled as results.
186 push : dict
187 A dict of objects to be pushed into the engines namespace before
188 execution of the expression.
189 clear_before : boolean
190 Should the engine's namespace be cleared before the task is run.
191 Default=False.
192 clear_after : boolean
193 Should the engine's namespace be cleared after the task is run.
194 Default=False.
195 retries : int
196 The number of times to resumbit the task if it fails. Default=0.
197 options : dict
198 Any other keyword options for more elaborate uses of tasks
199
204
200 :Returns: The int taskid of the submitted task. Pass this to
205 :Returns: The int taskid of the submitted task. Pass this to
201 `get_task_result` to get the `TaskResult` object.
206 `get_task_result` to get the `TaskResult` object.
202 """
207 """
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
204 ctask = taskmodule.canTask(task) # handles arbitrary function in .depend
209 task.can_task()
205 # as well as arbitrary recovery_task chains
210 ptask = pickle.dumps(task, 2)
206 ptask = pickle.dumps(ctask, 2)
211 task.uncan_task()
207 d = self.remote_reference.callRemote('run', ptask)
212 d = self.remote_reference.callRemote('run', ptask)
208 d.addCallback(self.unpackage)
213 d.addCallback(self.unpackage)
209 return d
214 return d
210
215
211 def get_task_result(self, taskid, block=False):
216 def get_task_result(self, taskid, block=False):
212 """The task result by taskid.
217 """
218 Get a task result by taskid.
213
219
214 :Parameters:
220 :Parameters:
215 taskid : int
221 taskid : int
@@ -224,20 +230,19 b' class FCTaskClient(object):'
224 return d
230 return d
225
231
226 def abort(self, taskid):
232 def abort(self, taskid):
227 """Abort a task by taskid.
233 """
234 Abort a task by taskid.
228
235
229 :Parameters:
236 :Parameters:
230 taskid : int
237 taskid : int
231 The taskid of the task to be aborted.
238 The taskid of the task to be aborted.
232 block : boolean
233 Should I block until the task is aborted.
234 """
239 """
235 d = self.remote_reference.callRemote('abort', taskid)
240 d = self.remote_reference.callRemote('abort', taskid)
236 d.addCallback(self.unpackage)
241 d.addCallback(self.unpackage)
237 return d
242 return d
238
243
239 def barrier(self, taskids):
244 def barrier(self, taskids):
240 """Block until all tasks are completed.
245 """Block until a set of tasks are completed.
241
246
242 :Parameters:
247 :Parameters:
243 taskids : list, tuple
248 taskids : list, tuple
@@ -248,20 +253,77 b' class FCTaskClient(object):'
248 return d
253 return d
249
254
250 def spin(self):
255 def spin(self):
251 """touch the scheduler, to resume scheduling without submitting
256 """
252 a task.
257 Touch the scheduler, to resume scheduling without submitting a task.
258
259 This method only needs to be called in unusual situations where the
260 scheduler is idle for some reason.
253 """
261 """
254 d = self.remote_reference.callRemote('spin')
262 d = self.remote_reference.callRemote('spin')
255 d.addCallback(self.unpackage)
263 d.addCallback(self.unpackage)
256 return d
264 return d
257
265
258 def queue_status(self, verbose=False):
266 def queue_status(self, verbose=False):
259 """Return a dict with the status of the task queue."""
267 """
268 Get a dictionary with the current state of the task queue.
269
270 :Parameters:
271 verbose : boolean
272 If True, return a list of taskids. If False, simply give
273 the number of tasks with each status.
274
275 :Returns:
276 A dict with the queue status.
277 """
260 d = self.remote_reference.callRemote('queue_status', verbose)
278 d = self.remote_reference.callRemote('queue_status', verbose)
261 d.addCallback(self.unpackage)
279 d.addCallback(self.unpackage)
262 return d
280 return d
263
281
282 def clear(self):
283 """
284 Clear all previously run tasks from the task controller.
285
286 This is needed because the task controller keep all task results
287 in memory. This can be a problem is there are many completed
288 tasks. Users should call this periodically to clean out these
289 cached task results.
290 """
291 d = self.remote_reference.callRemote('clear')
292 return d
293
264 def adapt_to_blocking_client(self):
294 def adapt_to_blocking_client(self):
295 """
296 Wrap self in a blocking version that implements `IBlockingTaskClient.
297 """
265 from IPython.kernel.taskclient import IBlockingTaskClient
298 from IPython.kernel.taskclient import IBlockingTaskClient
266 return IBlockingTaskClient(self)
299 return IBlockingTaskClient(self)
267
300
301 def map(self, func, *sequences):
302 """
303 Apply func to *sequences elementwise. Like Python's builtin map.
304
305 This version is load balanced.
306 """
307 return self.mapper().map(func, *sequences)
308
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
310 recovery_task=None, depend=None, block=True):
311 """
312 Create an `IMapper` implementer with a given set of arguments.
313
314 The `IMapper` created using a task controller is load balanced.
315
316 See the documentation for `IPython.kernel.task.BaseTask` for
317 documentation on the arguments to this method.
318 """
319 return TaskMapper(self, clear_before=clear_before,
320 clear_after=clear_after, retries=retries,
321 recovery_task=recovery_task, depend=depend, block=block)
322
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
324 recovery_task=None, depend=None, block=True):
325 mapper = self.mapper(clear_before, clear_after, retries,
326 recovery_task, depend, block)
327 pf = ParallelFunction(mapper)
328 return pf
329
@@ -163,7 +163,6 b' class IEngineCoreTestCase(object):'
163 try:
163 try:
164 import numpy
164 import numpy
165 except:
165 except:
166 print 'no numpy, ',
167 return
166 return
168 a = numpy.random.random(1000)
167 a = numpy.random.random(1000)
169 d = self.engine.push(dict(a=a))
168 d = self.engine.push(dict(a=a))
@@ -750,16 +750,6 b' class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase'
750 d.addCallback(lambda r: assert_array_equal(r, a))
750 d.addCallback(lambda r: assert_array_equal(r, a))
751 return d
751 return d
752
752
753 def testMapNonblocking(self):
754 self.addEngine(4)
755 def f(x):
756 return x**2
757 data = range(16)
758 d= self.multiengine.map(f, data, block=False)
759 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
760 d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
761 return d
762
763 def test_clear_pending_deferreds(self):
753 def test_clear_pending_deferreds(self):
764 self.addEngine(4)
754 self.addEngine(4)
765 did_list = []
755 did_list = []
@@ -43,23 +43,23 b' class TaskTestBase(object):'
43
43
44 class ITaskControllerTestCase(TaskTestBase):
44 class ITaskControllerTestCase(TaskTestBase):
45
45
46 def testTaskIDs(self):
46 def test_task_ids(self):
47 self.addEngine(1)
47 self.addEngine(1)
48 d = self.tc.run(task.Task('a=5'))
48 d = self.tc.run(task.StringTask('a=5'))
49 d.addCallback(lambda r: self.assertEquals(r, 0))
49 d.addCallback(lambda r: self.assertEquals(r, 0))
50 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
50 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
51 d.addCallback(lambda r: self.assertEquals(r, 1))
51 d.addCallback(lambda r: self.assertEquals(r, 1))
52 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
52 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
53 d.addCallback(lambda r: self.assertEquals(r, 2))
53 d.addCallback(lambda r: self.assertEquals(r, 2))
54 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
54 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
55 d.addCallback(lambda r: self.assertEquals(r, 3))
55 d.addCallback(lambda r: self.assertEquals(r, 3))
56 return d
56 return d
57
57
58 def testAbort(self):
58 def test_abort(self):
59 """Cannot do a proper abort test, because blocking execution prevents
59 """Cannot do a proper abort test, because blocking execution prevents
60 abort from being called before task completes"""
60 abort from being called before task completes"""
61 self.addEngine(1)
61 self.addEngine(1)
62 t = task.Task('a=5')
62 t = task.StringTask('a=5')
63 d = self.tc.abort(0)
63 d = self.tc.abort(0)
64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
65 d.addCallback(lambda _:self.tc.run(t))
65 d.addCallback(lambda _:self.tc.run(t))
@@ -67,15 +67,15 b' class ITaskControllerTestCase(TaskTestBase):'
67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
68 return d
68 return d
69
69
70 def testAbortType(self):
70 def test_abort_type(self):
71 self.addEngine(1)
71 self.addEngine(1)
72 d = self.tc.abort('asdfadsf')
72 d = self.tc.abort('asdfadsf')
73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
74 return d
74 return d
75
75
76 def testClears(self):
76 def test_clear_before_and_after(self):
77 self.addEngine(1)
77 self.addEngine(1)
78 t = task.Task('a=1', clear_before=True, pull='b', clear_after=True)
78 t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True)
79 d = self.multiengine.execute('b=1', targets=0)
79 d = self.multiengine.execute('b=1', targets=0)
80 d.addCallback(lambda _: self.tc.run(t))
80 d.addCallback(lambda _: self.tc.run(t))
81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
@@ -85,10 +85,10 b' class ITaskControllerTestCase(TaskTestBase):'
85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
86 return d
86 return d
87
87
88 def testSimpleRetries(self):
88 def test_simple_retries(self):
89 self.addEngine(1)
89 self.addEngine(1)
90 t = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
90 t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
92 d = self.multiengine.execute('i=0', targets=0)
92 d = self.multiengine.execute('i=0', targets=0)
93 d.addCallback(lambda r: self.tc.run(t))
93 d.addCallback(lambda r: self.tc.run(t))
94 d.addCallback(self.tc.get_task_result, block=True)
94 d.addCallback(self.tc.get_task_result, block=True)
@@ -101,10 +101,10 b' class ITaskControllerTestCase(TaskTestBase):'
101 d.addCallback(lambda r: self.assertEquals(r, 16))
101 d.addCallback(lambda r: self.assertEquals(r, 16))
102 return d
102 return d
103
103
104 def testRecoveryTasks(self):
104 def test_recovery_tasks(self):
105 self.addEngine(1)
105 self.addEngine(1)
106 t = task.Task("i=16", pull='i')
106 t = task.StringTask("i=16", pull='i')
107 t2 = task.Task("raise Exception", recovery_task=t, retries = 2)
107 t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2)
108
108
109 d = self.tc.run(t2)
109 d = self.tc.run(t2)
110 d.addCallback(self.tc.get_task_result, block=True)
110 d.addCallback(self.tc.get_task_result, block=True)
@@ -112,47 +112,76 b' class ITaskControllerTestCase(TaskTestBase):'
112 d.addCallback(lambda r: self.assertEquals(r, 16))
112 d.addCallback(lambda r: self.assertEquals(r, 16))
113 return d
113 return d
114
114
115 # def testInfiniteRecoveryLoop(self):
115 def test_setup_ns(self):
116 # self.addEngine(1)
117 # t = task.Task("raise Exception", retries = 5)
118 # t2 = task.Task("assert True", retries = 2, recovery_task = t)
119 # t.recovery_task = t2
120 #
121 # d = self.tc.run(t)
122 # d.addCallback(self.tc.get_task_result, block=True)
123 # d.addCallback(lambda tr: tr.ns.i)
124 # d.addBoth(printer)
125 # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException))
126 # return d
127 #
128 def testSetupNS(self):
129 self.addEngine(1)
116 self.addEngine(1)
130 d = self.multiengine.execute('a=0', targets=0)
117 d = self.multiengine.execute('a=0', targets=0)
131 ns = dict(a=1, b=0)
118 ns = dict(a=1, b=0)
132 t = task.Task("", push=ns, pull=['a','b'])
119 t = task.StringTask("", push=ns, pull=['a','b'])
133 d.addCallback(lambda r: self.tc.run(t))
120 d.addCallback(lambda r: self.tc.run(t))
134 d.addCallback(self.tc.get_task_result, block=True)
121 d.addCallback(self.tc.get_task_result, block=True)
135 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
122 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
136 d.addCallback(lambda r: self.assertEquals(r, ns))
123 d.addCallback(lambda r: self.assertEquals(r, ns))
137 return d
124 return d
138
125
139 def testTaskResults(self):
126 def test_string_task_results(self):
140 self.addEngine(1)
127 self.addEngine(1)
141 t1 = task.Task('a=5', pull='a')
128 t1 = task.StringTask('a=5', pull='a')
142 d = self.tc.run(t1)
129 d = self.tc.run(t1)
143 d.addCallback(self.tc.get_task_result, block=True)
130 d.addCallback(self.tc.get_task_result, block=True)
144 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException()))
131 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception()))
145 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
132 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
146
133
147 t2 = task.Task('7=5')
134 t2 = task.StringTask('7=5')
148 d.addCallback(lambda r: self.tc.run(t2))
135 d.addCallback(lambda r: self.tc.run(t2))
149 d.addCallback(self.tc.get_task_result, block=True)
136 d.addCallback(self.tc.get_task_result, block=True)
150 d.addCallback(lambda tr: tr.ns)
137 d.addCallback(lambda tr: tr.ns)
151 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
138 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
152
139
153 t3 = task.Task('', pull='b')
140 t3 = task.StringTask('', pull='b')
154 d.addCallback(lambda r: self.tc.run(t3))
141 d.addCallback(lambda r: self.tc.run(t3))
155 d.addCallback(self.tc.get_task_result, block=True)
142 d.addCallback(self.tc.get_task_result, block=True)
156 d.addCallback(lambda tr: tr.ns)
143 d.addCallback(lambda tr: tr.ns)
157 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
144 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
158 return d
145 return d
146
147 def test_map_task(self):
148 self.addEngine(1)
149 t1 = task.MapTask(lambda x: 2*x,(10,))
150 d = self.tc.run(t1)
151 d.addCallback(self.tc.get_task_result, block=True)
152 d.addCallback(lambda r: self.assertEquals(r,20))
153
154 t2 = task.MapTask(lambda : 20)
155 d.addCallback(lambda _: self.tc.run(t2))
156 d.addCallback(self.tc.get_task_result, block=True)
157 d.addCallback(lambda r: self.assertEquals(r,20))
158
159 t3 = task.MapTask(lambda x: x,(),{'x':20})
160 d.addCallback(lambda _: self.tc.run(t3))
161 d.addCallback(self.tc.get_task_result, block=True)
162 d.addCallback(lambda r: self.assertEquals(r,20))
163 return d
164
165 def test_map_task_failure(self):
166 self.addEngine(1)
167 t1 = task.MapTask(lambda x: 1/0,(10,))
168 d = self.tc.run(t1)
169 d.addCallback(self.tc.get_task_result, block=True)
170 d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException))
171 return d
172
173 def test_map_task_args(self):
174 self.assertRaises(TypeError, task.MapTask, 'asdfasdf')
175 self.assertRaises(TypeError, task.MapTask, lambda x: x, 10)
176 self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30)
177
178 def test_clear(self):
179 self.addEngine(1)
180 t1 = task.MapTask(lambda x: 2*x,(10,))
181 d = self.tc.run(t1)
182 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
183 d.addCallback(lambda r: self.assertEquals(r,20))
184 d.addCallback(lambda _: self.tc.clear())
185 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
186 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
187 return d
@@ -38,7 +38,7 b' try:'
38 IEngineQueuedTestCase
38 IEngineQueuedTestCase
39 except ImportError:
39 except ImportError:
40 print "we got an error!!!"
40 print "we got an error!!!"
41 pass
41 raise
42 else:
42 else:
43 class EngineFCTest(DeferredTestCase,
43 class EngineFCTest(DeferredTestCase,
44 IEngineCoreTestCase,
44 IEngineCoreTestCase,
@@ -26,9 +26,20 b' try:'
26 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
26 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
27 from IPython.kernel import multiengine as me
27 from IPython.kernel import multiengine as me
28 from IPython.kernel.clientconnector import ClientConnector
28 from IPython.kernel.clientconnector import ClientConnector
29 from IPython.kernel.parallelfunction import ParallelFunction
30 from IPython.kernel.error import CompositeError
31 from IPython.kernel.util import printer
29 except ImportError:
32 except ImportError:
30 pass
33 pass
31 else:
34 else:
35
36 def _raise_it(f):
37 try:
38 f.raiseException()
39 except CompositeError, e:
40 e.raise_exception()
41
42
32 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
33
44
34 def setUp(self):
45 def setUp(self):
@@ -68,3 +79,66 b' else:'
68 d.addBoth(lambda _: self.controller.stopService())
79 d.addBoth(lambda _: self.controller.stopService())
69 dlist.append(d)
80 dlist.append(d)
70 return defer.DeferredList(dlist)
81 return defer.DeferredList(dlist)
82
83 def test_mapper(self):
84 self.addEngine(4)
85 m = self.multiengine.mapper()
86 self.assertEquals(m.multiengine,self.multiengine)
87 self.assertEquals(m.dist,'b')
88 self.assertEquals(m.targets,'all')
89 self.assertEquals(m.block,True)
90
91 def test_map_default(self):
92 self.addEngine(4)
93 m = self.multiengine.mapper()
94 d = m.map(lambda x: 2*x, range(10))
95 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 return d
99
100 def test_map_noblock(self):
101 self.addEngine(4)
102 m = self.multiengine.mapper(block=False)
103 d = m.map(lambda x: 2*x, range(10))
104 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 return d
107
108 def test_mapper_fail(self):
109 self.addEngine(4)
110 m = self.multiengine.mapper()
111 d = m.map(lambda x: 1/0, range(10))
112 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 return d
114
115 def test_parallel(self):
116 self.addEngine(4)
117 p = self.multiengine.parallel()
118 self.assert_(isinstance(p, ParallelFunction))
119 @p
120 def f(x): return 2*x
121 d = f(range(10))
122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 return d
124
125 def test_parallel_noblock(self):
126 self.addEngine(1)
127 p = self.multiengine.parallel(block=False)
128 self.assert_(isinstance(p, ParallelFunction))
129 @p
130 def f(x): return 2*x
131 d = f(range(10))
132 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 return d
135
136 def test_parallel_fail(self):
137 self.addEngine(4)
138 p = self.multiengine.parallel()
139 self.assert_(isinstance(p, ParallelFunction))
140 @p
141 def f(x): return 1/0
142 d = f(range(10))
143 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 return d No newline at end of file
@@ -20,8 +20,6 b' try:'
20 from twisted.internet import defer
20 from twisted.internet import defer
21 from twisted.python import failure
21 from twisted.python import failure
22
22
23 from IPython.testing import tcommon
24 from IPython.testing.tcommon import *
25 from IPython.testing.util import DeferredTestCase
23 from IPython.testing.util import DeferredTestCase
26 import IPython.kernel.pendingdeferred as pd
24 import IPython.kernel.pendingdeferred as pd
27 from IPython.kernel import error
25 from IPython.kernel import error
@@ -30,25 +28,6 b' except ImportError:'
30 pass
28 pass
31 else:
29 else:
32
30
33 #-------------------------------------------------------------------------------
34 # Setup for inline and standalone doctests
35 #-------------------------------------------------------------------------------
36
37
38 # If you have standalone doctests in a separate file, set their names in the
39 # dt_files variable (as a single string or a list thereof):
40 dt_files = []
41
42 # If you have any modules whose docstrings should be scanned for embedded tests
43 # as examples accorging to standard doctest practice, set them here (as a
44 # single string or a list thereof):
45 dt_modules = []
46
47 #-------------------------------------------------------------------------------
48 # Regular Unittests
49 #-------------------------------------------------------------------------------
50
51
52 class Foo(object):
31 class Foo(object):
53
32
54 def bar(self, bahz):
33 def bar(self, bahz):
@@ -205,14 +184,3 b' else:'
205 d3 = self.pdm.get_pending_deferred(did,False)
184 d3 = self.pdm.get_pending_deferred(did,False)
206 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
185 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
207
186
208 #-------------------------------------------------------------------------------
209 # Regular Unittests
210 #-------------------------------------------------------------------------------
211
212 # This ensures that the code will run either standalone as a script, or that it
213 # can be picked up by Twisted's `trial` test wrapper to run all the tests.
214 if tcommon.pexpect is not None:
215 if __name__ == '__main__':
216 unittest.main(testLoader=IPDocTestLoader(dt_files,dt_modules))
217 else:
218 testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules)
@@ -30,6 +30,8 b' try:'
30 from IPython.kernel.util import printer
30 from IPython.kernel.util import printer
31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 from IPython.kernel.clientconnector import ClientConnector
32 from IPython.kernel.clientconnector import ClientConnector
33 from IPython.kernel.error import CompositeError
34 from IPython.kernel.parallelfunction import ParallelFunction
33 except ImportError:
35 except ImportError:
34 pass
36 pass
35 else:
37 else:
@@ -38,6 +40,12 b' else:'
38 # Tests
40 # Tests
39 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
40
42
43 def _raise_it(f):
44 try:
45 f.raiseException()
46 except CompositeError, e:
47 e.raise_exception()
48
41 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
42
50
43 def setUp(self):
51 def setUp(self):
@@ -88,3 +96,66 b' else:'
88 dlist.append(d)
96 dlist.append(d)
89 return defer.DeferredList(dlist)
97 return defer.DeferredList(dlist)
90
98
99 def test_mapper(self):
100 self.addEngine(1)
101 m = self.tc.mapper()
102 self.assertEquals(m.task_controller,self.tc)
103 self.assertEquals(m.clear_before,False)
104 self.assertEquals(m.clear_after,False)
105 self.assertEquals(m.retries,0)
106 self.assertEquals(m.recovery_task,None)
107 self.assertEquals(m.depend,None)
108 self.assertEquals(m.block,True)
109
110 def test_map_default(self):
111 self.addEngine(1)
112 m = self.tc.mapper()
113 d = m.map(lambda x: 2*x, range(10))
114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
117 return d
118
119 def test_map_noblock(self):
120 self.addEngine(1)
121 m = self.tc.mapper(block=False)
122 d = m.map(lambda x: 2*x, range(10))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
124 return d
125
126 def test_mapper_fail(self):
127 self.addEngine(1)
128 m = self.tc.mapper()
129 d = m.map(lambda x: 1/0, range(10))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
131 return d
132
133 def test_parallel(self):
134 self.addEngine(1)
135 p = self.tc.parallel()
136 self.assert_(isinstance(p, ParallelFunction))
137 @p
138 def f(x): return 2*x
139 d = f(range(10))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
141 return d
142
143 def test_parallel_noblock(self):
144 self.addEngine(1)
145 p = self.tc.parallel(block=False)
146 self.assert_(isinstance(p, ParallelFunction))
147 @p
148 def f(x): return 2*x
149 d = f(range(10))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
151 return d
152
153 def test_parallel_fail(self):
154 self.addEngine(1)
155 p = self.tc.parallel()
156 self.assert_(isinstance(p, ParallelFunction))
157 @p
158 def f(x): return 1/0
159 d = f(range(10))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
161 return d No newline at end of file
@@ -3,13 +3,8 b''
3
3
4 Importing this module should give you the implementations that are correct
4 Importing this module should give you the implementations that are correct
5 for your operation system, from platutils_PLATFORMNAME module.
5 for your operation system, from platutils_PLATFORMNAME module.
6
7 $Id: ipstruct.py 1005 2006-01-12 08:39:26Z fperez $
8
9
10 """
6 """
11
7
12
13 #*****************************************************************************
8 #*****************************************************************************
14 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
9 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
15 #
10 #
@@ -21,15 +16,32 b' from IPython import Release'
21 __author__ = '%s <%s>' % Release.authors['Ville']
16 __author__ = '%s <%s>' % Release.authors['Ville']
22 __license__ = Release.license
17 __license__ = Release.license
23
18
24 import os,sys
19 import os
20 import sys
25
21
22 # Import the platform-specific implementations
26 if os.name == 'posix':
23 if os.name == 'posix':
27 from platutils_posix import *
24 import platutils_posix as _platutils
28 elif sys.platform == 'win32':
25 elif sys.platform == 'win32':
29 from platutils_win32 import *
26 import platutils_win32 as _platutils
30 else:
27 else:
31 from platutils_dummy import *
28 import platutils_dummy as _platutils
32 import warnings
29 import warnings
33 warnings.warn("Platutils not available for platform '%s', some features may be missing" %
30 warnings.warn("Platutils not available for platform '%s', some features may be missing" %
34 os.name)
31 os.name)
35 del warnings
32 del warnings
33
34
35 # Functionality that's logically common to all platforms goes here, each
36 # platform-specific module only provides the bits that are OS-dependent.
37
38 def freeze_term_title():
39 _platutils.ignore_termtitle = True
40
41
42 def set_term_title(title):
43 """Set terminal title using the necessary platform-dependent calls."""
44
45 if _platutils.ignore_termtitle:
46 return
47 _platutils.set_term_title(title)
@@ -3,13 +3,8 b''
3
3
4 This has empty implementation of the platutils functions, used for
4 This has empty implementation of the platutils functions, used for
5 unsupported operating systems.
5 unsupported operating systems.
6
7 $Id: ipstruct.py 1005 2006-01-12 08:39:26Z fperez $
8
9
10 """
6 """
11
7
12
13 #*****************************************************************************
8 #*****************************************************************************
14 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
9 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
15 #
10 #
@@ -21,9 +16,9 b' from IPython import Release'
21 __author__ = '%s <%s>' % Release.authors['Ville']
16 __author__ = '%s <%s>' % Release.authors['Ville']
22 __license__ = Release.license
17 __license__ = Release.license
23
18
19 # This variable is part of the expected API of the module:
20 ignore_termtitle = True
24
21
25 def _dummy(*args,**kw):
22 def set_term_title(*args,**kw):
23 """Dummy no-op."""
26 pass
24 pass
27
28 set_term_title = _dummy
29
@@ -3,12 +3,8 b''
3
3
4 Importing this module directly is not portable - rather, import platutils
4 Importing this module directly is not portable - rather, import platutils
5 to use these functions in platform agnostic fashion.
5 to use these functions in platform agnostic fashion.
6
7 $Id: ipstruct.py 1005 2006-01-12 08:39:26Z fperez $
8
9 """
6 """
10
7
11
12 #*****************************************************************************
8 #*****************************************************************************
13 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
9 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
14 #
10 #
@@ -31,9 +27,6 b' def _dummy_op(*a, **b):'
31 def _set_term_title_xterm(title):
27 def _set_term_title_xterm(title):
32 """ Change virtual terminal title in xterm-workalikes """
28 """ Change virtual terminal title in xterm-workalikes """
33
29
34 if ignore_termtitle:
35 return
36
37 sys.stdout.write('\033]%d;%s\007' % (0,title))
30 sys.stdout.write('\033]%d;%s\007' % (0,title))
38
31
39
32
@@ -41,7 +34,3 b" if os.environ.get('TERM','') == 'xterm':"
41 set_term_title = _set_term_title_xterm
34 set_term_title = _set_term_title_xterm
42 else:
35 else:
43 set_term_title = _dummy_op
36 set_term_title = _dummy_op
44
45 def freeze_term_title():
46 global ignore_termtitle
47 ignore_termtitle = True
@@ -3,12 +3,8 b''
3
3
4 Importing this module directly is not portable - rather, import platutils
4 Importing this module directly is not portable - rather, import platutils
5 to use these functions in platform agnostic fashion.
5 to use these functions in platform agnostic fashion.
6
7 $Id: ipstruct.py 1005 2006-01-12 08:39:26Z fperez $
8
9 """
6 """
10
7
11
12 #*****************************************************************************
8 #*****************************************************************************
13 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
9 # Copyright (C) 2001-2006 Fernando Perez <fperez@colorado.edu>
14 #
10 #
@@ -22,35 +18,30 b' __license__ = Release.license'
22
18
23 import os
19 import os
24
20
25 ignore_termtitle = 0
21 ignore_termtitle = False
26
22
27 try:
23 try:
28 import ctypes
24 import ctypes
25
29 SetConsoleTitleW=ctypes.windll.kernel32.SetConsoleTitleW
26 SetConsoleTitleW = ctypes.windll.kernel32.SetConsoleTitleW
30 SetConsoleTitleW.argtypes=[ctypes.c_wchar_p]
27 SetConsoleTitleW.argtypes = [ctypes.c_wchar_p]
31 def _set_term_title(title):
28
32 """ Set terminal title using the ctypes"""
29 def set_term_title(title):
30 """Set terminal title using ctypes to access the Win32 APIs."""
33 SetConsoleTitleW(title)
31 SetConsoleTitleW(title)
34
32
35 except ImportError:
33 except ImportError:
36 def _set_term_title(title):
34 def set_term_title(title):
37 """ Set terminal title using the 'title' command """
35 """Set terminal title using the 'title' command."""
36 global ignore_termtitle
37
38 try:
39 # Cannot be on network share when issuing system commands
38 curr=os.getcwd()
40 curr = os.getcwd()
39 os.chdir("C:") #Cannot be on network share when issuing system commands
41 os.chdir("C:")
40 ret = os.system("title " + title)
42 ret = os.system("title " + title)
43 finally:
41 os.chdir(curr)
44 os.chdir(curr)
42 if ret:
45 if ret:
43 ignore_termtitle = 1
46 # non-zero return code signals error, don't try again
44
47 ignore_termtitle = True
45 def set_term_title(title):
46 """ Set terminal title using the 'title' command """
47 global ignore_termtitle
48
49 if ignore_termtitle:
50 return
51 _set_term_title(title)
52
53 def freeze_term_title():
54 global ignore_termtitle
55 ignore_termtitle = 1
56
1 NO CONTENT: file renamed from IPython/testing/ipdoctest.py to IPython/testing/attic/ipdoctest.py
NO CONTENT: file renamed from IPython/testing/ipdoctest.py to IPython/testing/attic/ipdoctest.py
1 NO CONTENT: file renamed from IPython/testing/tcommon.py to IPython/testing/attic/tcommon.py
NO CONTENT: file renamed from IPython/testing/tcommon.py to IPython/testing/attic/tcommon.py
1 NO CONTENT: file renamed from IPython/testing/testTEMPLATE.py to IPython/testing/attic/testTEMPLATE.py
NO CONTENT: file renamed from IPython/testing/testTEMPLATE.py to IPython/testing/attic/testTEMPLATE.py
1 NO CONTENT: file renamed from IPython/testing/tstTEMPLATE_doctest.py to IPython/testing/attic/tstTEMPLATE_doctest.py
NO CONTENT: file renamed from IPython/testing/tstTEMPLATE_doctest.py to IPython/testing/attic/tstTEMPLATE_doctest.py
1 NO CONTENT: file renamed from IPython/testing/tstTEMPLATE_doctest.txt to IPython/testing/attic/tstTEMPLATE_doctest.txt
NO CONTENT: file renamed from IPython/testing/tstTEMPLATE_doctest.txt to IPython/testing/attic/tstTEMPLATE_doctest.txt
@@ -1,6 +1,20 b''
1 """Utilities for testing code.
1 """DEPRECATED - use IPython.testing.util instead.
2
3 Utilities for testing code.
2 """
4 """
3
5
6 #############################################################################
7
8 # This was old testing code we never really used in IPython. The pieces of
9 # testing machinery from snakeoil that were good have already been merged into
10 # the nose plugin, so this can be taken away soon. Leave a warning for now,
11 # we'll remove it in a later release (around 0.10 or so).
12 from warnings import warn
13 warn('This will be removed soon. Use IPython.testing.util instead',
14 DeprecationWarning)
15
16 #############################################################################
17
4 # Required modules and packages
18 # Required modules and packages
5
19
6 # Standard Python lib
20 # Standard Python lib
@@ -1,10 +0,0 b''
1 # encoding: utf-8
2 __docformat__ = "restructuredtext en"
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2005 Fernando Perez <fperez@colorado.edu>
5 # Brian E Granger <ellisonbg@gmail.com>
6 # Benjamin Ragan-Kelley <benjaminrk@gmail.com>
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
10 #-------------------------------------------------------------------------------
@@ -3,22 +3,8 b''
3 =========================================
3 =========================================
4
4
5 The way doctest loads these, the entire document is applied as a single test
5 The way doctest loads these, the entire document is applied as a single test
6 rather than multiple individual ones, unfortunately.
6 rather than multiple individual ones, unfortunately::
7
7
8
9 Auto-generated tests
10 ====================
11
12
13 ----------------------------------------------------------------------------
14
15 Begin included file tst_tools_utils_doctest2.py::
16
17 # Setup - all imports are done in tcommon
18 >>> from IPython.testing import tcommon
19 >>> from IPython.testing.tcommon import *
20
21 # Doctest code begins here
22 >>> from IPython.tools import utils
8 >>> from IPython.tools import utils
23
9
24 # Some other tests for utils
10 # Some other tests for utils
@@ -29,14 +15,3 b' Begin included file tst_tools_utils_doctest2.py::'
29 >>> utils.marquee('Another test',30,'.')
15 >>> utils.marquee('Another test',30,'.')
30 '........ Another test ........'
16 '........ Another test ........'
31
17
32
33 End included file tst_tools_utils_doctest2.py
34
35 ----------------------------------------------------------------------------
36
37
38
39 Manually generated tests
40 ========================
41
42 These are one-off tests written by hand, copied from an interactive prompt.
@@ -31,13 +31,20 b''
31 ;; To start an interactive ipython session run `py-shell' with ``M-x py-shell``
31 ;; To start an interactive ipython session run `py-shell' with ``M-x py-shell``
32 ;; (or the default keybinding ``C-c C-!``).
32 ;; (or the default keybinding ``C-c C-!``).
33 ;;
33 ;;
34 ;; You can customize the arguments passed to the IPython instance at startup by
35 ;; setting the ``py-python-command-args`` variable. For example, to start
36 ;; always in ``pylab`` mode with hardcoded light-background colors, you can
37 ;; use::
38 ;;
39 ;; (setq py-python-command-args '("-pylab" "-colors" "LightBG"))
40 ;;
41 ;;
34 ;; NOTE: This mode is currently somewhat alpha and although I hope that it
42 ;; NOTE: This mode is currently somewhat alpha and although I hope that it
35 ;; will work fine for most cases, doing certain things (like the
43 ;; will work fine for most cases, doing certain things (like the
36 ;; autocompletion and a decent scheme to switch between python interpreters)
44 ;; autocompletion and a decent scheme to switch between python interpreters)
37 ;; properly will also require changes to ipython that will likely have to wait
45 ;; properly will also require changes to ipython that will likely have to wait
38 ;; for a larger rewrite scheduled some time in the future.
46 ;; for a larger rewrite scheduled some time in the future.
39 ;;
47 ;;
40 ;; Also note that you currently NEED THE CVS VERSION OF PYTHON.EL.
41 ;;
48 ;;
42 ;; Further note that I don't know whether this runs under windows or not and
49 ;; Further note that I don't know whether this runs under windows or not and
43 ;; that if it doesn't I can't really help much, not being afflicted myself.
50 ;; that if it doesn't I can't really help much, not being afflicted myself.
@@ -46,14 +53,15 b''
46 ;; Hints for effective usage
53 ;; Hints for effective usage
47 ;; -------------------------
54 ;; -------------------------
48 ;;
55 ;;
49 ;; - IMO the best feature by far of the ipython/emacs combo is how much easier it
56 ;; - IMO the best feature by far of the ipython/emacs combo is how much easier
50 ;; makes it to find and fix bugs thanks to the ``%pdb on``/ pdbtrack combo. Try
57 ;; it makes it to find and fix bugs thanks to the ``%pdb on or %debug``/
51 ;; it: first in the ipython to shell do ``%pdb on`` then do something that will
58 ;; pdbtrack combo. Try it: first in the ipython to shell do ``%pdb on`` then
52 ;; raise an exception (FIXME nice example) -- and be amazed how easy it is to
59 ;; do something that will raise an exception (FIXME nice example), or type
53 ;; inspect the live objects in each stack frames and to jump to the
60 ;; ``%debug`` after the exception has been raised. YOu'll be amazed at how
54 ;; corresponding sourcecode locations as you walk up and down the stack trace
61 ;; easy it is to inspect the live objects in each stack frames and to jump to
55 ;; (even without ``%pdb on`` you can always use ``C-c -`` (`py-up-exception')
62 ;; the corresponding sourcecode locations as you walk up and down the stack
56 ;; to jump to the corresponding source code locations).
63 ;; trace (even without ``%pdb on`` you can always use ``C-c -``
64 ;; (`py-up-exception') to jump to the corresponding source code locations).
57 ;;
65 ;;
58 ;; - emacs gives you much more powerful commandline editing and output searching
66 ;; - emacs gives you much more powerful commandline editing and output searching
59 ;; capabilities than ipython-standalone -- isearch is your friend if you
67 ;; capabilities than ipython-standalone -- isearch is your friend if you
@@ -79,7 +87,7 b''
79 ;; variables comes later).
87 ;; variables comes later).
80 ;;
88 ;;
81 ;; Please send comments and feedback to the ipython-list
89 ;; Please send comments and feedback to the ipython-list
82 ;; (<ipython-user@scipy.net>) where I (a.s.) or someone else will try to
90 ;; (<ipython-user@scipy.org>) where I (a.s.) or someone else will try to
83 ;; answer them (it helps if you specify your emacs version, OS etc;
91 ;; answer them (it helps if you specify your emacs version, OS etc;
84 ;; familiarity with <http://www.catb.org/~esr/faqs/smart-questions.html> might
92 ;; familiarity with <http://www.catb.org/~esr/faqs/smart-questions.html> might
85 ;; speed up things further).
93 ;; speed up things further).
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now