##// END OF EJS Templates
The refactoring of the Task system is nearly complete. Now there are...
Brian E Granger -
Show More
@@ -0,0 +1,18 b''
1 from IPython.kernel import client
2
3 mec = client.MultiEngineClient()
4
5 result = mec.map(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
7
8 m = mec.mapper(block=False)
9 pr = m.map(lambda x: 2*x, range(10))
10 print "Submitted map, got PendingResult: ", pr
11 result = pr.r
12 print "Using a mapper: ", result
13
14 @mec.parallel()
15 def f(x): return 2*x
16
17 result = f(range(10))
18 print "Using a parallel function: ", result No newline at end of file
@@ -0,0 +1,19 b''
1 from IPython.kernel import client
2
3 tc = client.TaskClient()
4
5 result = tc.map(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
7
8 m = tc.mapper(block=False, clear_after=True, clear_before=True)
9 tids = m.map(lambda x: 2*x, range(10))
10 print "Submitted tasks, got ids: ", tids
11 tc.barrier(tids)
12 result = [tc.get_task_result(tid) for tid in tids]
13 print "Using a mapper: ", result
14
15 @tc.parallel()
16 def f(x): return 2*x
17
18 result = f(range(10))
19 print "Using a parallel function: ", result No newline at end of file
@@ -27,7 +27,7 b' from IPython.kernel import codeutil'
27 from IPython.kernel.clientconnector import ClientConnector
27 from IPython.kernel.clientconnector import ClientConnector
28
28
29 # Other things that the user will need
29 # Other things that the user will need
30 from IPython.kernel.task import Task
30 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.error import CompositeError
31 from IPython.kernel.error import CompositeError
32
32
33 #-------------------------------------------------------------------------------
33 #-------------------------------------------------------------------------------
@@ -44,7 +44,7 b' from IPython.kernel import codeutil'
44 import IPython.kernel.magic
44 import IPython.kernel.magic
45
45
46 # Other things that the user will need
46 # Other things that the user will need
47 from IPython.kernel.task import Task
47 from IPython.kernel.task import MapTask, StringTask
48 from IPython.kernel.error import CompositeError
48 from IPython.kernel.error import CompositeError
49
49
50 #-------------------------------------------------------------------------------
50 #-------------------------------------------------------------------------------
@@ -79,7 +79,7 b" def magic_px(self,parameter_s=''):"
79 except AttributeError:
79 except AttributeError:
80 print NO_ACTIVE_CONTROLLER
80 print NO_ACTIVE_CONTROLLER
81 else:
81 else:
82 print "Executing command on Controller"
82 print "Parallel execution on engines: %s" % activeController.targets
83 result = activeController.execute(parameter_s)
83 result = activeController.execute(parameter_s)
84 return result
84 return result
85
85
@@ -4,39 +4,230 b''
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #----------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #----------------------------------------------------------------------------
17
17
18 from types import FunctionType
18 from types import FunctionType
19 from zope.interface import Interface, implements
19 from zope.interface import Interface, implements
20 from IPython.kernel.task import MapTask
21 from IPython.kernel.twistedutil import DeferredList, gatherBoth
22 from IPython.kernel.util import printer
23 from IPython.kernel.error import collect_exceptions
24
25 #----------------------------------------------------------------------------
26 # Code
27 #----------------------------------------------------------------------------
20
28
21 class IMapper(Interface):
29 class IMapper(Interface):
30 """The basic interface for a Mapper.
31
32 This defines a generic interface for mapping. The idea of this is
33 similar to that of Python's builtin `map` function, which applies a function
34 elementwise to a sequence.
35 """
36
37 def map(func, *seqs):
38 """Do map in parallel.
39
40 Equivalent to map(func, *seqs) or:
41
42 [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
43
44 :Parameters:
45 func : FunctionType
46 The function to apply to the sequence
47 sequences : tuple of iterables
48 A sequence of iterables that are used for sucessive function
49 arguments. This work just like map
50 """
51
52 class IMultiEngineMapperFactory(Interface):
53 """
54 An interface for something that creates `IMapper` instances.
55 """
56
57 def mapper(dist='b', targets='all', block=True):
58 """
59 Create an `IMapper` implementer with a given set of arguments.
60
61 The `IMapper` created using a multiengine controller is
62 not load balanced.
63 """
64
65 class ITaskMapperFactory(Interface):
66 """
67 An interface for something that creates `IMapper` instances.
68 """
22
69
23 def __call__(func, *sequences):
70 def mapper(clear_before=False, clear_after=False, retries=0,
24 """Do map in parallel."""
71 recovery_task=None, depend=None, block=True):
72 """
73 Create an `IMapper` implementer with a given set of arguments.
74
75 The `IMapper` created using a task controller is load balanced.
76
77 See the documentation for `IPython.kernel.task.BaseTask` for
78 documentation on the arguments to this method.
79 """
25
80
26 class Mapper(object):
81
82 class MultiEngineMapper(object):
83 """
84 A Mapper for `IMultiEngine` implementers.
85 """
27
86
28 implements(IMapper)
87 implements(IMapper)
29
88
30 def __init__(self, multiengine, dist='b', targets='all', block=True):
89 def __init__(self, multiengine, dist='b', targets='all', block=True):
90 """
91 Create a Mapper for a multiengine.
92
93 The value of all arguments are used for all calls to `map`. This
94 class allows these arguemnts to be set for a series of map calls.
95
96 :Parameters:
97 multiengine : `IMultiEngine` implementer
98 The multiengine to use for running the map commands
99 dist : str
100 The type of decomposition to use. Only block ('b') is
101 supported currently
102 targets : (str, int, tuple of ints)
103 The engines to use in the map
104 block : boolean
105 Whether to block when the map is applied
106 """
31 self.multiengine = multiengine
107 self.multiengine = multiengine
32 self.dist = dist
108 self.dist = dist
33 self.targets = targets
109 self.targets = targets
34 self.block = block
110 self.block = block
35
36 def __call__(self, func, *sequences):
37 return self.map(func, *sequences)
38
111
39 def map(self, func, *sequences):
112 def map(self, func, *sequences):
113 """
114 Apply func to *sequences elementwise. Like Python's builtin map.
115
116 This version is not load balanced.
117 """
118 max_len = max(len(s) for s in sequences)
119 for s in sequences:
120 if len(s)!=max_len:
121 raise ValueError('all sequences must have equal length')
40 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
122 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
41 return self.multiengine._map(func, sequences, dist=self.dist,
123 return self.multiengine.raw_map(func, sequences, dist=self.dist,
42 targets=self.targets, block=self.block) No newline at end of file
124 targets=self.targets, block=self.block)
125
126 class TaskMapper(object):
127 """
128 Make an `ITaskController` look like an `IMapper`.
129
130 This class provides a load balanced version of `map`.
131 """
132
133 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
134 recovery_task=None, depend=None, block=True):
135 """
136 Create a `IMapper` given a `TaskController` and arguments.
137
138 The additional arguments are those that are common to all types of
139 tasks and are described in the documentation for
140 `IPython.kernel.task.BaseTask`.
141
142 :Parameters:
143 task_controller : an `IBlockingTaskClient` implementer
144 The `TaskController` to use for calls to `map`
145 """
146 self.task_controller = task_controller
147 self.clear_before = clear_before
148 self.clear_after = clear_after
149 self.retries = retries
150 self.recovery_task = recovery_task
151 self.depend = depend
152 self.block = block
153
154 def map(self, func, *sequences):
155 """
156 Apply func to *sequences elementwise. Like Python's builtin map.
157
158 This version is load balanced.
159 """
160 max_len = max(len(s) for s in sequences)
161 for s in sequences:
162 if len(s)!=max_len:
163 raise ValueError('all sequences must have equal length')
164 task_args = zip(*sequences)
165 task_ids = []
166 dlist = []
167 for ta in task_args:
168 task = MapTask(func, ta, clear_before=self.clear_before,
169 clear_after=self.clear_after, retries=self.retries,
170 recovery_task=self.recovery_task, depend=self.depend)
171 dlist.append(self.task_controller.run(task))
172 dlist = gatherBoth(dlist, consumeErrors=1)
173 dlist.addCallback(collect_exceptions,'map')
174 if self.block:
175 def get_results(task_ids):
176 d = self.task_controller.barrier(task_ids)
177 d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
178 d.addCallback(collect_exceptions, 'map')
179 return d
180 dlist.addCallback(get_results)
181 return dlist
182
183 class SynchronousTaskMapper(object):
184 """
185 Make an `IBlockingTaskClient` look like an `IMapper`.
186
187 This class provides a load balanced version of `map`.
188 """
189
190 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
191 recovery_task=None, depend=None, block=True):
192 """
193 Create a `IMapper` given a `IBlockingTaskClient` and arguments.
194
195 The additional arguments are those that are common to all types of
196 tasks and are described in the documentation for
197 `IPython.kernel.task.BaseTask`.
198
199 :Parameters:
200 task_controller : an `IBlockingTaskClient` implementer
201 The `TaskController` to use for calls to `map`
202 """
203 self.task_controller = task_controller
204 self.clear_before = clear_before
205 self.clear_after = clear_after
206 self.retries = retries
207 self.recovery_task = recovery_task
208 self.depend = depend
209 self.block = block
210
211 def map(self, func, *sequences):
212 """
213 Apply func to *sequences elementwise. Like Python's builtin map.
214
215 This version is load balanced.
216 """
217 max_len = max(len(s) for s in sequences)
218 for s in sequences:
219 if len(s)!=max_len:
220 raise ValueError('all sequences must have equal length')
221 task_args = zip(*sequences)
222 task_ids = []
223 for ta in task_args:
224 task = MapTask(func, ta, clear_before=self.clear_before,
225 clear_after=self.clear_after, retries=self.retries,
226 recovery_task=self.recovery_task, depend=self.depend)
227 task_ids.append(self.task_controller.run(task))
228 if self.block:
229 self.task_controller.barrier(task_ids)
230 task_results = [self.task_controller.get_task_result(tid) for tid in task_ids]
231 return task_results
232 else:
233 return task_ids No newline at end of file
@@ -659,17 +659,23 b' class IMultiEngineCoordinator(Interface):'
659 def gather(key, dist='b', targets='all'):
659 def gather(key, dist='b', targets='all'):
660 """Gather object key from targets."""
660 """Gather object key from targets."""
661
661
662 def _map(func, seq, dist='b', targets='all'):
662 def raw_map(func, seqs, dist='b', targets='all'):
663 """A parallelized version of Python's builtin map."""
663 """
664
664 A parallelized version of Python's builtin `map` function.
665 def map(func, *sequences):
665
666 """Do a basic map with default for dist and targets."""
666 This has a slightly different syntax than the builtin `map`.
667
667 This is needed because we need to have keyword arguments and thus
668 def mapper(dist='b', targets='all'):
668 can't use *args to capture all the sequences. Instead, they must
669 """Create a mapper with dist and targets."""
669 be passed in a list or tuple.
670
670
671 def parallel(dist='b', targets='all'):
671 The equivalence is:
672 """A decorator that build a parallel function."""
672
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674
675 Most users will want to use parallel functions or the `mapper`
676 and `map` methods for an API that follows that of the builtin
677 `map`.
678 """
673
679
674
680
675 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
@@ -681,17 +687,21 b' class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):'
681 def gather(key, dist='b', targets='all', block=True):
687 def gather(key, dist='b', targets='all', block=True):
682 """Gather object key from targets"""
688 """Gather object key from targets"""
683
689
684 def _map(func, sequences, dist='b', targets='all', block=True):
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
685 """Perform an actual map."""
691 """
686
692 A parallelized version of Python's builtin map.
687 def map(func, *sequences):
693
688 """Do a basic map with default for dist and targets."""
694 This has a slightly different syntax than the builtin `map`.
689
695 This is needed because we need to have keyword arguments and thus
690 def mapper(dist='b', targets='all', block=True):
696 can't use *args to capture all the sequences. Instead, they must
691 """Create a mapper with dist, targets and block."""
697 be passed in a list or tuple.
692
698
693 def parallel(dist='b', targets='all', block=True):
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
694 """A decorator that build a parallel function."""
700
701 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
703 `map`.
704 """
695
705
696
706
697 #-------------------------------------------------------------------------------
707 #-------------------------------------------------------------------------------
@@ -31,7 +31,11 b' from IPython.ColorANSI import TermColors'
31 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel import error
32 from IPython.kernel import error
33 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import Mapper
34 from IPython.kernel.mapper import (
35 MultiEngineMapper,
36 IMultiEngineMapperFactory,
37 IMapper
38 )
35 from IPython.kernel import map as Map
39 from IPython.kernel import map as Map
36 from IPython.kernel import multiengine as me
40 from IPython.kernel import multiengine as me
37 from IPython.kernel.multiengine import (IFullMultiEngine,
41 from IPython.kernel.multiengine import (IFullMultiEngine,
@@ -187,10 +191,14 b' class ResultList(list):'
187
191
188 def __repr__(self):
192 def __repr__(self):
189 output = []
193 output = []
190 blue = TermColors.Blue
194 # These colored prompts were not working on Windows
191 normal = TermColors.Normal
195 if sys.platform == 'win32':
192 red = TermColors.Red
196 blue = normal = red = green = ''
193 green = TermColors.Green
197 else:
198 blue = TermColors.Blue
199 normal = TermColors.Normal
200 red = TermColors.Red
201 green = TermColors.Green
194 output.append("<Results List>\n")
202 output.append("<Results List>\n")
195 for cmd in self:
203 for cmd in self:
196 if isinstance(cmd, Failure):
204 if isinstance(cmd, Failure):
@@ -295,35 +303,7 b' class InteractiveMultiEngineClient(object):'
295 def __len__(self):
303 def __len__(self):
296 """Return the number of available engines."""
304 """Return the number of available engines."""
297 return len(self.get_ids())
305 return len(self.get_ids())
298
306
299 def parallelize(self, func, targets=None, block=None):
300 """Build a `ParallelFunction` object for functionName on engines.
301
302 The returned object will implement a parallel version of functionName
303 that takes a local sequence as its only argument and calls (in
304 parallel) functionName on each element of that sequence. The
305 `ParallelFunction` object has a `targets` attribute that controls
306 which engines the function is run on.
307
308 :Parameters:
309 targets : int, list or 'all'
310 The engine ids the action will apply to. Call `get_ids` to see
311 a list of currently available engines.
312 functionName : str
313 A Python string that names a callable defined on the engines.
314
315 :Returns: A `ParallelFunction` object.
316
317 Examples
318 ========
319
320 >>> psin = rc.parallelize('all','lambda x:sin(x)')
321 >>> psin(range(10000))
322 [0,2,4,9,25,36,...]
323 """
324 targets, block = self._findTargetsAndBlock(targets, block)
325 return ParallelFunction(func, self, targets, block)
326
327 #---------------------------------------------------------------------------
307 #---------------------------------------------------------------------------
328 # Make this a context manager for with
308 # Make this a context manager for with
329 #---------------------------------------------------------------------------
309 #---------------------------------------------------------------------------
@@ -423,7 +403,11 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
423 engine, run code on it, etc.
403 engine, run code on it, etc.
424 """
404 """
425
405
426 implements(IFullBlockingMultiEngineClient)
406 implements(
407 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
409 IMapper
410 )
427
411
428 def __init__(self, smultiengine):
412 def __init__(self, smultiengine):
429 self.smultiengine = smultiengine
413 self.smultiengine = smultiengine
@@ -796,23 +780,83 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
796 return self._blockFromThread(self.smultiengine.gather, key, dist,
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
797 targets=targets, block=block)
781 targets=targets, block=block)
798
782
799 def _map(self, func, seq, dist='b', targets=None, block=None):
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
800 """
784 """
801 A parallelized version of Python's builtin map
785 A parallelized version of Python's builtin map.
786
787 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
791
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
794 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
796 `map`.
802 """
797 """
803 targets, block = self._findTargetsAndBlock(targets, block)
798 targets, block = self._findTargetsAndBlock(targets, block)
804 return self._blockFromThread(self.smultiengine._map, func, seq,
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
805 dist, targets=targets, block=block)
800 dist, targets=targets, block=block)
806
801
807 def map(self, func, *sequences):
802 def map(self, func, *sequences):
808 return self.mapper()(func, *sequences)
803 """
804 A parallel version of Python's builtin `map` function.
805
806 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
808
809 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
812 """
813 return self.mapper().map(func, *sequences)
809
814
810 def mapper(self, dist='b', targets='all', block=None):
815 def mapper(self, dist='b', targets='all', block=None):
811 return Mapper(self, dist, targets, block)
816 """
817 Create a mapper object that has a `map` method.
818
819 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
821 the map happens.
822
823 :Parameters:
824 dist : str
825 What decomposition to use, 'b' is the only one supported
826 currently
827 targets : str, int, sequence of ints
828 Which engines to use for the map
829 block : boolean
830 Should calls to `map` block or not
831 """
832 return MultiEngineMapper(self, dist, targets, block)
812
833
813 def parallel(self, dist='b', targets=None, block=None):
834 def parallel(self, dist='b', targets=None, block=None):
835 """
836 A decorator that turns a function into a parallel function.
837
838 This can be used as:
839
840 @parallel()
841 def f(x, y)
842 ...
843
844 f(range(10), range(10))
845
846 This causes f(0,0), f(1,1), ... to be called in parallel.
847
848 :Parameters:
849 dist : str
850 What decomposition to use, 'b' is the only one supported
851 currently
852 targets : str, int, sequence of ints
853 Which engines to use for the map
854 block : boolean
855 Should calls to `map` block or not
856 """
814 targets, block = self._findTargetsAndBlock(targets, block)
857 targets, block = self._findTargetsAndBlock(targets, block)
815 pf = ParallelFunction(self, dist=dist, targets=targets, block=block)
858 mapper = self.mapper(dist, targets, block)
859 pf = ParallelFunction(mapper)
816 return pf
860 return pf
817
861
818 #---------------------------------------------------------------------------
862 #---------------------------------------------------------------------------
@@ -30,7 +30,11 b' from IPython.kernel import error'
30 from IPython.kernel.util import printer
30 from IPython.kernel.util import printer
31 from IPython.kernel import map as Map
31 from IPython.kernel import map as Map
32 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import Mapper
33 from IPython.kernel.mapper import (
34 MultiEngineMapper,
35 IMultiEngineMapperFactory,
36 IMapper
37 )
34 from IPython.kernel.twistedutil import gatherBoth
38 from IPython.kernel.twistedutil import gatherBoth
35 from IPython.kernel.multiengine import (MultiEngine,
39 from IPython.kernel.multiengine import (MultiEngine,
36 IMultiEngine,
40 IMultiEngine,
@@ -282,7 +286,12 b' components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,'
282
286
283 class FCFullSynchronousMultiEngineClient(object):
287 class FCFullSynchronousMultiEngineClient(object):
284
288
285 implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
289 implements(
290 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
293 IMapper
294 )
286
295
287 def __init__(self, remote_reference):
296 def __init__(self, remote_reference):
288 self.remote_reference = remote_reference
297 self.remote_reference = remote_reference
@@ -606,9 +615,20 b' class FCFullSynchronousMultiEngineClient(object):'
606 d.addCallback(do_gather)
615 d.addCallback(do_gather)
607 return d
616 return d
608
617
609 def _map(self, func, sequences, dist='b', targets='all', block=True):
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
610 """
619 """
611 Call a callable on elements of a sequence.
620 A parallelized version of Python's builtin map.
621
622 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
626
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
629 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
631 `map`.
612 """
632 """
613 if not isinstance(sequences, (list, tuple)):
633 if not isinstance(sequences, (list, tuple)):
614 raise TypeError('sequences must be a list or tuple')
634 raise TypeError('sequences must be a list or tuple')
@@ -634,13 +654,62 b' class FCFullSynchronousMultiEngineClient(object):'
634 return d
654 return d
635
655
636 def map(self, func, *sequences):
656 def map(self, func, *sequences):
637 return self.mapper()(func, *sequences)
657 """
658 A parallel version of Python's builtin `map` function.
659
660 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
662
663 This method creates a mapper objects by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
666 """
667 return self.mapper().map(func, *sequences)
638
668
639 def mapper(self, dist='b', targets='all', block=True):
669 def mapper(self, dist='b', targets='all', block=True):
640 return Mapper(self, dist, targets, block)
670 """
671 Create a mapper object that has a `map` method.
672
673 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
675 the map happens.
676
677 :Parameters:
678 dist : str
679 What decomposition to use, 'b' is the only one supported
680 currently
681 targets : str, int, sequence of ints
682 Which engines to use for the map
683 block : boolean
684 Should calls to `map` block or not
685 """
686 return MultiEngineMapper(self, dist, targets, block)
641
687
642 def parallel(self, dist='b', targets='all', block=True):
688 def parallel(self, dist='b', targets='all', block=True):
643 pf = ParallelFunction(self, dist=dist, targets=targets, block=True)
689 """
690 A decorator that turns a function into a parallel function.
691
692 This can be used as:
693
694 @parallel()
695 def f(x, y)
696 ...
697
698 f(range(10), range(10))
699
700 This causes f(0,0), f(1,1), ... to be called in parallel.
701
702 :Parameters:
703 dist : str
704 What decomposition to use, 'b' is the only one supported
705 currently
706 targets : str, int, sequence of ints
707 Which engines to use for the map
708 block : boolean
709 Should calls to `map` block or not
710 """
711 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
644 return pf
713 return pf
645
714
646 #---------------------------------------------------------------------------
715 #---------------------------------------------------------------------------
@@ -19,29 +19,89 b' from types import FunctionType'
19 from zope.interface import Interface, implements
19 from zope.interface import Interface, implements
20
20
21
21
22 class IMultiEngineParallelDecorator(Interface):
23 """A decorator that creates a parallel function."""
24
25 def parallel(dist='b', targets=None, block=None):
26 """
27 A decorator that turns a function into a parallel function.
28
29 This can be used as:
30
31 @parallel()
32 def f(x, y)
33 ...
34
35 f(range(10), range(10))
36
37 This causes f(0,0), f(1,1), ... to be called in parallel.
38
39 :Parameters:
40 dist : str
41 What decomposition to use, 'b' is the only one supported
42 currently
43 targets : str, int, sequence of ints
44 Which engines to use for the map
45 block : boolean
46 Should calls to `map` block or not
47 """
48
49 class ITaskParallelDecorator(Interface):
50 """A decorator that creates a parallel function."""
51
52 def parallel(clear_before=False, clear_after=False, retries=0,
53 recovery_task=None, depend=None, block=True):
54 """
55 A decorator that turns a function into a parallel function.
56
57 This can be used as:
58
59 @parallel()
60 def f(x, y)
61 ...
62
63 f(range(10), range(10))
64
65 This causes f(0,0), f(1,1), ... to be called in parallel.
66
67 See the documentation for `IPython.kernel.task.BaseTask` for
68 documentation on the arguments to this method.
69 """
70
71 class IParallelFunction(Interface):
72 pass
73
22 class ParallelFunction(object):
74 class ParallelFunction(object):
23 """
75 """
24 A decorator for building parallel functions.
76 The implementation of a parallel function.
77
78 A parallel function is similar to Python's map function:
79
80 map(func, *sequences) -> pfunc(*sequences)
81
82 Parallel functions should be created by using the @parallel decorator.
25 """
83 """
26
84
27 def __init__(self, multiengine, dist='b', targets='all', block=True):
85 implements(IParallelFunction)
86
87 def __init__(self, mapper):
28 """
88 """
29 Create a `ParallelFunction decorator`.
89 Create a parallel function from an `IMapper`.
90
91 :Parameters:
92 mapper : an `IMapper` implementer.
93 The mapper to use for the parallel function
30 """
94 """
31 self.multiengine = multiengine
95 self.mapper = mapper
32 self.dist = dist
33 self.targets = targets
34 self.block = block
35
96
36 def __call__(self, func):
97 def __call__(self, func):
37 """
98 """
38 Decorate the function to make it run in parallel.
99 Decorate a function to make it run in parallel.
39 """
100 """
40 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
101 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
41 self.func = func
102 self.func = func
42 def call_function(*sequences):
103 def call_function(*sequences):
43 return self.multiengine._map(self.func, sequences, dist=self.dist,
104 return self.mapper.map(self.func, *sequences)
44 targets=self.targets, block=self.block)
45 return call_function
105 return call_function
46
106
47 No newline at end of file
107
This diff has been collapsed as it changes many lines, (693 lines changed) Show them Hide them
@@ -5,16 +5,16 b''
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import copy, time
19 import copy, time
20 from types import FunctionType
20 from types import FunctionType
@@ -23,108 +23,386 b' import zope.interface as zi, string'
23 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
24 from twisted.python import components, log, failure
24 from twisted.python import components, log, failure
25
25
26 # from IPython.genutils import time
26 from IPython.kernel.util import printer
27
28 from IPython.kernel import engineservice as es, error
27 from IPython.kernel import engineservice as es, error
29 from IPython.kernel import controllerservice as cs
28 from IPython.kernel import controllerservice as cs
30 from IPython.kernel.twistedutil import gatherBoth, DeferredList
29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
31
30
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
33
34 def can_task(task):
35 t = copy.copy(task)
36 t.depend = can(t.depend)
37 t.expression = can(t.expression)
38 if t.recovery_task:
39 t.recovery_task = can_task(t.recovery_task)
40 return t
41
32
42 def uncan_task(task):
33 #-----------------------------------------------------------------------------
43 t = copy.copy(task)
34 # Definition of the Task objects
44 t.depend = uncan(t.depend)
35 #-----------------------------------------------------------------------------
45 t.expression = uncan(t.expression)
46 if t.recovery_task and t.recovery_task is not task:
47 t.recovery_task = uncan_task(t.recovery_task)
48 return t
49
36
50 time_format = '%Y/%m/%d %H:%M:%S'
37 time_format = '%Y/%m/%d %H:%M:%S'
51
38
52 class Task(object):
39 class ITask(zi.Interface):
53 """Our representation of a task for the `TaskController` interface.
40 """
54
41 This interface provides a generic definition of what constitutes a task.
55 The user should create instances of this class to represent a task that
42
56 needs to be done.
43 There are two sides to a task. First a task needs to take input from
57
44 a user to determine what work is performed by the task. Second, the
58 :Parameters:
45 task needs to have the logic that knows how to turn that information
59 expression : str
46 info specific calls to a worker, through the `IQueuedEngine` interface.
60 A str that is valid python code that is the task.
61 pull : str or list of str
62 The names of objects to be pulled as results. If not specified,
63 will return {'result', None}
64 push : dict
65 A dict of objects to be pushed into the engines namespace before
66 execution of the expression.
67 clear_before : boolean
68 Should the engine's namespace be cleared before the task is run.
69 Default=False.
70 clear_after : boolean
71 Should the engine's namespace be cleared after the task is run.
72 Default=False.
73 retries : int
74 The number of times to resumbit the task if it fails. Default=0.
75 recovery_task : Task
76 This is the Task to be run when the task has exhausted its retries
77 Default=None.
78 depend : bool function(properties)
79 This is the dependency function for the Task, which determines
80 whether a task can be run on a Worker. `depend` is called with
81 one argument, the worker's properties dict, and should return
82 True if the worker meets the dependencies or False if it does
83 not.
84 Default=None - run on any worker
85 options : dict
86 Any other keyword options for more elaborate uses of tasks
87
88 Examples
89 --------
90
47
91 >>> t = Task('dostuff(args)')
48 Many method in this class get two things passed to them: a Deferred
92 >>> t = Task('a=5', pull='a')
49 and an IQueuedEngine implementer. Such methods should register callbacks
93 >>> t = Task('a=5\nb=4', pull=['a','b'])
50 on the Deferred that use the IQueuedEngine to accomplish something. See
94 >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
51 the existing task objects for examples.
95 # A dependency case:
96 >>> def hasMPI(props):
97 ... return props.get('mpi') is not None
98 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
99 """
52 """
100
53
101 def __init__(self, expression, args=None, kwargs=None, pull=None, push=None,
54 zi.Attribute('retries','How many times to retry the task')
102 clear_before=False, clear_after=False, retries=0,
55 zi.Attribute('recovery_task','A task to try if the initial one fails')
103 recovery_task=None, depend=None, **options):
56 zi.Attribute('taskid','the id of the task')
104 self.expression = expression
57
58 def start_time(result):
59 """
60 Do anything needed to start the timing of the task.
61
62 Must simply return the result after starting the timers.
63 """
64
65 def stop_time(result):
66 """
67 Do anything needed to stop the timing of the task.
68
69 Must simply return the result after stopping the timers. This
70 method will usually set attributes that are used by `process_result`
71 in building result of the task.
72 """
73
74 def pre_task(d, queued_engine):
75 """Do something with the queued_engine before the task is run.
76
77 This method should simply add callbacks to the input Deferred
78 that do something with the `queued_engine` before the task is run.
79
80 :Parameters:
81 d : Deferred
82 The deferred that actions should be attached to
83 queued_engine : IQueuedEngine implementer
84 The worker that has been allocated to perform the task
85 """
86
87 def post_task(d, queued_engine):
88 """Do something with the queued_engine after the task is run.
89
90 This method should simply add callbacks to the input Deferred
91 that do something with the `queued_engine` before the task is run.
92
93 :Parameters:
94 d : Deferred
95 The deferred that actions should be attached to
96 queued_engine : IQueuedEngine implementer
97 The worker that has been allocated to perform the task
98 """
99
100 def submit_task(d, queued_engine):
101 """Submit a task using the `queued_engine` we have been allocated.
102
103 When a task is ready to run, this method is called. This method
104 must take the internal information of the task and make suitable
105 calls on the queued_engine to have the actual work done.
106
107 This method should simply add callbacks to the input Deferred
108 that do something with the `queued_engine` before the task is run.
109
110 :Parameters:
111 d : Deferred
112 The deferred that actions should be attached to
113 queued_engine : IQueuedEngine implementer
114 The worker that has been allocated to perform the task
115 """
116
117 def process_result(d, result, engine_id):
118 """Take a raw task result.
119
120 Objects that implement `ITask` can choose how the result of running
121 the task is presented. This method takes the raw result and
122 does this logic. Two example are the `MapTask` which simply returns
123 the raw result or a `Failure` object and the `StringTask` which
124 returns a `TaskResult` object.
125
126 :Parameters:
127 d : Deferred
128 The deferred that actions should be attached to
129 result : object
130 The raw task result that needs to be wrapped
131 engine_id : int
132 The id of the engine that did the task
133
134 :Returns:
135 The result, as a tuple of the form: (success, result).
136 Here, success is a boolean indicating if the task
137 succeeded or failed and result is the result.
138 """
139
140 def check_depend(properties):
141 """Check properties to see if the task should be run.
142
143 :Parameters:
144 properties : dict
145 A dictionary of properties that an engine has set
146
147 :Returns:
148 True if the task should be run, False otherwise
149 """
150
151 def can_task(self):
152 """Serialize (can) any functions in the task for pickling.
153
154 Subclasses must override this method and make sure that all
155 functions in the task are canned by calling `can` on the
156 function.
157 """
158
159 def uncan_task(self):
160 """Unserialize (uncan) any canned function in the task."""
161
162 class BaseTask(object):
163 """
164 Common fuctionality for all objects implementing `ITask`.
165 """
166
167 zi.implements(ITask)
168
169 def __init__(self, clear_before=False, clear_after=False, retries=0,
170 recovery_task=None, depend=None):
171 """
172 Make a generic task.
173
174 :Parameters:
175 clear_before : boolean
176 Should the engines namespace be cleared before the task
177 is run
178 clear_after : boolean
179 Should the engines namespace be clear after the task is run
180 retries : int
181 The number of times a task should be retries upon failure
182 recovery_task : any task object
183 If a task fails and it has a recovery_task, that is run
184 upon a retry
185 depend : FunctionType
186 A function that is called to test for properties. This function
187 must take one argument, the properties dict and return a boolean
188 """
189 self.clear_before = clear_before
190 self.clear_after = clear_after
191 self.retries = retries
192 self.recovery_task = recovery_task
193 self.depend = depend
194 self.taskid = None
195
196 def start_time(self, result):
197 """
198 Start the basic timers.
199 """
200 self.start = time.time()
201 self.start_struct = time.localtime()
202 return result
203
204 def stop_time(self, result):
205 """
206 Stop the basic timers.
207 """
208 self.stop = time.time()
209 self.stop_struct = time.localtime()
210 self.duration = self.stop - self.start
211 self.submitted = time.strftime(time_format, self.start_struct)
212 self.completed = time.strftime(time_format)
213 return result
214
215 def pre_task(self, d, queued_engine):
216 """
217 Clear the engine before running the task if clear_before is set.
218 """
219 if self.clear_before:
220 d.addCallback(lambda r: queued_engine.reset())
221
222 def post_task(self, d, queued_engine):
223 """
224 Clear the engine after running the task if clear_after is set.
225 """
226 def reseter(result):
227 queued_engine.reset()
228 return result
229 if self.clear_after:
230 d.addBoth(reseter)
231
232 def submit_task(self, d, queued_engine):
233 raise NotImplementedError('submit_task must be implemented in a subclass')
234
235 def process_result(self, result, engine_id):
236 """
237 Process a task result.
238
239 This is the default `process_result` that just returns the raw
240 result or a `Failure`.
241 """
242 if isinstance(result, failure.Failure):
243 return (False, result)
244 else:
245 return (True, result)
246
247 def check_depend(self, properties):
248 """
249 Calls self.depend(properties) to see if a task should be run.
250 """
251 if self.depend is not None:
252 return self.depend(properties)
253 else:
254 return True
255
256 def can_task(self):
257 self.depend = can(self.depend)
258 if isinstance(self.recovery_task, BaseTask):
259 self.recovery_task.can_task()
260
261 def uncan_task(self):
262 self.depend = uncan(self.depend)
263 if isinstance(self.recovery_task, BaseTask):
264 self.recovery_task.uncan_task()
265
266 class MapTask(BaseTask):
267 """
268 A task that consists of a function and arguments.
269 """
270
271 zi.implements(ITask)
272
273 def __init__(self, function, args=None, kwargs=None, clear_before=False,
274 clear_after=False, retries=0, recovery_task=None, depend=None):
275 """
276 Create a task based on a function, args and kwargs.
277
278 This is a simple type of task that consists of calling:
279 function(*args, **kwargs) and wrapping the result in a `TaskResult`.
280
281 The return value of the function, or a `Failure` wrapping an
282 exception is the task result for this type of task.
283 """
284 BaseTask.__init__(self, clear_before, clear_after, retries,
285 recovery_task, depend)
286 if not isinstance(function, FunctionType):
287 raise TypeError('a task function must be a FunctionType')
288 self.function = function
105 if args is None:
289 if args is None:
106 self.args = ()
290 self.args = ()
107 else:
291 else:
108 self.args = args
292 self.args = args
293 if not isinstance(self.args, (list, tuple)):
294 raise TypeError('a task args must be a list or tuple')
109 if kwargs is None:
295 if kwargs is None:
110 self.kwargs = {}
296 self.kwargs = {}
111 else:
297 else:
112 self.kwargs = kwargs
298 self.kwargs = kwargs
113 if isinstance(pull, str):
299 if not isinstance(self.kwargs, dict):
114 self.pull = [pull]
300 raise TypeError('a task kwargs must be a dict')
115 else:
301
302 def submit_task(self, d, queued_engine):
303 d.addCallback(lambda r: queued_engine.push_function(
304 dict(_ipython_task_function=self.function))
305 )
306 d.addCallback(lambda r: queued_engine.push(
307 dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
308 )
309 d.addCallback(lambda r: queued_engine.execute(
310 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
311 )
312 d.addCallback(lambda r: queued_engine.pull('_ipython_task_result'))
313
314 def can_task(self):
315 self.function = can(self.function)
316 BaseTask.can_task(self)
317
318 def uncan_task(self):
319 self.function = uncan(self.function)
320 BaseTask.uncan_task(self)
321
322
323 class StringTask(BaseTask):
324 """
325 A task that consists of a string of Python code to run.
326 """
327
328 def __init__(self, expression, pull=None, push=None,
329 clear_before=False, clear_after=False, retries=0,
330 recovery_task=None, depend=None):
331 """
332 Create a task based on a Python expression and variables
333
334 This type of task lets you push a set of variables to the engines
335 namespace, run a Python string in that namespace and then bring back
336 a different set of Python variables as the result.
337
338 Because this type of task can return many results (through the
339 `pull` keyword argument) it returns a special `TaskResult` object
340 that wraps the pulled variables, statistics about the run and
341 any exceptions raised.
342 """
343 if not isinstance(expression, str):
344 raise TypeError('a task expression must be a string')
345 self.expression = expression
346
347 if pull==None:
348 self.pull = ()
349 elif isinstance(pull, str):
350 self.pull = (pull,)
351 elif isinstance(pull, (list, tuple)):
116 self.pull = pull
352 self.pull = pull
117 self.push = push
353 else:
118 self.clear_before = clear_before
354 raise TypeError('pull must be str or a sequence of strs')
119 self.clear_after = clear_after
355
120 self.retries=retries
356 if push==None:
121 self.recovery_task = recovery_task
357 self.push = {}
122 self.depend = depend
358 elif isinstance(push, dict):
123 self.options = options
359 self.push = push
124 self.taskid = None
360 else:
361 raise TypeError('push must be a dict')
362
363 BaseTask.__init__(self, clear_before, clear_after, retries,
364 recovery_task, depend)
365
366 def submit_task(self, d, queued_engine):
367 if self.push is not None:
368 d.addCallback(lambda r: queued_engine.push(self.push))
369
370 d.addCallback(lambda r: queued_engine.execute(self.expression))
371
372 if self.pull is not None:
373 d.addCallback(lambda r: queued_engine.pull(self.pull))
374 else:
375 d.addCallback(lambda r: None)
376
377 def process_result(self, result, engine_id):
378 if isinstance(result, failure.Failure):
379 tr = TaskResult(result, engine_id)
380 else:
381 if self.pull is None:
382 resultDict = {}
383 elif len(self.pull) == 1:
384 resultDict = {self.pull[0]:result}
385 else:
386 resultDict = dict(zip(self.pull, result))
387 tr = TaskResult(resultDict, engine_id)
388 # Assign task attributes
389 tr.submitted = self.submitted
390 tr.completed = self.completed
391 tr.duration = self.duration
392 if hasattr(self,'taskid'):
393 tr.taskid = self.taskid
394 else:
395 tr.taskid = None
396 if isinstance(result, failure.Failure):
397 return (False, tr)
398 else:
399 return (True, tr)
125
400
126 class ResultNS:
401 class ResultNS(object):
127 """The result namespace object for use in TaskResult objects as tr.ns.
402 """
403 A dict like object for holding the results of a task.
404
405 The result namespace object for use in `TaskResult` objects as tr.ns.
128 It builds an object from a dictionary, such that it has attributes
406 It builds an object from a dictionary, such that it has attributes
129 according to the key,value pairs of the dictionary.
407 according to the key,value pairs of the dictionary.
130
408
@@ -162,7 +440,7 b' class ResultNS:'
162
440
163 class TaskResult(object):
441 class TaskResult(object):
164 """
442 """
165 An object for returning task results.
443 An object for returning task results for certain types of tasks.
166
444
167 This object encapsulates the results of a task. On task
445 This object encapsulates the results of a task. On task
168 success it will have a keys attribute that will have a list
446 success it will have a keys attribute that will have a list
@@ -172,21 +450,21 b' class TaskResult(object):'
172
450
173 In task failure, keys will be empty, but failure will contain
451 In task failure, keys will be empty, but failure will contain
174 the failure object that encapsulates the remote exception.
452 the failure object that encapsulates the remote exception.
175 One can also simply call the raiseException() method of
453 One can also simply call the `raise_exception` method of
176 this class to re-raise any remote exception in the local
454 this class to re-raise any remote exception in the local
177 session.
455 session.
178
456
179 The TaskResult has a .ns member, which is a property for access
457 The `TaskResult` has a `.ns` member, which is a property for access
180 to the results. If the Task had pull=['a', 'b'], then the
458 to the results. If the Task had pull=['a', 'b'], then the
181 Task Result will have attributes tr.ns.a, tr.ns.b for those values.
459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
182 Accessing tr.ns will raise the remote failure if the task failed.
460 Accessing `tr.ns` will raise the remote failure if the task failed.
183
461
184 The engineid attribute should have the engineid of the engine
462 The `engineid` attribute should have the `engineid` of the engine
185 that ran the task. But, because engines can come and go in
463 that ran the task. But, because engines can come and go,
186 the ipython task system, the engineid may not continue to be
464 the `engineid` may not continue to be
187 valid or accurate.
465 valid or accurate.
188
466
189 The taskid attribute simply gives the taskid that the task
467 The `taskid` attribute simply gives the `taskid` that the task
190 is tracked under.
468 is tracked under.
191 """
469 """
192 taskid = None
470 taskid = None
@@ -224,15 +502,19 b' class TaskResult(object):'
224
502
225 def __getitem__(self, key):
503 def __getitem__(self, key):
226 if self.failure is not None:
504 if self.failure is not None:
227 self.raiseException()
505 self.raise_exception()
228 return self.results[key]
506 return self.results[key]
229
507
230 def raiseException(self):
508 def raise_exception(self):
231 """Re-raise any remote exceptions in the local python session."""
509 """Re-raise any remote exceptions in the local python session."""
232 if self.failure is not None:
510 if self.failure is not None:
233 self.failure.raiseException()
511 self.failure.raiseException()
234
512
235
513
514 #-----------------------------------------------------------------------------
515 # The controller side of things
516 #-----------------------------------------------------------------------------
517
236 class IWorker(zi.Interface):
518 class IWorker(zi.Interface):
237 """The Basic Worker Interface.
519 """The Basic Worker Interface.
238
520
@@ -247,12 +529,15 b' class IWorker(zi.Interface):'
247 :Parameters:
529 :Parameters:
248 task : a `Task` object
530 task : a `Task` object
249
531
250 :Returns: `Deferred` to a `TaskResult` object.
532 :Returns: `Deferred` to a tuple of (success, result) where
533 success if a boolean that signifies success or failure
534 and result is the task result.
251 """
535 """
252
536
253
537
254 class WorkerFromQueuedEngine(object):
538 class WorkerFromQueuedEngine(object):
255 """Adapt an `IQueuedEngine` to an `IWorker` object"""
539 """Adapt an `IQueuedEngine` to an `IWorker` object"""
540
256 zi.implements(IWorker)
541 zi.implements(IWorker)
257
542
258 def __init__(self, qe):
543 def __init__(self, qe):
@@ -267,74 +552,27 b' class WorkerFromQueuedEngine(object):'
267 def run(self, task):
552 def run(self, task):
268 """Run task in worker's namespace.
553 """Run task in worker's namespace.
269
554
555 This takes a task and calls methods on the task that actually
556 cause `self.queuedEngine` to do the task. See the methods of
557 `ITask` for more information about how these methods are called.
558
270 :Parameters:
559 :Parameters:
271 task : a `Task` object
560 task : a `Task` object
272
561
273 :Returns: `Deferred` to a `TaskResult` object.
562 :Returns: `Deferred` to a tuple of (success, result) where
563 success if a boolean that signifies success or failure
564 and result is the task result.
274 """
565 """
275 if task.clear_before:
566 d = defer.succeed(None)
276 d = self.queuedEngine.reset()
567 d.addCallback(task.start_time)
277 else:
568 task.pre_task(d, self.queuedEngine)
278 d = defer.succeed(None)
569 task.submit_task(d, self.queuedEngine)
279
570 task.post_task(d, self.queuedEngine)
280 if isinstance(task.expression, FunctionType):
571 d.addBoth(task.stop_time)
281 d.addCallback(lambda r: self.queuedEngine.push_function(
572 d.addBoth(task.process_result, self.queuedEngine.id)
282 dict(_ipython_task_function=task.expression))
573 # At this point, there will be (success, result) coming down the line
283 )
574 return d
284 d.addCallback(lambda r: self.queuedEngine.push(
575
285 dict(_ipython_task_args=task.args,_ipython_task_kwargs=task.kwargs))
286 )
287 d.addCallback(lambda r: self.queuedEngine.execute(
288 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
289 )
290 d.addCallback(lambda r: self.queuedEngine.pull('_ipython_task_result'))
291 elif isinstance(task.expression, str):
292 if task.push is not None:
293 d.addCallback(lambda r: self.queuedEngine.push(task.push))
294
295 d.addCallback(lambda r: self.queuedEngine.execute(task.expression))
296
297 if task.pull is not None:
298 d.addCallback(lambda r: self.queuedEngine.pull(task.pull))
299 else:
300 d.addCallback(lambda r: None)
301 else:
302 raise TypeError("task expression must be a str or function")
303
304 def reseter(result):
305 self.queuedEngine.reset()
306 return result
307
308 if task.clear_after:
309 d.addBoth(reseter)
310
311 if isinstance(task.expression, FunctionType):
312 return d.addBoth(self._zipResults, None, time.time(), time.localtime())
313 else:
314 return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime())
315
316 def _zipResults(self, result, names, start, start_struct):
317 """Callback for construting the TaskResult object."""
318 if isinstance(result, failure.Failure):
319 tr = TaskResult(result, self.queuedEngine.id)
320 else:
321 if names is None:
322 resultDict = {}
323 elif len(names) == 1:
324 resultDict = {names[0]:result}
325 else:
326 resultDict = dict(zip(names, result))
327 tr = TaskResult(resultDict, self.queuedEngine.id)
328 if names is None:
329 tr.result = result
330 else:
331 tr.result = None
332 # the time info
333 tr.submitted = time.strftime(time_format, start_struct)
334 tr.completed = time.strftime(time_format)
335 tr.duration = time.time()-start
336 return tr
337
338
576
339 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
577 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
340
578
@@ -350,14 +588,14 b' class IScheduler(zi.Interface):'
350 """Add a task to the queue of the Scheduler.
588 """Add a task to the queue of the Scheduler.
351
589
352 :Parameters:
590 :Parameters:
353 task : a `Task` object
591 task : an `ITask` implementer
354 The task to be queued.
592 The task to be queued.
355 flags : dict
593 flags : dict
356 General keywords for more sophisticated scheduling
594 General keywords for more sophisticated scheduling
357 """
595 """
358
596
359 def pop_task(id=None):
597 def pop_task(id=None):
360 """Pops a Task object.
598 """Pops a task object from the queue.
361
599
362 This gets the next task to be run. If no `id` is requested, the highest priority
600 This gets the next task to be run. If no `id` is requested, the highest priority
363 task is returned.
601 task is returned.
@@ -367,7 +605,7 b' class IScheduler(zi.Interface):'
367 The id of the task to be popped. The default (None) is to return
605 The id of the task to be popped. The default (None) is to return
368 the highest priority task.
606 the highest priority task.
369
607
370 :Returns: a `Task` object
608 :Returns: an `ITask` implementer
371
609
372 :Exceptions:
610 :Exceptions:
373 IndexError : raised if no taskid in queue
611 IndexError : raised if no taskid in queue
@@ -377,8 +615,9 b' class IScheduler(zi.Interface):'
377 """Add a worker to the worker queue.
615 """Add a worker to the worker queue.
378
616
379 :Parameters:
617 :Parameters:
380 worker : an IWorker implementing object
618 worker : an `IWorker` implementer
381 flags : General keywords for more sophisticated scheduling
619 flags : dict
620 General keywords for more sophisticated scheduling
382 """
621 """
383
622
384 def pop_worker(id=None):
623 def pop_worker(id=None):
@@ -401,15 +640,15 b' class IScheduler(zi.Interface):'
401 """Returns True if there is something to do, False otherwise"""
640 """Returns True if there is something to do, False otherwise"""
402
641
403 def schedule():
642 def schedule():
404 """Returns a tuple of the worker and task pair for the next
643 """Returns (worker,task) pair for the next task to be run."""
405 task to be run.
406 """
407
644
408
645
409 class FIFOScheduler(object):
646 class FIFOScheduler(object):
410 """A basic First-In-First-Out (Queue) Scheduler.
647 """
411 This is the default Scheduler for the TaskController.
648 A basic First-In-First-Out (Queue) Scheduler.
412 See the docstrings for IScheduler for interface details.
649
650 This is the default Scheduler for the `TaskController`.
651 See the docstrings for `IScheduler` for interface details.
413 """
652 """
414
653
415 zi.implements(IScheduler)
654 zi.implements(IScheduler)
@@ -466,7 +705,9 b' class FIFOScheduler(object):'
466 for t in self.tasks:
705 for t in self.tasks:
467 for w in self.workers:
706 for w in self.workers:
468 try:# do not allow exceptions to break this
707 try:# do not allow exceptions to break this
469 cando = t.depend is None or t.depend(w.properties)
708 # Allow the task to check itself using its
709 # check_depend method.
710 cando = t.check_depend(w.properties)
470 except:
711 except:
471 cando = False
712 cando = False
472 if cando:
713 if cando:
@@ -476,9 +717,12 b' class FIFOScheduler(object):'
476
717
477
718
478 class LIFOScheduler(FIFOScheduler):
719 class LIFOScheduler(FIFOScheduler):
479 """A Last-In-First-Out (Stack) Scheduler. This scheduler should naively
720 """
480 reward fast engines by giving them more jobs. This risks starvation, but
721 A Last-In-First-Out (Stack) Scheduler.
481 only in cases with low load, where starvation does not really matter.
722
723 This scheduler should naively reward fast engines by giving
724 them more jobs. This risks starvation, but only in cases with
725 low load, where starvation does not really matter.
482 """
726 """
483
727
484 def add_task(self, task, **flags):
728 def add_task(self, task, **flags):
@@ -493,13 +737,15 b' class LIFOScheduler(FIFOScheduler):'
493
737
494
738
495 class ITaskController(cs.IControllerBase):
739 class ITaskController(cs.IControllerBase):
496 """The Task based interface to a `ControllerService` object
740 """
741 The Task based interface to a `ControllerService` object
497
742
498 This adapts a `ControllerService` to the ITaskController interface.
743 This adapts a `ControllerService` to the ITaskController interface.
499 """
744 """
500
745
501 def run(task):
746 def run(task):
502 """Run a task.
747 """
748 Run a task.
503
749
504 :Parameters:
750 :Parameters:
505 task : an IPython `Task` object
751 task : an IPython `Task` object
@@ -508,13 +754,14 b' class ITaskController(cs.IControllerBase):'
508 """
754 """
509
755
510 def get_task_result(taskid, block=False):
756 def get_task_result(taskid, block=False):
511 """Get the result of a task by its ID.
757 """
758 Get the result of a task by its ID.
512
759
513 :Parameters:
760 :Parameters:
514 taskid : int
761 taskid : int
515 the id of the task whose result is requested
762 the id of the task whose result is requested
516
763
517 :Returns: `Deferred` to (taskid, actualResult) if the task is done, and None
764 :Returns: `Deferred` to the task result if the task is done, and None
518 if not.
765 if not.
519
766
520 :Exceptions:
767 :Exceptions:
@@ -539,23 +786,35 b' class ITaskController(cs.IControllerBase):'
539 """
786 """
540
787
541 def barrier(taskids):
788 def barrier(taskids):
542 """Block until the list of taskids are completed.
789 """
790 Block until the list of taskids are completed.
543
791
544 Returns None on success.
792 Returns None on success.
545 """
793 """
546
794
547 def spin():
795 def spin():
548 """touch the scheduler, to resume scheduling without submitting
796 """
549 a task.
797 Touch the scheduler, to resume scheduling without submitting a task.
550 """
798 """
551
799
552 def queue_status(self, verbose=False):
800 def queue_status(verbose=False):
553 """Get a dictionary with the current state of the task queue.
801 """
802 Get a dictionary with the current state of the task queue.
554
803
555 If verbose is True, then return lists of taskids, otherwise,
804 If verbose is True, then return lists of taskids, otherwise,
556 return the number of tasks with each status.
805 return the number of tasks with each status.
557 """
806 """
558
807
808 def clear():
809 """
810 Clear all previously run tasks from the task controller.
811
812 This is needed because the task controller keep all task results
813 in memory. This can be a problem is there are many completed
814 tasks. Users should call this periodically to clean out these
815 cached task results.
816 """
817
559
818
560 class TaskController(cs.ControllerAdapterBase):
819 class TaskController(cs.ControllerAdapterBase):
561 """The Task based interface to a Controller object.
820 """The Task based interface to a Controller object.
@@ -592,7 +851,7 b' class TaskController(cs.ControllerAdapterBase):'
592 def registerWorker(self, id):
851 def registerWorker(self, id):
593 """Called by controller.register_engine."""
852 """Called by controller.register_engine."""
594 if self.workers.get(id):
853 if self.workers.get(id):
595 raise "We already have one! This should not happen."
854 raise ValueError("worker with id %s already exists. This should not happen." % id)
596 self.workers[id] = IWorker(self.controller.engines[id])
855 self.workers[id] = IWorker(self.controller.engines[id])
597 self.workers[id].workerid = id
856 self.workers[id].workerid = id
598 if not self.pendingTasks.has_key(id):# if not working
857 if not self.pendingTasks.has_key(id):# if not working
@@ -617,21 +876,25 b' class TaskController(cs.ControllerAdapterBase):'
617 #---------------------------------------------------------------------------
876 #---------------------------------------------------------------------------
618
877
619 def run(self, task):
878 def run(self, task):
620 """Run a task and return `Deferred` to its taskid."""
879 """
880 Run a task and return `Deferred` to its taskid.
881 """
621 task.taskid = self.taskid
882 task.taskid = self.taskid
622 task.start = time.localtime()
883 task.start = time.localtime()
623 self.taskid += 1
884 self.taskid += 1
624 d = defer.Deferred()
885 d = defer.Deferred()
625 self.scheduler.add_task(task)
886 self.scheduler.add_task(task)
626 # log.msg('Queuing task: %i' % task.taskid)
887 log.msg('Queuing task: %i' % task.taskid)
627
888
628 self.deferredResults[task.taskid] = []
889 self.deferredResults[task.taskid] = []
629 self.distributeTasks()
890 self.distributeTasks()
630 return defer.succeed(task.taskid)
891 return defer.succeed(task.taskid)
631
892
632 def get_task_result(self, taskid, block=False):
893 def get_task_result(self, taskid, block=False):
633 """Returns a `Deferred` to a TaskResult tuple or None."""
894 """
634 # log.msg("Getting task result: %i" % taskid)
895 Returns a `Deferred` to the task result, or None.
896 """
897 log.msg("Getting task result: %i" % taskid)
635 if self.finishedResults.has_key(taskid):
898 if self.finishedResults.has_key(taskid):
636 tr = self.finishedResults[taskid]
899 tr = self.finishedResults[taskid]
637 return defer.succeed(tr)
900 return defer.succeed(tr)
@@ -646,7 +909,9 b' class TaskController(cs.ControllerAdapterBase):'
646 return defer.fail(IndexError("task ID not registered: %r" % taskid))
909 return defer.fail(IndexError("task ID not registered: %r" % taskid))
647
910
648 def abort(self, taskid):
911 def abort(self, taskid):
649 """Remove a task from the queue if it has not been run already."""
912 """
913 Remove a task from the queue if it has not been run already.
914 """
650 if not isinstance(taskid, int):
915 if not isinstance(taskid, int):
651 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
916 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
652 try:
917 try:
@@ -705,8 +970,10 b' class TaskController(cs.ControllerAdapterBase):'
705 #---------------------------------------------------------------------------
970 #---------------------------------------------------------------------------
706
971
707 def _doAbort(self, taskid):
972 def _doAbort(self, taskid):
708 """Helper function for aborting a pending task."""
973 """
709 # log.msg("Task aborted: %i" % taskid)
974 Helper function for aborting a pending task.
975 """
976 log.msg("Task aborted: %i" % taskid)
710 result = failure.Failure(error.TaskAborted())
977 result = failure.Failure(error.TaskAborted())
711 self._finishTask(taskid, result)
978 self._finishTask(taskid, result)
712 if taskid in self.abortPending:
979 if taskid in self.abortPending:
@@ -714,14 +981,16 b' class TaskController(cs.ControllerAdapterBase):'
714
981
715 def _finishTask(self, taskid, result):
982 def _finishTask(self, taskid, result):
716 dlist = self.deferredResults.pop(taskid)
983 dlist = self.deferredResults.pop(taskid)
717 result.taskid = taskid # The TaskResult should save the taskid
984 # result.taskid = taskid # The TaskResult should save the taskid
718 self.finishedResults[taskid] = result
985 self.finishedResults[taskid] = result
719 for d in dlist:
986 for d in dlist:
720 d.callback(result)
987 d.callback(result)
721
988
722 def distributeTasks(self):
989 def distributeTasks(self):
723 """Distribute tasks while self.scheduler has things to do."""
990 """
724 # log.msg("distributing Tasks")
991 Distribute tasks while self.scheduler has things to do.
992 """
993 log.msg("distributing Tasks")
725 worker, task = self.scheduler.schedule()
994 worker, task = self.scheduler.schedule()
726 if not worker and not task:
995 if not worker and not task:
727 if self.idleLater and self.idleLater.called:# we are inside failIdle
996 if self.idleLater and self.idleLater.called:# we are inside failIdle
@@ -736,7 +1005,7 b' class TaskController(cs.ControllerAdapterBase):'
736 self.pendingTasks[worker.workerid] = task
1005 self.pendingTasks[worker.workerid] = task
737 # run/link callbacks
1006 # run/link callbacks
738 d = worker.run(task)
1007 d = worker.run(task)
739 # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
1008 log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
740 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
1009 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
741 worker, task = self.scheduler.schedule()
1010 worker, task = self.scheduler.schedule()
742 # check for idle timeout:
1011 # check for idle timeout:
@@ -758,14 +1027,15 b' class TaskController(cs.ControllerAdapterBase):'
758 t = self.scheduler.pop_task()
1027 t = self.scheduler.pop_task()
759 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
1028 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
760 msg += " for %i seconds"%self.timeout
1029 msg += " for %i seconds"%self.timeout
761 # log.msg("Task aborted by timeout: %i" % t.taskid)
1030 log.msg("Task aborted by timeout: %i" % t.taskid)
762 f = failure.Failure(error.TaskTimeout(msg))
1031 f = failure.Failure(error.TaskTimeout(msg))
763 self._finishTask(t.taskid, f)
1032 self._finishTask(t.taskid, f)
764 self.idleLater = None
1033 self.idleLater = None
765
1034
766
1035
767 def taskCompleted(self, result, taskid, workerid):
1036 def taskCompleted(self, success_and_result, taskid, workerid):
768 """This is the err/callback for a completed task."""
1037 """This is the err/callback for a completed task."""
1038 success, result = success_and_result
769 try:
1039 try:
770 task = self.pendingTasks.pop(workerid)
1040 task = self.pendingTasks.pop(workerid)
771 except:
1041 except:
@@ -782,7 +1052,7 b' class TaskController(cs.ControllerAdapterBase):'
782 aborted = True
1052 aborted = True
783
1053
784 if not aborted:
1054 if not aborted:
785 if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed
1055 if not success:
786 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1056 log.msg("Task %i failed on worker %i"% (taskid, workerid))
787 if task.retries > 0: # resubmit
1057 if task.retries > 0: # resubmit
788 task.retries -= 1
1058 task.retries -= 1
@@ -790,7 +1060,7 b' class TaskController(cs.ControllerAdapterBase):'
790 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
1060 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
791 log.msg(s)
1061 log.msg(s)
792 self.distributeTasks()
1062 self.distributeTasks()
793 elif isinstance(task.recovery_task, Task) and \
1063 elif isinstance(task.recovery_task, BaseTask) and \
794 task.recovery_task.retries > -1:
1064 task.recovery_task.retries > -1:
795 # retries = -1 is to prevent infinite recovery_task loop
1065 # retries = -1 is to prevent infinite recovery_task loop
796 task.retries = -1
1066 task.retries = -1
@@ -806,17 +1076,18 b' class TaskController(cs.ControllerAdapterBase):'
806 # it may have died, and not yet been unregistered
1076 # it may have died, and not yet been unregistered
807 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1077 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
808 else: # we succeeded
1078 else: # we succeeded
809 # log.msg("Task completed: %i"% taskid)
1079 log.msg("Task completed: %i"% taskid)
810 self._finishTask(taskid, result)
1080 self._finishTask(taskid, result)
811 self.readmitWorker(workerid)
1081 self.readmitWorker(workerid)
812 else:# we aborted the task
1082 else: # we aborted the task
813 if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker
1083 if not success:
814 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1084 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
815 else:
1085 else:
816 self.readmitWorker(workerid)
1086 self.readmitWorker(workerid)
817
1087
818 def readmitWorker(self, workerid):
1088 def readmitWorker(self, workerid):
819 """Readmit a worker to the scheduler.
1089 """
1090 Readmit a worker to the scheduler.
820
1091
821 This is outside `taskCompleted` because of the `failurePenalty` being
1092 This is outside `taskCompleted` because of the `failurePenalty` being
822 implemented through `reactor.callLater`.
1093 implemented through `reactor.callLater`.
@@ -825,6 +1096,18 b' class TaskController(cs.ControllerAdapterBase):'
825 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
1096 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
826 self.scheduler.add_worker(self.workers[workerid])
1097 self.scheduler.add_worker(self.workers[workerid])
827 self.distributeTasks()
1098 self.distributeTasks()
1099
1100 def clear(self):
1101 """
1102 Clear all previously run tasks from the task controller.
1103
1104 This is needed because the task controller keep all task results
1105 in memory. This can be a problem is there are many completed
1106 tasks. Users should call this periodically to clean out these
1107 cached task results.
1108 """
1109 self.finishedResults = {}
1110 return defer.succeed(None)
828
1111
829
1112
830 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
1113 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -1,9 +1,8 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """The Generic Task Client object.
4 """
5
5 A blocking version of the task client.
6 This must be subclassed based on your connection method.
7 """
6 """
8
7
9 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
@@ -24,119 +23,100 b' from twisted.python import components, log'
24
23
25 from IPython.kernel.twistedutil import blockingCallFromThread
24 from IPython.kernel.twistedutil import blockingCallFromThread
26 from IPython.kernel import task, error
25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 IMapper
30 )
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
33 ITaskParallelDecorator
34 )
27
35
28 #-------------------------------------------------------------------------------
36 #-------------------------------------------------------------------------------
29 # Connecting Task Client
37 # The task client
30 #-------------------------------------------------------------------------------
38 #-------------------------------------------------------------------------------
31
39
32 class InteractiveTaskClient(object):
33
34 def irun(self, *args, **kwargs):
35 """Run a task on the `TaskController`.
36
37 This method is a shorthand for run(task) and its arguments are simply
38 passed onto a `Task` object:
39
40 irun(*args, **kwargs) -> run(Task(*args, **kwargs))
41
42 :Parameters:
43 expression : str
44 A str that is valid python code that is the task.
45 pull : str or list of str
46 The names of objects to be pulled as results.
47 push : dict
48 A dict of objects to be pushed into the engines namespace before
49 execution of the expression.
50 clear_before : boolean
51 Should the engine's namespace be cleared before the task is run.
52 Default=False.
53 clear_after : boolean
54 Should the engine's namespace be cleared after the task is run.
55 Default=False.
56 retries : int
57 The number of times to resumbit the task if it fails. Default=0.
58 options : dict
59 Any other keyword options for more elaborate uses of tasks
60
61 :Returns: A `TaskResult` object.
62 """
63 block = kwargs.pop('block', False)
64 if len(args) == 1 and isinstance(args[0], task.Task):
65 t = args[0]
66 else:
67 t = task.Task(*args, **kwargs)
68 taskid = self.run(t)
69 print "TaskID = %i"%taskid
70 if block:
71 return self.get_task_result(taskid, block)
72 else:
73 return taskid
74
75 class IBlockingTaskClient(Interface):
40 class IBlockingTaskClient(Interface):
76 """
41 """
77 An interface for blocking task clients.
42 A vague interface of the blocking task client
78 """
43 """
79 pass
44 pass
80
45
81
46 class BlockingTaskClient(object):
82 class BlockingTaskClient(InteractiveTaskClient):
83 """
47 """
84 This class provides a blocking task client.
48 A blocking task client that adapts a non-blocking one.
85 """
49 """
86
50
87 implements(IBlockingTaskClient)
51 implements(
52 IBlockingTaskClient,
53 ITaskMapperFactory,
54 IMapper,
55 ITaskParallelDecorator
56 )
88
57
89 def __init__(self, task_controller):
58 def __init__(self, task_controller):
90 self.task_controller = task_controller
59 self.task_controller = task_controller
91 self.block = True
60 self.block = True
92
61
93 def run(self, task):
62 def run(self, task, block=False):
94 """
63 """Run a task on the `TaskController`.
95 Run a task and return a task id that can be used to get the task result.
64
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
96
67
97 :Parameters:
68 :Parameters:
98 task : `Task`
69 task : an `ITask` implementer
99 The `Task` object to run
70
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
100 """
73 """
101 return blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
76 return self.get_task_result(tid, block=True)
77 else:
78 return tid
102
79
103 def get_task_result(self, taskid, block=False):
80 def get_task_result(self, taskid, block=False):
104 """
81 """
105 Get or poll for a task result.
82 Get a task result by taskid.
106
83
107 :Parameters:
84 :Parameters:
108 taskid : int
85 taskid : int
109 The id of the task whose result to get
86 The taskid of the task to be retrieved.
110 block : boolean
87 block : boolean
111 If True, wait until the task is done and then result the
88 Should I block until the task is done?
112 `TaskResult` object. If False, just poll for the result and
89
113 return None if the task is not done.
90 :Returns: A `TaskResult` object that encapsulates the task result.
114 """
91 """
115 return blockingCallFromThread(self.task_controller.get_task_result,
92 return blockingCallFromThread(self.task_controller.get_task_result,
116 taskid, block)
93 taskid, block)
117
94
118 def abort(self, taskid):
95 def abort(self, taskid):
119 """
96 """
120 Abort a task by task id if it has not been started.
97 Abort a task by taskid.
98
99 :Parameters:
100 taskid : int
101 The taskid of the task to be aborted.
121 """
102 """
122 return blockingCallFromThread(self.task_controller.abort, taskid)
103 return blockingCallFromThread(self.task_controller.abort, taskid)
123
104
124 def barrier(self, taskids):
105 def barrier(self, taskids):
125 """
106 """Block until a set of tasks are completed.
126 Wait for a set of tasks to finish.
127
107
128 :Parameters:
108 :Parameters:
129 taskids : list of ints
109 taskids : list, tuple
130 A list of task ids to wait for.
110 A sequence of taskids to block on.
131 """
111 """
132 return blockingCallFromThread(self.task_controller.barrier, taskids)
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
133
113
134 def spin(self):
114 def spin(self):
135 """
115 """
136 Cause the scheduler to schedule tasks.
116 Touch the scheduler, to resume scheduling without submitting a task.
137
117
138 This method only needs to be called in unusual situations where the
118 This method only needs to be called in unusual situations where the
139 scheduler is idle for some reason.
119 scheduler is idle for some reason.
140 """
120 """
141 return blockingCallFromThread(self.task_controller.spin)
121 return blockingCallFromThread(self.task_controller.spin)
142
122
@@ -153,7 +133,46 b' class BlockingTaskClient(InteractiveTaskClient):'
153 A dict with the queue status.
133 A dict with the queue status.
154 """
134 """
155 return blockingCallFromThread(self.task_controller.queue_status, verbose)
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
137 def clear(self):
138 """
139 Clear all previously run tasks from the task controller.
140
141 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
144 cached task results.
145 """
146 return blockingCallFromThread(self.task_controller.clear)
147
148 def map(self, func, *sequences):
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
152 This version is load balanced.
153 """
154 return self.mapper().map(func, *sequences)
156
155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
158 """
159 Create an `IMapper` implementer with a given set of arguments.
160
161 The `IMapper` created using a task controller is load balanced.
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
175 return pf
157
176
158 components.registerAdapter(BlockingTaskClient,
177 components.registerAdapter(BlockingTaskClient,
159 task.ITaskController, IBlockingTaskClient)
178 task.ITaskController, IBlockingTaskClient)
@@ -34,6 +34,15 b' from IPython.kernel.clientinterfaces import ('
34 IFCClientInterfaceProvider,
34 IFCClientInterfaceProvider,
35 IBlockingClientAdaptor
35 IBlockingClientAdaptor
36 )
36 )
37 from IPython.kernel.mapper import (
38 TaskMapper,
39 ITaskMapperFactory,
40 IMapper
41 )
42 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
44 ITaskParallelDecorator
45 )
37
46
38 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
39 # The Controller side of things
48 # The Controller side of things
@@ -43,32 +52,38 b' from IPython.kernel.clientinterfaces import ('
43 class IFCTaskController(Interface):
52 class IFCTaskController(Interface):
44 """Foolscap interface to task controller.
53 """Foolscap interface to task controller.
45
54
46 See the documentation of ITaskController for documentation about the methods.
55 See the documentation of `ITaskController` for more information.
47 """
56 """
48 def remote_run(request, binTask):
57 def remote_run(binTask):
49 """"""
58 """"""
50
59
51 def remote_abort(request, taskid):
60 def remote_abort(taskid):
52 """"""
61 """"""
53
62
54 def remote_get_task_result(request, taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
55 """"""
64 """"""
56
65
57 def remote_barrier(request, taskids):
66 def remote_barrier(taskids):
67 """"""
68
69 def remote_spin():
58 """"""
70 """"""
59
71
60 def remote_spin(request):
72 def remote_queue_status(verbose):
61 """"""
73 """"""
62
74
63 def remote_queue_status(request, verbose):
75 def remote_clear():
64 """"""
76 """"""
65
77
66
78
67 class FCTaskControllerFromTaskController(Referenceable):
79 class FCTaskControllerFromTaskController(Referenceable):
68 """XML-RPC attachmeot for controller.
69
70 See IXMLRPCTaskController and ITaskController (and its children) for documentation.
71 """
80 """
81 Adapt a `TaskController` to an `IFCTaskController`
82
83 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
85 """
86
72 implements(IFCTaskController, IFCClientInterfaceProvider)
87 implements(IFCTaskController, IFCClientInterfaceProvider)
73
88
74 def __init__(self, taskController):
89 def __init__(self, taskController):
@@ -92,8 +107,8 b' class FCTaskControllerFromTaskController(Referenceable):'
92
107
93 def remote_run(self, ptask):
108 def remote_run(self, ptask):
94 try:
109 try:
95 ctask = pickle.loads(ptask)
110 task = pickle.loads(ptask)
96 task = taskmodule.uncan_task(ctask)
111 task.uncan_task()
97 except:
112 except:
98 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
99 else:
114 else:
@@ -132,6 +147,9 b' class FCTaskControllerFromTaskController(Referenceable):'
132 d.addErrback(self.packageFailure)
147 d.addErrback(self.packageFailure)
133 return d
148 return d
134
149
150 def remote_clear(self):
151 return self.taskController.clear()
152
135 def remote_get_client_name(self):
153 def remote_get_client_name(self):
136 return 'IPython.kernel.taskfc.FCTaskClient'
154 return 'IPython.kernel.taskfc.FCTaskClient'
137
155
@@ -144,13 +162,23 b' components.registerAdapter(FCTaskControllerFromTaskController,'
144 #-------------------------------------------------------------------------------
162 #-------------------------------------------------------------------------------
145
163
146 class FCTaskClient(object):
164 class FCTaskClient(object):
147 """XML-RPC based TaskController client that implements ITaskController.
148
149 :Parameters:
150 addr : (ip, port)
151 The ip (str) and port (int) tuple of the `TaskController`.
152 """
165 """
153 implements(taskmodule.ITaskController, IBlockingClientAdaptor)
166 Client class for Foolscap exposed `TaskController`.
167
168 This class is an adapter that makes a `RemoteReference` to a
169 `TaskController` look like an actual `ITaskController` on the client side.
170
171 This class also implements `IBlockingClientAdaptor` so that clients can
172 automatically get a blocking version of this class.
173 """
174
175 implements(
176 taskmodule.ITaskController,
177 IBlockingClientAdaptor,
178 ITaskMapperFactory,
179 IMapper,
180 ITaskParallelDecorator
181 )
154
182
155 def __init__(self, remote_reference):
183 def __init__(self, remote_reference):
156 self.remote_reference = remote_reference
184 self.remote_reference = remote_reference
@@ -168,48 +196,26 b' class FCTaskClient(object):'
168 def run(self, task):
196 def run(self, task):
169 """Run a task on the `TaskController`.
197 """Run a task on the `TaskController`.
170
198
171 :Parameters:
199 See the documentation of the `MapTask` and `StringTask` classes for
172 task : a `Task` object
200 details on how to build a task of different types.
173
174 The Task object is created using the following signature:
175
176 Task(expression, pull=None, push={}, clear_before=False,
177 clear_after=False, retries=0, **options):)
178
201
179 The meaning of the arguments is as follows:
202 :Parameters:
203 task : an `ITask` implementer
180
204
181 :Task Parameters:
182 expression : str
183 A str that is valid python code that is the task.
184 pull : str or list of str
185 The names of objects to be pulled as results.
186 push : dict
187 A dict of objects to be pushed into the engines namespace before
188 execution of the expression.
189 clear_before : boolean
190 Should the engine's namespace be cleared before the task is run.
191 Default=False.
192 clear_after : boolean
193 Should the engine's namespace be cleared after the task is run.
194 Default=False.
195 retries : int
196 The number of times to resumbit the task if it fails. Default=0.
197 options : dict
198 Any other keyword options for more elaborate uses of tasks
199
200 :Returns: The int taskid of the submitted task. Pass this to
205 :Returns: The int taskid of the submitted task. Pass this to
201 `get_task_result` to get the `TaskResult` object.
206 `get_task_result` to get the `TaskResult` object.
202 """
207 """
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
204 ctask = taskmodule.can_task(task) # handles arbitrary function in .depend
209 task.can_task()
205 # as well as arbitrary recovery_task chains
210 ptask = pickle.dumps(task, 2)
206 ptask = pickle.dumps(ctask, 2)
211 task.uncan_task()
207 d = self.remote_reference.callRemote('run', ptask)
212 d = self.remote_reference.callRemote('run', ptask)
208 d.addCallback(self.unpackage)
213 d.addCallback(self.unpackage)
209 return d
214 return d
210
215
211 def get_task_result(self, taskid, block=False):
216 def get_task_result(self, taskid, block=False):
212 """The task result by taskid.
217 """
218 Get a task result by taskid.
213
219
214 :Parameters:
220 :Parameters:
215 taskid : int
221 taskid : int
@@ -224,20 +230,19 b' class FCTaskClient(object):'
224 return d
230 return d
225
231
226 def abort(self, taskid):
232 def abort(self, taskid):
227 """Abort a task by taskid.
233 """
234 Abort a task by taskid.
228
235
229 :Parameters:
236 :Parameters:
230 taskid : int
237 taskid : int
231 The taskid of the task to be aborted.
238 The taskid of the task to be aborted.
232 block : boolean
233 Should I block until the task is aborted.
234 """
239 """
235 d = self.remote_reference.callRemote('abort', taskid)
240 d = self.remote_reference.callRemote('abort', taskid)
236 d.addCallback(self.unpackage)
241 d.addCallback(self.unpackage)
237 return d
242 return d
238
243
239 def barrier(self, taskids):
244 def barrier(self, taskids):
240 """Block until all tasks are completed.
245 """Block until a set of tasks are completed.
241
246
242 :Parameters:
247 :Parameters:
243 taskids : list, tuple
248 taskids : list, tuple
@@ -248,20 +253,77 b' class FCTaskClient(object):'
248 return d
253 return d
249
254
250 def spin(self):
255 def spin(self):
251 """touch the scheduler, to resume scheduling without submitting
256 """
252 a task.
257 Touch the scheduler, to resume scheduling without submitting a task.
258
259 This method only needs to be called in unusual situations where the
260 scheduler is idle for some reason.
253 """
261 """
254 d = self.remote_reference.callRemote('spin')
262 d = self.remote_reference.callRemote('spin')
255 d.addCallback(self.unpackage)
263 d.addCallback(self.unpackage)
256 return d
264 return d
257
265
258 def queue_status(self, verbose=False):
266 def queue_status(self, verbose=False):
259 """Return a dict with the status of the task queue."""
267 """
268 Get a dictionary with the current state of the task queue.
269
270 :Parameters:
271 verbose : boolean
272 If True, return a list of taskids. If False, simply give
273 the number of tasks with each status.
274
275 :Returns:
276 A dict with the queue status.
277 """
260 d = self.remote_reference.callRemote('queue_status', verbose)
278 d = self.remote_reference.callRemote('queue_status', verbose)
261 d.addCallback(self.unpackage)
279 d.addCallback(self.unpackage)
262 return d
280 return d
263
281
282 def clear(self):
283 """
284 Clear all previously run tasks from the task controller.
285
286 This is needed because the task controller keep all task results
287 in memory. This can be a problem is there are many completed
288 tasks. Users should call this periodically to clean out these
289 cached task results.
290 """
291 d = self.remote_reference.callRemote('clear')
292 return d
293
264 def adapt_to_blocking_client(self):
294 def adapt_to_blocking_client(self):
295 """
296 Wrap self in a blocking version that implements `IBlockingTaskClient.
297 """
265 from IPython.kernel.taskclient import IBlockingTaskClient
298 from IPython.kernel.taskclient import IBlockingTaskClient
266 return IBlockingTaskClient(self)
299 return IBlockingTaskClient(self)
300
301 def map(self, func, *sequences):
302 """
303 Apply func to *sequences elementwise. Like Python's builtin map.
304
305 This version is load balanced.
306 """
307 return self.mapper().map(func, *sequences)
308
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
310 recovery_task=None, depend=None, block=True):
311 """
312 Create an `IMapper` implementer with a given set of arguments.
313
314 The `IMapper` created using a task controller is load balanced.
315
316 See the documentation for `IPython.kernel.task.BaseTask` for
317 documentation on the arguments to this method.
318 """
319 return TaskMapper(self, clear_before=clear_before,
320 clear_after=clear_after, retries=retries,
321 recovery_task=recovery_task, depend=depend, block=block)
322
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
324 recovery_task=None, depend=None, block=True):
325 mapper = self.mapper(clear_before, clear_after, retries,
326 recovery_task, depend, block)
327 pf = ParallelFunction(mapper)
328 return pf
267
329
@@ -43,23 +43,23 b' class TaskTestBase(object):'
43
43
44 class ITaskControllerTestCase(TaskTestBase):
44 class ITaskControllerTestCase(TaskTestBase):
45
45
46 def testTaskIDs(self):
46 def test_task_ids(self):
47 self.addEngine(1)
47 self.addEngine(1)
48 d = self.tc.run(task.Task('a=5'))
48 d = self.tc.run(task.StringTask('a=5'))
49 d.addCallback(lambda r: self.assertEquals(r, 0))
49 d.addCallback(lambda r: self.assertEquals(r, 0))
50 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
50 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
51 d.addCallback(lambda r: self.assertEquals(r, 1))
51 d.addCallback(lambda r: self.assertEquals(r, 1))
52 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
52 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
53 d.addCallback(lambda r: self.assertEquals(r, 2))
53 d.addCallback(lambda r: self.assertEquals(r, 2))
54 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
54 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
55 d.addCallback(lambda r: self.assertEquals(r, 3))
55 d.addCallback(lambda r: self.assertEquals(r, 3))
56 return d
56 return d
57
57
58 def testAbort(self):
58 def test_abort(self):
59 """Cannot do a proper abort test, because blocking execution prevents
59 """Cannot do a proper abort test, because blocking execution prevents
60 abort from being called before task completes"""
60 abort from being called before task completes"""
61 self.addEngine(1)
61 self.addEngine(1)
62 t = task.Task('a=5')
62 t = task.StringTask('a=5')
63 d = self.tc.abort(0)
63 d = self.tc.abort(0)
64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
65 d.addCallback(lambda _:self.tc.run(t))
65 d.addCallback(lambda _:self.tc.run(t))
@@ -67,15 +67,15 b' class ITaskControllerTestCase(TaskTestBase):'
67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
68 return d
68 return d
69
69
70 def testAbortType(self):
70 def test_abort_type(self):
71 self.addEngine(1)
71 self.addEngine(1)
72 d = self.tc.abort('asdfadsf')
72 d = self.tc.abort('asdfadsf')
73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
74 return d
74 return d
75
75
76 def testClears(self):
76 def test_clear_before_and_after(self):
77 self.addEngine(1)
77 self.addEngine(1)
78 t = task.Task('a=1', clear_before=True, pull='b', clear_after=True)
78 t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True)
79 d = self.multiengine.execute('b=1', targets=0)
79 d = self.multiengine.execute('b=1', targets=0)
80 d.addCallback(lambda _: self.tc.run(t))
80 d.addCallback(lambda _: self.tc.run(t))
81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
@@ -85,10 +85,10 b' class ITaskControllerTestCase(TaskTestBase):'
85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
86 return d
86 return d
87
87
88 def testSimpleRetries(self):
88 def test_simple_retries(self):
89 self.addEngine(1)
89 self.addEngine(1)
90 t = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
90 t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
92 d = self.multiengine.execute('i=0', targets=0)
92 d = self.multiengine.execute('i=0', targets=0)
93 d.addCallback(lambda r: self.tc.run(t))
93 d.addCallback(lambda r: self.tc.run(t))
94 d.addCallback(self.tc.get_task_result, block=True)
94 d.addCallback(self.tc.get_task_result, block=True)
@@ -101,10 +101,10 b' class ITaskControllerTestCase(TaskTestBase):'
101 d.addCallback(lambda r: self.assertEquals(r, 16))
101 d.addCallback(lambda r: self.assertEquals(r, 16))
102 return d
102 return d
103
103
104 def testRecoveryTasks(self):
104 def test_recovery_tasks(self):
105 self.addEngine(1)
105 self.addEngine(1)
106 t = task.Task("i=16", pull='i')
106 t = task.StringTask("i=16", pull='i')
107 t2 = task.Task("raise Exception", recovery_task=t, retries = 2)
107 t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2)
108
108
109 d = self.tc.run(t2)
109 d = self.tc.run(t2)
110 d.addCallback(self.tc.get_task_result, block=True)
110 d.addCallback(self.tc.get_task_result, block=True)
@@ -112,47 +112,76 b' class ITaskControllerTestCase(TaskTestBase):'
112 d.addCallback(lambda r: self.assertEquals(r, 16))
112 d.addCallback(lambda r: self.assertEquals(r, 16))
113 return d
113 return d
114
114
115 # def testInfiniteRecoveryLoop(self):
115 def test_setup_ns(self):
116 # self.addEngine(1)
117 # t = task.Task("raise Exception", retries = 5)
118 # t2 = task.Task("assert True", retries = 2, recovery_task = t)
119 # t.recovery_task = t2
120 #
121 # d = self.tc.run(t)
122 # d.addCallback(self.tc.get_task_result, block=True)
123 # d.addCallback(lambda tr: tr.ns.i)
124 # d.addBoth(printer)
125 # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException))
126 # return d
127 #
128 def testSetupNS(self):
129 self.addEngine(1)
116 self.addEngine(1)
130 d = self.multiengine.execute('a=0', targets=0)
117 d = self.multiengine.execute('a=0', targets=0)
131 ns = dict(a=1, b=0)
118 ns = dict(a=1, b=0)
132 t = task.Task("", push=ns, pull=['a','b'])
119 t = task.StringTask("", push=ns, pull=['a','b'])
133 d.addCallback(lambda r: self.tc.run(t))
120 d.addCallback(lambda r: self.tc.run(t))
134 d.addCallback(self.tc.get_task_result, block=True)
121 d.addCallback(self.tc.get_task_result, block=True)
135 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
122 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
136 d.addCallback(lambda r: self.assertEquals(r, ns))
123 d.addCallback(lambda r: self.assertEquals(r, ns))
137 return d
124 return d
138
125
139 def testTaskResults(self):
126 def test_string_task_results(self):
140 self.addEngine(1)
127 self.addEngine(1)
141 t1 = task.Task('a=5', pull='a')
128 t1 = task.StringTask('a=5', pull='a')
142 d = self.tc.run(t1)
129 d = self.tc.run(t1)
143 d.addCallback(self.tc.get_task_result, block=True)
130 d.addCallback(self.tc.get_task_result, block=True)
144 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException()))
131 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception()))
145 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
132 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
146
133
147 t2 = task.Task('7=5')
134 t2 = task.StringTask('7=5')
148 d.addCallback(lambda r: self.tc.run(t2))
135 d.addCallback(lambda r: self.tc.run(t2))
149 d.addCallback(self.tc.get_task_result, block=True)
136 d.addCallback(self.tc.get_task_result, block=True)
150 d.addCallback(lambda tr: tr.ns)
137 d.addCallback(lambda tr: tr.ns)
151 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
138 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
152
139
153 t3 = task.Task('', pull='b')
140 t3 = task.StringTask('', pull='b')
154 d.addCallback(lambda r: self.tc.run(t3))
141 d.addCallback(lambda r: self.tc.run(t3))
155 d.addCallback(self.tc.get_task_result, block=True)
142 d.addCallback(self.tc.get_task_result, block=True)
156 d.addCallback(lambda tr: tr.ns)
143 d.addCallback(lambda tr: tr.ns)
157 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
144 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
158 return d
145 return d
146
147 def test_map_task(self):
148 self.addEngine(1)
149 t1 = task.MapTask(lambda x: 2*x,(10,))
150 d = self.tc.run(t1)
151 d.addCallback(self.tc.get_task_result, block=True)
152 d.addCallback(lambda r: self.assertEquals(r,20))
153
154 t2 = task.MapTask(lambda : 20)
155 d.addCallback(lambda _: self.tc.run(t2))
156 d.addCallback(self.tc.get_task_result, block=True)
157 d.addCallback(lambda r: self.assertEquals(r,20))
158
159 t3 = task.MapTask(lambda x: x,(),{'x':20})
160 d.addCallback(lambda _: self.tc.run(t3))
161 d.addCallback(self.tc.get_task_result, block=True)
162 d.addCallback(lambda r: self.assertEquals(r,20))
163 return d
164
165 def test_map_task_failure(self):
166 self.addEngine(1)
167 t1 = task.MapTask(lambda x: 1/0,(10,))
168 d = self.tc.run(t1)
169 d.addCallback(self.tc.get_task_result, block=True)
170 d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException))
171 return d
172
173 def test_map_task_args(self):
174 self.assertRaises(TypeError, task.MapTask, 'asdfasdf')
175 self.assertRaises(TypeError, task.MapTask, lambda x: x, 10)
176 self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30)
177
178 def test_clear(self):
179 self.addEngine(1)
180 t1 = task.MapTask(lambda x: 2*x,(10,))
181 d = self.tc.run(t1)
182 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
183 d.addCallback(lambda r: self.assertEquals(r,20))
184 d.addCallback(lambda _: self.tc.clear())
185 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
186 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
187 return d
@@ -30,6 +30,8 b' try:'
30 from IPython.kernel.util import printer
30 from IPython.kernel.util import printer
31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 from IPython.kernel.clientconnector import ClientConnector
32 from IPython.kernel.clientconnector import ClientConnector
33 from IPython.kernel.error import CompositeError
34 from IPython.kernel.parallelfunction import ParallelFunction
33 except ImportError:
35 except ImportError:
34 pass
36 pass
35 else:
37 else:
@@ -38,6 +40,12 b' else:'
38 # Tests
40 # Tests
39 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
40
42
43 def _raise_it(f):
44 try:
45 f.raiseException()
46 except CompositeError, e:
47 e.raise_exception()
48
41 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
42
50
43 def setUp(self):
51 def setUp(self):
@@ -87,4 +95,67 b' else:'
87 d.addBoth(lambda _: self.controller.stopService())
95 d.addBoth(lambda _: self.controller.stopService())
88 dlist.append(d)
96 dlist.append(d)
89 return defer.DeferredList(dlist)
97 return defer.DeferredList(dlist)
90
98
99 def test_mapper(self):
100 self.addEngine(1)
101 m = self.tc.mapper()
102 self.assertEquals(m.task_controller,self.tc)
103 self.assertEquals(m.clear_before,False)
104 self.assertEquals(m.clear_after,False)
105 self.assertEquals(m.retries,0)
106 self.assertEquals(m.recovery_task,None)
107 self.assertEquals(m.depend,None)
108 self.assertEquals(m.block,True)
109
110 def test_map_default(self):
111 self.addEngine(1)
112 m = self.tc.mapper()
113 d = m.map(lambda x: 2*x, range(10))
114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
117 return d
118
119 def test_map_noblock(self):
120 self.addEngine(1)
121 m = self.tc.mapper(block=False)
122 d = m.map(lambda x: 2*x, range(10))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
124 return d
125
126 def test_mapper_fail(self):
127 self.addEngine(1)
128 m = self.tc.mapper()
129 d = m.map(lambda x: 1/0, range(10))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
131 return d
132
133 def test_parallel(self):
134 self.addEngine(1)
135 p = self.tc.parallel()
136 self.assert_(isinstance(p, ParallelFunction))
137 @p
138 def f(x): return 2*x
139 d = f(range(10))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
141 return d
142
143 def test_parallel_noblock(self):
144 self.addEngine(1)
145 p = self.tc.parallel(block=False)
146 self.assert_(isinstance(p, ParallelFunction))
147 @p
148 def f(x): return 2*x
149 d = f(range(10))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
151 return d
152
153 def test_parallel_fail(self):
154 self.addEngine(1)
155 p = self.tc.parallel()
156 self.assert_(isinstance(p, ParallelFunction))
157 @p
158 def f(x): return 1/0
159 d = f(range(10))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
161 return d No newline at end of file
@@ -31,7 +31,7 b' sigma_vals = N.linspace(0.0, 0.2,5)'
31 taskids = []
31 taskids = []
32 for K in K_vals:
32 for K in K_vals:
33 for sigma in sigma_vals:
33 for sigma in sigma_vals:
34 t = client.Task(task_string,
34 t = client.StringTask(task_string,
35 push=dict(sigma=sigma,K=K),
35 push=dict(sigma=sigma,K=K),
36 pull=('vp','ap','vc','ac','sigma','K'))
36 pull=('vp','ap','vc','ac','sigma','K'))
37 taskids.append(tc.run(t))
37 taskids.append(tc.run(t))
@@ -11,7 +11,7 b' b = 10*d'
11 c = a*b*d
11 c = a*b*d
12 """
12 """
13
13
14 t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
14 t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
15 tid1 = tc.run(t1)
15 tid1 = tc.run(t1)
16 tr1 = tc.get_task_result(tid1,block=True)
16 tr1 = tc.get_task_result(tid1,block=True)
17 tr1.raiseException()
17 tr1.raiseException()
@@ -10,7 +10,7 b' mec = client.MultiEngineClient()'
10 mec.execute('import time')
10 mec.execute('import time')
11
11
12 for i in range(24):
12 for i in range(24):
13 tc.irun('time.sleep(1)')
13 tc.run(client.StringTask('time.sleep(1)'))
14
14
15 for i in range(6):
15 for i in range(6):
16 time.sleep(1.0)
16 time.sleep(1.0)
@@ -18,7 +18,7 b' for i in range(6):'
18 print tc.queue_status()
18 print tc.queue_status()
19
19
20 for i in range(24):
20 for i in range(24):
21 tc.irun('time.sleep(1)')
21 tc.run(client.StringTask('time.sleep(1)'))
22
22
23 for i in range(6):
23 for i in range(6):
24 time.sleep(1.0)
24 time.sleep(1.0)
@@ -26,7 +26,7 b' for i in range(6):'
26 print tc.queue_status(True)
26 print tc.queue_status(True)
27
27
28 for i in range(12):
28 for i in range(12):
29 tc.irun('time.sleep(2)')
29 tc.run(client.StringTask('time.sleep(2)'))
30
30
31 print "Queue status (vebose=True)"
31 print "Queue status (vebose=True)"
32 print tc.queue_status(True)
32 print tc.queue_status(True)
@@ -55,7 +55,7 b' def main():'
55
55
56 # the jobs should take a random time within a range
56 # the jobs should take a random time within a range
57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
58 tasks = [client.Task("time.sleep(%f)"%t) for t in times]
58 tasks = [client.StringTask("time.sleep(%f)"%t) for t in times]
59 stime = sum(times)
59 stime = sum(times)
60
60
61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
General Comments 0
You need to be logged in to leave comments. Login now