##// END OF EJS Templates
Merge with upstream
Fernando Perez -
r1405:dc02b179 merge
parent child Browse files
Show More
@@ -0,0 +1,233 b''
1 # encoding: utf-8
2
3 """A parallelized version of Python's builtin map."""
4
5 __docformat__ = "restructuredtext en"
6
7 #----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #----------------------------------------------------------------------------
13
14 #----------------------------------------------------------------------------
15 # Imports
16 #----------------------------------------------------------------------------
17
18 from types import FunctionType
19 from zope.interface import Interface, implements
20 from IPython.kernel.task import MapTask
21 from IPython.kernel.twistedutil import DeferredList, gatherBoth
22 from IPython.kernel.util import printer
23 from IPython.kernel.error import collect_exceptions
24
25 #----------------------------------------------------------------------------
26 # Code
27 #----------------------------------------------------------------------------
28
class IMapper(Interface):
    """The basic interface for a Mapper.

    This defines a generic interface for mapping.  The idea of this is
    similar to that of Python's builtin `map` function, which applies a
    function elementwise to a sequence.
    """

    def map(func, *seqs):
        """Do map in parallel.

        Equivalent to map(func, *seqs) or:

        [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]

        :Parameters:
            func : FunctionType
                The function to apply to the sequence
            sequences : tuple of iterables
                A sequence of iterables that are used for successive function
                arguments.  This works just like the builtin `map`.
        """
51
class IMultiEngineMapperFactory(Interface):
    """
    An interface for something that creates `IMapper` instances.
    """

    def mapper(dist='b', targets='all', block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a multiengine controller is
        not load balanced.

        :Parameters:
            dist : str
                The decomposition to use; only block ('b') is supported
            targets : str, int or sequence of ints
                Which engines the mapper will use
            block : boolean
                Whether calls to the mapper's `map` method should block
        """
64
class ITaskMapperFactory(Interface):
    """
    An interface for something that creates `IMapper` instances.
    """

    def mapper(clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a task controller is load balanced.

        See the documentation for `IPython.kernel.task.BaseTask` for
        documentation on the arguments to this method.
        """
80
81
class MultiEngineMapper(object):
    """
    A Mapper for `IMultiEngine` implementers.
    """

    implements(IMapper)

    def __init__(self, multiengine, dist='b', targets='all', block=True):
        """
        Create a Mapper for a multiengine.

        The value of all arguments are used for all calls to `map`.  This
        class allows these arguments to be set for a series of map calls.

        :Parameters:
            multiengine : `IMultiEngine` implementer
                The multiengine to use for running the map commands
            dist : str
                The type of decomposition to use.  Only block ('b') is
                supported currently
            targets : (str, int, tuple of ints)
                The engines to use in the map
            block : boolean
                Whether to block when the map is applied
        """
        self.multiengine = multiengine
        self.dist = dist
        self.targets = targets
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is not load balanced.

        :Parameters:
            func : str or function
                The function to apply, or the name of a callable defined
                on the engines.
            sequences : one or more equal-length sequences
                Successive elements of each sequence become successive
                arguments of func.

        :Raises:
            ValueError : if no sequences are given or lengths differ.
            TypeError : if func is neither a str nor a function.
        """
        if not sequences:
            # Without this guard, max() below raises a confusing
            # "max() arg is an empty sequence" ValueError.
            raise ValueError('at least one sequence is required')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        # Raise explicitly rather than assert: asserts are stripped when
        # Python runs with -O, which would silently skip this validation.
        if not isinstance(func, (str, FunctionType)):
            raise TypeError("func must be a function or str")
        return self.multiengine.raw_map(func, sequences, dist=self.dist,
                                        targets=self.targets, block=self.block)
125
class TaskMapper(object):
    """
    Make an `ITaskController` look like an `IMapper`.

    This class provides a load balanced version of `map`.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """
        Create a `IMapper` given a `TaskController` and arguments.

        The additional arguments are those that are common to all types of
        tasks and are described in the documentation for
        `IPython.kernel.task.BaseTask`.

        :Parameters:
            task_controller : an `IBlockingTaskClient` implementer
                The `TaskController` to use for calls to `map`
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.

        :Parameters:
            func : callable or str
                The function to apply; passed through to `MapTask`.
            sequences : one or more equal-length sequences
                Successive elements of each sequence become successive
                arguments of func.

        :Returns: A deferred that fires with the list of results.
        :Raises: ValueError if no sequences are given or lengths differ.
        """
        if not sequences:
            # max() on an empty generator raises a confusing message.
            raise ValueError('at least one sequence is required')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        task_args = zip(*sequences)
        dlist = []
        for ta in task_args:
            task = MapTask(func, ta, clear_before=self.clear_before,
                           clear_after=self.clear_after, retries=self.retries,
                           recovery_task=self.recovery_task, depend=self.depend)
            dlist.append(self.task_controller.run(task))
        dlist = gatherBoth(dlist, consumeErrors=1)
        dlist.addCallback(collect_exceptions, 'map')
        if self.block:
            def get_results(task_ids):
                # task_ids here is the gathered list of ids returned by
                # the run() calls above (NOT a local list).
                d = self.task_controller.barrier(task_ids)
                d.addCallback(lambda _: gatherBoth(
                    [self.task_controller.get_task_result(tid)
                     for tid in task_ids],
                    consumeErrors=1))
                d.addCallback(collect_exceptions, 'map')
                return d
            dlist.addCallback(get_results)
        return dlist
182
class SynchronousTaskMapper(object):
    """
    Make an `IBlockingTaskClient` look like an `IMapper`.

    This class provides a load balanced version of `map`.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """
        Create a `IMapper` given a `IBlockingTaskClient` and arguments.

        The additional arguments are those that are common to all types of
        tasks and are described in the documentation for
        `IPython.kernel.task.BaseTask`.

        :Parameters:
            task_controller : an `IBlockingTaskClient` implementer
                The `TaskController` to use for calls to `map`
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.

        :Parameters:
            func : callable or str
                The function to apply; passed through to `MapTask`.
            sequences : one or more equal-length sequences
                Successive elements of each sequence become successive
                arguments of func.

        :Returns: The list of results if blocking, else the task ids.
        :Raises: ValueError if no sequences are given or lengths differ.
        """
        if not sequences:
            # max() on an empty generator raises a confusing message.
            raise ValueError('at least one sequence is required')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        task_args = zip(*sequences)
        task_ids = []
        for ta in task_args:
            task = MapTask(func, ta, clear_before=self.clear_before,
                           clear_after=self.clear_after, retries=self.retries,
                           recovery_task=self.recovery_task, depend=self.depend)
            task_ids.append(self.task_controller.run(task))
        if self.block:
            # Wait for every submitted task, then collect results in
            # submission order so output lines up with the input.
            self.task_controller.barrier(task_ids)
            task_results = [self.task_controller.get_task_result(tid)
                            for tid in task_ids]
            return task_results
        else:
            return task_ids
1 NO CONTENT: new file 100644
@@ -0,0 +1,18 b''
# Example: three ways to run a parallel map with a MultiEngineClient.
from IPython.kernel import client

# Connect to a running controller (requires an ipcluster to be up).
mec = client.MultiEngineClient()

# 1. Simplest form: blocking map, same call shape as the builtin map.
result = mec.map(lambda x: 2*x, range(10))
print "Simple, default map: ", result

# 2. Explicit mapper object: block=False makes map return a
#    PendingResult; the actual values are fetched via its .r attribute.
m = mec.mapper(block=False)
pr = m.map(lambda x: 2*x, range(10))
print "Submitted map, got PendingResult: ", pr
result = pr.r
print "Using a mapper: ", result

# 3. The @parallel decorator turns f into a parallel function that
#    maps itself over the sequence it is called with.
@mec.parallel()
def f(x): return 2*x

result = f(range(10))
print "Using a parallel function: ", result
@@ -0,0 +1,19 b''
# Example: three ways to run a load-balanced parallel map with a TaskClient.
from IPython.kernel import client

# Connect to a running controller (requires an ipcluster to be up).
tc = client.TaskClient()

# 1. Simplest form: blocking, load-balanced map.
result = tc.map(lambda x: 2*x, range(10))
print "Simple, default map: ", result

# 2. Explicit mapper object: block=False returns task ids; wait on them
#    with barrier() and then fetch each result by id.
m = tc.mapper(block=False, clear_after=True, clear_before=True)
tids = m.map(lambda x: 2*x, range(10))
print "Submitted tasks, got ids: ", tids
tc.barrier(tids)
result = [tc.get_task_result(tid) for tid in tids]
print "Using a mapper: ", result

# 3. The @parallel decorator turns f into a parallel function that
#    maps itself over the sequence it is called with.
@tc.parallel()
def f(x): return 2*x

result = f(range(10))
print "Using a parallel function: ", result
@@ -83,7 +83,7 b' class TestAsyncFrontendBase(unittest.TestCase):'
83 83 d.addCallback(self.checkBlockID, expected='TEST_ID')
84 84
85 85 def test_blockID_added_to_failure(self):
86 block = "raise Exception()"
86 block = "raise Exception()"
87 87
88 88 d = self.fb.execute(block,blockID='TEST_ID')
89 89 d.addErrback(self.checkFailureID, expected='TEST_ID')
@@ -27,7 +27,7 b' from IPython.kernel import codeutil'
27 27 from IPython.kernel.clientconnector import ClientConnector
28 28
29 29 # Other things that the user will need
30 from IPython.kernel.task import Task
30 from IPython.kernel.task import MapTask, StringTask
31 31 from IPython.kernel.error import CompositeError
32 32
33 33 #-------------------------------------------------------------------------------
@@ -44,7 +44,7 b' from IPython.kernel import codeutil'
44 44 import IPython.kernel.magic
45 45
46 46 # Other things that the user will need
47 from IPython.kernel.task import Task
47 from IPython.kernel.task import MapTask, StringTask
48 48 from IPython.kernel.error import CompositeError
49 49
50 50 #-------------------------------------------------------------------------------
@@ -141,46 +141,3 b' class RemoteMultiEngine(RemoteContextBase):'
141 141 def __enter__(self):
142 142 src = self.findsource(sys._getframe(1))
143 143 return self.mec.execute(src)
144
145
146 # XXX - Temporary hackish testing, we'll move this into proper tests right
147 # away
148
149 if __name__ == '__main__':
150
151 # XXX - for now, we need a running cluster to be started separately. The
152 # daemon work is almost finished, and will make much of this unnecessary.
153 from IPython.kernel import client
154 mec = client.MultiEngineClient(('127.0.0.1',10105))
155
156 try:
157 mec.get_ids()
158 except ConnectionRefusedError:
159 import os, time
160 os.system('ipcluster -n 2 &')
161 time.sleep(2)
162 mec = client.MultiEngineClient(('127.0.0.1',10105))
163
164 mec.block = False
165
166 import itertools
167 c = itertools.count()
168
169 parallel = RemoteMultiEngine(mec)
170
171 mec.pushAll()
172
173 with parallel as pr:
174 # A comment
175 remote() # this means the code below only runs remotely
176 print 'Hello remote world'
177 x = range(10)
178 # Comments are OK
179 # Even misindented.
180 y = x+1
181
182
183 with pfor('i',sequence) as pr:
184 print x[i]
185
186 print pr.x + pr.y
@@ -79,7 +79,7 b" def magic_px(self,parameter_s=''):"
79 79 except AttributeError:
80 80 print NO_ACTIVE_CONTROLLER
81 81 else:
82 print "Executing command on Controller"
82 print "Parallel execution on engines: %s" % activeController.targets
83 83 result = activeController.execute(parameter_s)
84 84 return result
85 85
@@ -115,7 +115,7 b' class RoundRobinMap(Map):'
115 115 # result.append(concat[i:totalLength:maxPartitionLength])
116 116 return self.concatenate(listOfPartitions)
117 117
118 styles = {'basic':Map}
118 dists = {'b':Map}
119 119
120 120
121 121
@@ -653,67 +653,55 b' components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMul'
653 653 class IMultiEngineCoordinator(Interface):
654 654 """Methods that work on multiple engines explicitly."""
655 655
656 def scatter(key, seq, style='basic', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets.
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets."""
658 658
659 :Parameters:
660 key : str
661 The variable name to call the scattered sequence.
662 seq : list, tuple, array
663 The sequence to scatter. The type should be preserved.
664 style : string
665 A specification of how the sequence is partitioned. Currently
666 only 'basic' is implemented.
667 flatten : boolean
668 Should single element sequences be converted to scalars.
669 """
670
671 def gather(key, style='basic', targets='all'):
672 """Gather object key from targets.
659 def gather(key, dist='b', targets='all'):
660 """Gather object key from targets."""
673 661
674 :Parameters:
675 key : string
676 The name of a sequence on the targets to gather.
677 style : string
678 A specification of how the sequence is partitioned. Currently
679 only 'basic' is implemented.
662 def raw_map(func, seqs, dist='b', targets='all'):
680 663 """
681
682 def map(func, seq, style='basic', targets='all'):
683 """A parallelized version of Python's builtin map.
664 A parallelized version of Python's builtin `map` function.
684 665
685 This function implements the following pattern:
666 This has a slightly different syntax than the builtin `map`.
667 This is needed because we need to have keyword arguments and thus
668 can't use *args to capture all the sequences. Instead, they must
669 be passed in a list or tuple.
686 670
687 1. The sequence seq is scattered to the given targets.
688 2. map(functionSource, seq) is called on each engine.
689 3. The resulting sequences are gathered back to the local machine.
690
691 :Parameters:
692 targets : int, list or 'all'
693 The engine ids the action will apply to. Call `get_ids` to see
694 a list of currently available engines.
695 func : str, function
696 An actual function object or a Python string that names a
697 callable defined on the engines.
698 seq : list, tuple or numpy array
699 The local sequence to be scattered.
700 style : str
701 Only 'basic' is supported for now.
702
703 :Returns: A list of len(seq) with functionSource called on each element
704 of seq.
705
706 Example
707 =======
671 The equivalence is:
708 672
709 >>> rc.mapAll('lambda x: x*x', range(10000))
710 [0,2,4,9,25,36,...]
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674
675 Most users will want to use parallel functions or the `mapper`
676 and `map` methods for an API that follows that of the builtin
677 `map`.
711 678 """
712 679
713 680
714 681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
715 682 """Methods that work on multiple engines explicitly."""
716 pass
683
684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 """Partition and distribute a sequence to targets."""
686
687 def gather(key, dist='b', targets='all', block=True):
688 """Gather object key from targets"""
689
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 """
692 A parallelized version of Python's builtin map.
693
694 This has a slightly different syntax than the builtin `map`.
695 This is needed because we need to have keyword arguments and thus
696 can't use *args to capture all the sequences. Instead, they must
697 be passed in a list or tuple.
698
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700
701 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
703 `map`.
704 """
717 705
718 706
719 707 #-------------------------------------------------------------------------------
@@ -722,46 +710,31 b' class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):'
722 710
723 711 class IMultiEngineExtras(Interface):
724 712
725 def zip_pull(targets, *keys):
726 """Pull, but return results in a different format from `pull`.
713 def zip_pull(targets, keys):
714 """
715 Pull, but return results in a different format from `pull`.
727 716
728 717 This method basically returns zip(pull(targets, *keys)), with a few
729 718 edge cases handled differently. Users of chainsaw will find this format
730 719 familiar.
731
732 :Parameters:
733 targets : int, list or 'all'
734 The engine ids the action will apply to. Call `get_ids` to see
735 a list of currently available engines.
736 keys: list or tuple of str
737 A list of variable names as string of the Python objects to be pulled
738 back to the client.
739
740 :Returns: A list of pulled Python objects for each target.
741 720 """
742 721
743 722 def run(targets, fname):
744 """Run a .py file on targets.
745
746 :Parameters:
747 targets : int, list or 'all'
748 The engine ids the action will apply to. Call `get_ids` to see
749 a list of currently available engines.
750 fname : str
751 The filename of a .py file on the local system to be sent to and run
752 on the engines.
753 block : boolean
754 Should I block or not. If block=True, wait for the action to
755 complete and return the result. If block=False, return a
756 `PendingResult` object that can be used to later get the
757 result. If block is not specified, the block attribute
758 will be used instead.
759 """
723 """Run a .py file on targets."""
760 724
761 725
762 726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
763 pass
764
727 def zip_pull(targets, keys, block=True):
728 """
729 Pull, but return results in a different format from `pull`.
730
731 This method basically returns zip(pull(targets, *keys)), with a few
732 edge cases handled differently. Users of chainsaw will find this format
733 familiar.
734 """
735
736 def run(targets, fname, block=True):
737 """Run a .py file on targets."""
765 738
766 739 #-------------------------------------------------------------------------------
767 740 # The full MultiEngine interface
@@ -31,6 +31,11 b' from IPython.ColorANSI import TermColors'
31 31 from IPython.kernel.twistedutil import blockingCallFromThread
32 32 from IPython.kernel import error
33 33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import (
35 MultiEngineMapper,
36 IMultiEngineMapperFactory,
37 IMapper
38 )
34 39 from IPython.kernel import map as Map
35 40 from IPython.kernel import multiengine as me
36 41 from IPython.kernel.multiengine import (IFullMultiEngine,
@@ -186,10 +191,14 b' class ResultList(list):'
186 191
187 192 def __repr__(self):
188 193 output = []
189 blue = TermColors.Blue
190 normal = TermColors.Normal
191 red = TermColors.Red
192 green = TermColors.Green
194 # These colored prompts were not working on Windows
195 if sys.platform == 'win32':
196 blue = normal = red = green = ''
197 else:
198 blue = TermColors.Blue
199 normal = TermColors.Normal
200 red = TermColors.Red
201 green = TermColors.Green
193 202 output.append("<Results List>\n")
194 203 for cmd in self:
195 204 if isinstance(cmd, Failure):
@@ -294,28 +303,7 b' class InteractiveMultiEngineClient(object):'
294 303 def __len__(self):
295 304 """Return the number of available engines."""
296 305 return len(self.get_ids())
297
298 def parallelize(self, func, targets=None, block=None):
299 """Build a `ParallelFunction` object for functionName on engines.
300
301 The returned object will implement a parallel version of functionName
302 that takes a local sequence as its only argument and calls (in
303 parallel) functionName on each element of that sequence. The
304 `ParallelFunction` object has a `targets` attribute that controls
305 which engines the function is run on.
306
307 :Parameters:
308 targets : int, list or 'all'
309 The engine ids the action will apply to. Call `get_ids` to see
310 a list of currently available engines.
311 functionName : str
312 A Python string that names a callable defined on the engines.
313
314 :Returns: A `ParallelFunction` object.
315 """
316 targets, block = self._findTargetsAndBlock(targets, block)
317 return ParallelFunction(func, self, targets, block)
318
306
319 307 #---------------------------------------------------------------------------
320 308 # Make this a context manager for with
321 309 #---------------------------------------------------------------------------
@@ -415,7 +403,11 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
415 403 engine, run code on it, etc.
416 404 """
417 405
418 implements(IFullBlockingMultiEngineClient)
406 implements(
407 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
409 IMapper
410 )
419 411
420 412 def __init__(self, smultiengine):
421 413 self.smultiengine = smultiengine
@@ -772,29 +764,100 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
772 764 # IMultiEngineCoordinator
773 765 #---------------------------------------------------------------------------
774 766
775 def scatter(self, key, seq, style='basic', flatten=False, targets=None, block=None):
767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
776 768 """
777 769 Partition a Python sequence and send the partitions to a set of engines.
778 770 """
779 771 targets, block = self._findTargetsAndBlock(targets, block)
780 772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
781 style, flatten, targets=targets, block=block)
773 dist, flatten, targets=targets, block=block)
782 774
783 def gather(self, key, style='basic', targets=None, block=None):
775 def gather(self, key, dist='b', targets=None, block=None):
784 776 """
785 777 Gather a partitioned sequence on a set of engines as a single local seq.
786 778 """
787 779 targets, block = self._findTargetsAndBlock(targets, block)
788 return self._blockFromThread(self.smultiengine.gather, key, style,
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
789 781 targets=targets, block=block)
790 782
791 def map(self, func, seq, style='basic', targets=None, block=None):
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
792 784 """
793 A parallelized version of Python's builtin map
785 A parallelized version of Python's builtin map.
786
787 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
791
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
794 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
796 `map`.
794 797 """
795 798 targets, block = self._findTargetsAndBlock(targets, block)
796 return self._blockFromThread(self.smultiengine.map, func, seq,
797 style, targets=targets, block=block)
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 dist, targets=targets, block=block)
801
802 def map(self, func, *sequences):
803 """
804 A parallel version of Python's builtin `map` function.
805
806 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
808
809 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
812 """
813 return self.mapper().map(func, *sequences)
814
815 def mapper(self, dist='b', targets='all', block=None):
816 """
817 Create a mapper object that has a `map` method.
818
819 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
821 the map happens.
822
823 :Parameters:
824 dist : str
825 What decomposition to use, 'b' is the only one supported
826 currently
827 targets : str, int, sequence of ints
828 Which engines to use for the map
829 block : boolean
830 Should calls to `map` block or not
831 """
832 return MultiEngineMapper(self, dist, targets, block)
833
834 def parallel(self, dist='b', targets=None, block=None):
835 """
836 A decorator that turns a function into a parallel function.
837
838 This can be used as:
839
840 @parallel()
841 def f(x, y)
842 ...
843
844 f(range(10), range(10))
845
846 This causes f(0,0), f(1,1), ... to be called in parallel.
847
848 :Parameters:
849 dist : str
850 What decomposition to use, 'b' is the only one supported
851 currently
852 targets : str, int, sequence of ints
853 Which engines to use for the map
854 block : boolean
855 Should calls to `map` block or not
856 """
857 targets, block = self._findTargetsAndBlock(targets, block)
858 mapper = self.mapper(dist, targets, block)
859 pf = ParallelFunction(mapper)
860 return pf
798 861
799 862 #---------------------------------------------------------------------------
800 863 # IMultiEngineExtras
@@ -29,6 +29,12 b' from foolscap import Referenceable'
29 29 from IPython.kernel import error
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel import map as Map
32 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import (
34 MultiEngineMapper,
35 IMultiEngineMapperFactory,
36 IMapper
37 )
32 38 from IPython.kernel.twistedutil import gatherBoth
33 39 from IPython.kernel.multiengine import (MultiEngine,
34 40 IMultiEngine,
@@ -280,7 +286,12 b' components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,'
280 286
281 287 class FCFullSynchronousMultiEngineClient(object):
282 288
283 implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
289 implements(
290 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
293 IMapper
294 )
284 295
285 296 def __init__(self, remote_reference):
286 297 self.remote_reference = remote_reference
@@ -475,7 +486,7 b' class FCFullSynchronousMultiEngineClient(object):'
475 486 d.addCallback(create_targets)
476 487 return d
477 488
478 def scatter(self, key, seq, style='basic', flatten=False, targets='all', block=True):
489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
479 490
480 491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
481 492 # This enables us to collect a bunch of deferred ids and make a secondary
@@ -483,7 +494,7 b' class FCFullSynchronousMultiEngineClient(object):'
483 494 # difficult to get right though.
484 495 def do_scatter(engines):
485 496 nEngines = len(engines)
486 mapClass = Map.styles[style]
497 mapClass = Map.dists[dist]
487 498 mapObject = mapClass()
488 499 d_list = []
489 500 # Loop through and push to each engine in non-blocking mode.
@@ -541,7 +552,7 b' class FCFullSynchronousMultiEngineClient(object):'
541 552 d.addCallback(do_scatter)
542 553 return d
543 554
544 def gather(self, key, style='basic', targets='all', block=True):
555 def gather(self, key, dist='b', targets='all', block=True):
545 556
546 557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
547 558 # This enables us to collect a bunch of deferred ids and make a secondary
@@ -549,7 +560,7 b' class FCFullSynchronousMultiEngineClient(object):'
549 560 # difficult to get right though.
550 561 def do_gather(engines):
551 562 nEngines = len(engines)
552 mapClass = Map.styles[style]
563 mapClass = Map.dists[dist]
553 564 mapObject = mapClass()
554 565 d_list = []
555 566 # Loop through and push to each engine in non-blocking mode.
@@ -604,25 +615,103 b' class FCFullSynchronousMultiEngineClient(object):'
604 615 d.addCallback(do_gather)
605 616 return d
606 617
607 def map(self, func, seq, style='basic', targets='all', block=True):
608 d_list = []
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 """
620 A parallelized version of Python's builtin map.
621
622 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
626
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
629 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
631 `map`.
632 """
633 if not isinstance(sequences, (list, tuple)):
634 raise TypeError('sequences must be a list or tuple')
635 max_len = max(len(s) for s in sequences)
636 for s in sequences:
637 if len(s)!=max_len:
638 raise ValueError('all sequences must have equal length')
609 639 if isinstance(func, FunctionType):
610 640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
611 641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
612 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)'
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
613 643 elif isinstance(func, str):
614 644 d = defer.succeed(None)
615 645 sourceToRun = \
616 '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
617 647 else:
618 648 raise TypeError("func must be a function or str")
619 649
620 d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets))
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
621 651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
622 652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
623 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets, block=block))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
624 654 return d
625 655
656 def map(self, func, *sequences):
657 """
658 A parallel version of Python's builtin `map` function.
659
660 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
662
663 This method creates a mapper objects by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
666 """
667 return self.mapper().map(func, *sequences)
668
669 def mapper(self, dist='b', targets='all', block=True):
670 """
671 Create a mapper object that has a `map` method.
672
673 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
675 the map happens.
676
677 :Parameters:
678 dist : str
679 What decomposition to use, 'b' is the only one supported
680 currently
681 targets : str, int, sequence of ints
682 Which engines to use for the map
683 block : boolean
684 Should calls to `map` block or not
685 """
686 return MultiEngineMapper(self, dist, targets, block)
687
688 def parallel(self, dist='b', targets='all', block=True):
689 """
690 A decorator that turns a function into a parallel function.
691
692 This can be used as:
693
694 @parallel()
695 def f(x, y)
696 ...
697
698 f(range(10), range(10))
699
700 This causes f(0,0), f(1,1), ... to be called in parallel.
701
702 :Parameters:
703 dist : str
704 What decomposition to use, 'b' is the only one supported
705 currently
706 targets : str, int, sequence of ints
707 Which engines to use for the map
708 block : boolean
709 Should calls to `map` block or not
710 """
711 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
713 return pf
714
626 715 #---------------------------------------------------------------------------
627 716 # ISynchronousMultiEngineExtras related methods
628 717 #---------------------------------------------------------------------------
@@ -16,17 +16,92 b' __docformat__ = "restructuredtext en"'
16 16 #-------------------------------------------------------------------------------
17 17
18 18 from types import FunctionType
19 from zope.interface import Interface, implements
19 20
20 class ParallelFunction:
21 """A function that operates in parallel on sequences."""
22 def __init__(self, func, multiengine, targets, block):
23 """Create a `ParallelFunction`.
21
22 class IMultiEngineParallelDecorator(Interface):
23 """A decorator that creates a parallel function."""
24
25 def parallel(dist='b', targets=None, block=None):
26 """
27 A decorator that turns a function into a parallel function.
28
29 This can be used as:
30
31 @parallel()
32 def f(x, y)
33 ...
34
35 f(range(10), range(10))
36
37 This causes f(0,0), f(1,1), ... to be called in parallel.
38
39 :Parameters:
40 dist : str
41 What decomposition to use, 'b' is the only one supported
42 currently
43 targets : str, int, sequence of ints
44 Which engines to use for the map
45 block : boolean
46 Should calls to `map` block or not
47 """
48
49 class ITaskParallelDecorator(Interface):
50 """A decorator that creates a parallel function."""
51
52 def parallel(clear_before=False, clear_after=False, retries=0,
53 recovery_task=None, depend=None, block=True):
54 """
55 A decorator that turns a function into a parallel function.
56
57 This can be used as:
58
59 @parallel()
60 def f(x, y)
61 ...
62
63 f(range(10), range(10))
64
65 This causes f(0,0), f(1,1), ... to be called in parallel.
66
67 See the documentation for `IPython.kernel.task.BaseTask` for
68 documentation on the arguments to this method.
69 """
70
71 class IParallelFunction(Interface):
72 pass
73
74 class ParallelFunction(object):
75 """
76 The implementation of a parallel function.
77
78 A parallel function is similar to Python's map function:
79
80 map(func, *sequences) -> pfunc(*sequences)
81
82 Parallel functions should be created by using the @parallel decorator.
83 """
84
85 implements(IParallelFunction)
86
87 def __init__(self, mapper):
88 """
89 Create a parallel function from an `IMapper`.
90
91 :Parameters:
92 mapper : an `IMapper` implementer.
93 The mapper to use for the parallel function
94 """
95 self.mapper = mapper
96
97 def __call__(self, func):
98 """
99 Decorate a function to make it run in parallel.
24 100 """
25 101 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
26 102 self.func = func
27 self.multiengine = multiengine
28 self.targets = targets
29 self.block = block
30
31 def __call__(self, sequence):
32 return self.multiengine.map(self.func, sequence, targets=self.targets, block=self.block) No newline at end of file
103 def call_function(*sequences):
104 return self.mapper.map(self.func, *sequences)
105 return call_function
106
107 No newline at end of file
This diff has been collapsed as it changes many lines, (682 lines changed) Show them Hide them
@@ -5,117 +5,404 b''
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 #-------------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 14
15 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 16 # Imports
17 #-------------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 18
19 19 import copy, time
20 from types import FunctionType as function
20 from types import FunctionType
21 21
22 22 import zope.interface as zi, string
23 23 from twisted.internet import defer, reactor
24 24 from twisted.python import components, log, failure
25 25
26 # from IPython.genutils import time
27
26 from IPython.kernel.util import printer
28 27 from IPython.kernel import engineservice as es, error
29 28 from IPython.kernel import controllerservice as cs
30 29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
31 30
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
33
34 def canTask(task):
35 t = copy.copy(task)
36 t.depend = can(t.depend)
37 if t.recovery_task:
38 t.recovery_task = canTask(t.recovery_task)
39 return t
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
40 32
41 def uncanTask(task):
42 t = copy.copy(task)
43 t.depend = uncan(t.depend)
44 if t.recovery_task and t.recovery_task is not task:
45 t.recovery_task = uncanTask(t.recovery_task)
46 return t
33 #-----------------------------------------------------------------------------
34 # Definition of the Task objects
35 #-----------------------------------------------------------------------------
47 36
48 37 time_format = '%Y/%m/%d %H:%M:%S'
49 38
50 class Task(object):
51 r"""Our representation of a task for the `TaskController` interface.
52
53 The user should create instances of this class to represent a task that
54 needs to be done.
55
56 :Parameters:
57 expression : str
58 A str that is valid python code that is the task.
59 pull : str or list of str
60 The names of objects to be pulled as results. If not specified,
61 will return {'result', None}
62 push : dict
63 A dict of objects to be pushed into the engines namespace before
64 execution of the expression.
65 clear_before : boolean
66 Should the engine's namespace be cleared before the task is run.
67 Default=False.
68 clear_after : boolean
69 Should the engine's namespace be cleared after the task is run.
70 Default=False.
71 retries : int
72 The number of times to resumbit the task if it fails. Default=0.
73 recovery_task : Task
74 This is the Task to be run when the task has exhausted its retries
75 Default=None.
76 depend : bool function(properties)
77 This is the dependency function for the Task, which determines
78 whether a task can be run on a Worker. `depend` is called with
79 one argument, the worker's properties dict, and should return
80 True if the worker meets the dependencies or False if it does
81 not.
82 Default=None - run on any worker
83 options : dict
84 Any other keyword options for more elaborate uses of tasks
85
86 Examples
87 --------
39 class ITask(zi.Interface):
40 """
41 This interface provides a generic definition of what constitutes a task.
42
43 There are two sides to a task. First a task needs to take input from
44 a user to determine what work is performed by the task. Second, the
45 task needs to have the logic that knows how to turn that information
46 info specific calls to a worker, through the `IQueuedEngine` interface.
47
48 Many method in this class get two things passed to them: a Deferred
49 and an IQueuedEngine implementer. Such methods should register callbacks
50 on the Deferred that use the IQueuedEngine to accomplish something. See
51 the existing task objects for examples.
52 """
53
54 zi.Attribute('retries','How many times to retry the task')
55 zi.Attribute('recovery_task','A task to try if the initial one fails')
56 zi.Attribute('taskid','the id of the task')
57
58 def start_time(result):
59 """
60 Do anything needed to start the timing of the task.
61
62 Must simply return the result after starting the timers.
63 """
88 64
89 >>> t = Task('dostuff(args)')
90 >>> t = Task('a=5', pull='a')
91 >>> t = Task('a=5\nb=4', pull=['a','b'])
92 >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
65 def stop_time(result):
66 """
67 Do anything needed to stop the timing of the task.
68
69 Must simply return the result after stopping the timers. This
70 method will usually set attributes that are used by `process_result`
71 in building result of the task.
72 """
73
74 def pre_task(d, queued_engine):
75 """Do something with the queued_engine before the task is run.
76
77 This method should simply add callbacks to the input Deferred
78 that do something with the `queued_engine` before the task is run.
79
80 :Parameters:
81 d : Deferred
82 The deferred that actions should be attached to
83 queued_engine : IQueuedEngine implementer
84 The worker that has been allocated to perform the task
85 """
86
87 def post_task(d, queued_engine):
88 """Do something with the queued_engine after the task is run.
89
90 This method should simply add callbacks to the input Deferred
91 that do something with the `queued_engine` before the task is run.
92
93 :Parameters:
94 d : Deferred
95 The deferred that actions should be attached to
96 queued_engine : IQueuedEngine implementer
97 The worker that has been allocated to perform the task
98 """
99
100 def submit_task(d, queued_engine):
101 """Submit a task using the `queued_engine` we have been allocated.
102
103 When a task is ready to run, this method is called. This method
104 must take the internal information of the task and make suitable
105 calls on the queued_engine to have the actual work done.
106
107 This method should simply add callbacks to the input Deferred
108 that do something with the `queued_engine` before the task is run.
109
110 :Parameters:
111 d : Deferred
112 The deferred that actions should be attached to
113 queued_engine : IQueuedEngine implementer
114 The worker that has been allocated to perform the task
115 """
93 116
94 A dependency case:
95 >>> def hasMPI(props):
96 ... return props.get('mpi') is not None
97 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
117 def process_result(d, result, engine_id):
118 """Take a raw task result.
119
120 Objects that implement `ITask` can choose how the result of running
121 the task is presented. This method takes the raw result and
122 does this logic. Two example are the `MapTask` which simply returns
123 the raw result or a `Failure` object and the `StringTask` which
124 returns a `TaskResult` object.
125
126 :Parameters:
127 d : Deferred
128 The deferred that actions should be attached to
129 result : object
130 The raw task result that needs to be wrapped
131 engine_id : int
132 The id of the engine that did the task
133
134 :Returns:
135 The result, as a tuple of the form: (success, result).
136 Here, success is a boolean indicating if the task
137 succeeded or failed and result is the result.
138 """
139
140 def check_depend(properties):
141 """Check properties to see if the task should be run.
142
143 :Parameters:
144 properties : dict
145 A dictionary of properties that an engine has set
146
147 :Returns:
148 True if the task should be run, False otherwise
149 """
150
151 def can_task(self):
152 """Serialize (can) any functions in the task for pickling.
153
154 Subclasses must override this method and make sure that all
155 functions in the task are canned by calling `can` on the
156 function.
157 """
158
159 def uncan_task(self):
160 """Unserialize (uncan) any canned function in the task."""
161
162 class BaseTask(object):
163 """
164 Common fuctionality for all objects implementing `ITask`.
98 165 """
99 166
100 def __init__(self, expression, pull=None, push=None,
101 clear_before=False, clear_after=False, retries=0,
102 recovery_task=None, depend=None, **options):
103 self.expression = expression
104 if isinstance(pull, str):
105 self.pull = [pull]
106 else:
107 self.pull = pull
108 self.push = push
167 zi.implements(ITask)
168
169 def __init__(self, clear_before=False, clear_after=False, retries=0,
170 recovery_task=None, depend=None):
171 """
172 Make a generic task.
173
174 :Parameters:
175 clear_before : boolean
176 Should the engines namespace be cleared before the task
177 is run
178 clear_after : boolean
179 Should the engines namespace be clear after the task is run
180 retries : int
181 The number of times a task should be retries upon failure
182 recovery_task : any task object
183 If a task fails and it has a recovery_task, that is run
184 upon a retry
185 depend : FunctionType
186 A function that is called to test for properties. This function
187 must take one argument, the properties dict and return a boolean
188 """
109 189 self.clear_before = clear_before
110 190 self.clear_after = clear_after
111 self.retries=retries
191 self.retries = retries
112 192 self.recovery_task = recovery_task
113 193 self.depend = depend
114 self.options = options
115 194 self.taskid = None
195
196 def start_time(self, result):
197 """
198 Start the basic timers.
199 """
200 self.start = time.time()
201 self.start_struct = time.localtime()
202 return result
203
204 def stop_time(self, result):
205 """
206 Stop the basic timers.
207 """
208 self.stop = time.time()
209 self.stop_struct = time.localtime()
210 self.duration = self.stop - self.start
211 self.submitted = time.strftime(time_format, self.start_struct)
212 self.completed = time.strftime(time_format)
213 return result
214
215 def pre_task(self, d, queued_engine):
216 """
217 Clear the engine before running the task if clear_before is set.
218 """
219 if self.clear_before:
220 d.addCallback(lambda r: queued_engine.reset())
221
222 def post_task(self, d, queued_engine):
223 """
224 Clear the engine after running the task if clear_after is set.
225 """
226 def reseter(result):
227 queued_engine.reset()
228 return result
229 if self.clear_after:
230 d.addBoth(reseter)
231
232 def submit_task(self, d, queued_engine):
233 raise NotImplementedError('submit_task must be implemented in a subclass')
234
235 def process_result(self, result, engine_id):
236 """
237 Process a task result.
238
239 This is the default `process_result` that just returns the raw
240 result or a `Failure`.
241 """
242 if isinstance(result, failure.Failure):
243 return (False, result)
244 else:
245 return (True, result)
246
247 def check_depend(self, properties):
248 """
249 Calls self.depend(properties) to see if a task should be run.
250 """
251 if self.depend is not None:
252 return self.depend(properties)
253 else:
254 return True
255
256 def can_task(self):
257 self.depend = can(self.depend)
258 if isinstance(self.recovery_task, BaseTask):
259 self.recovery_task.can_task()
260
261 def uncan_task(self):
262 self.depend = uncan(self.depend)
263 if isinstance(self.recovery_task, BaseTask):
264 self.recovery_task.uncan_task()
265
266 class MapTask(BaseTask):
267 """
268 A task that consists of a function and arguments.
269 """
270
271 zi.implements(ITask)
272
273 def __init__(self, function, args=None, kwargs=None, clear_before=False,
274 clear_after=False, retries=0, recovery_task=None, depend=None):
275 """
276 Create a task based on a function, args and kwargs.
277
278 This is a simple type of task that consists of calling:
279 function(*args, **kwargs) and wrapping the result in a `TaskResult`.
280
281 The return value of the function, or a `Failure` wrapping an
282 exception is the task result for this type of task.
283 """
284 BaseTask.__init__(self, clear_before, clear_after, retries,
285 recovery_task, depend)
286 if not isinstance(function, FunctionType):
287 raise TypeError('a task function must be a FunctionType')
288 self.function = function
289 if args is None:
290 self.args = ()
291 else:
292 self.args = args
293 if not isinstance(self.args, (list, tuple)):
294 raise TypeError('a task args must be a list or tuple')
295 if kwargs is None:
296 self.kwargs = {}
297 else:
298 self.kwargs = kwargs
299 if not isinstance(self.kwargs, dict):
300 raise TypeError('a task kwargs must be a dict')
301
302 def submit_task(self, d, queued_engine):
303 d.addCallback(lambda r: queued_engine.push_function(
304 dict(_ipython_task_function=self.function))
305 )
306 d.addCallback(lambda r: queued_engine.push(
307 dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
308 )
309 d.addCallback(lambda r: queued_engine.execute(
310 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
311 )
312 d.addCallback(lambda r: queued_engine.pull('_ipython_task_result'))
313
314 def can_task(self):
315 self.function = can(self.function)
316 BaseTask.can_task(self)
317
318 def uncan_task(self):
319 self.function = uncan(self.function)
320 BaseTask.uncan_task(self)
321
322
323 class StringTask(BaseTask):
324 """
325 A task that consists of a string of Python code to run.
326 """
327
328 def __init__(self, expression, pull=None, push=None,
329 clear_before=False, clear_after=False, retries=0,
330 recovery_task=None, depend=None):
331 """
332 Create a task based on a Python expression and variables
333
334 This type of task lets you push a set of variables to the engines
335 namespace, run a Python string in that namespace and then bring back
336 a different set of Python variables as the result.
337
338 Because this type of task can return many results (through the
339 `pull` keyword argument) it returns a special `TaskResult` object
340 that wraps the pulled variables, statistics about the run and
341 any exceptions raised.
342 """
343 if not isinstance(expression, str):
344 raise TypeError('a task expression must be a string')
345 self.expression = expression
346
347 if pull==None:
348 self.pull = ()
349 elif isinstance(pull, str):
350 self.pull = (pull,)
351 elif isinstance(pull, (list, tuple)):
352 self.pull = pull
353 else:
354 raise TypeError('pull must be str or a sequence of strs')
355
356 if push==None:
357 self.push = {}
358 elif isinstance(push, dict):
359 self.push = push
360 else:
361 raise TypeError('push must be a dict')
362
363 BaseTask.__init__(self, clear_before, clear_after, retries,
364 recovery_task, depend)
116 365
117 class ResultNS:
118 """The result namespace object for use in TaskResult objects as tr.ns.
366 def submit_task(self, d, queued_engine):
367 if self.push is not None:
368 d.addCallback(lambda r: queued_engine.push(self.push))
369
370 d.addCallback(lambda r: queued_engine.execute(self.expression))
371
372 if self.pull is not None:
373 d.addCallback(lambda r: queued_engine.pull(self.pull))
374 else:
375 d.addCallback(lambda r: None)
376
377 def process_result(self, result, engine_id):
378 if isinstance(result, failure.Failure):
379 tr = TaskResult(result, engine_id)
380 else:
381 if self.pull is None:
382 resultDict = {}
383 elif len(self.pull) == 1:
384 resultDict = {self.pull[0]:result}
385 else:
386 resultDict = dict(zip(self.pull, result))
387 tr = TaskResult(resultDict, engine_id)
388 # Assign task attributes
389 tr.submitted = self.submitted
390 tr.completed = self.completed
391 tr.duration = self.duration
392 if hasattr(self,'taskid'):
393 tr.taskid = self.taskid
394 else:
395 tr.taskid = None
396 if isinstance(result, failure.Failure):
397 return (False, tr)
398 else:
399 return (True, tr)
400
401 class ResultNS(object):
402 """
403 A dict like object for holding the results of a task.
404
405 The result namespace object for use in `TaskResult` objects as tr.ns.
119 406 It builds an object from a dictionary, such that it has attributes
120 407 according to the key,value pairs of the dictionary.
121 408
@@ -128,15 +415,12 b' class ResultNS:'
128 415 --------
129 416
130 417 >>> ns = ResultNS({'a':17,'foo':range(3)})
131
132 418 >>> print ns
133 NS{'a': 17, 'foo': [0, 1, 2]}
134
419 NS{'a':17,'foo':range(3)}
135 420 >>> ns.a
136 17
137
421 17
138 422 >>> ns['foo']
139 [0, 1, 2]
423 [0,1,2]
140 424 """
141 425 def __init__(self, dikt):
142 426 for k,v in dikt.iteritems():
@@ -156,7 +440,7 b' class ResultNS:'
156 440
157 441 class TaskResult(object):
158 442 """
159 An object for returning task results.
443 An object for returning task results for certain types of tasks.
160 444
161 445 This object encapsulates the results of a task. On task
162 446 success it will have a keys attribute that will have a list
@@ -166,21 +450,21 b' class TaskResult(object):'
166 450
167 451 In task failure, keys will be empty, but failure will contain
168 452 the failure object that encapsulates the remote exception.
169 One can also simply call the raiseException() method of
453 One can also simply call the `raise_exception` method of
170 454 this class to re-raise any remote exception in the local
171 455 session.
172 456
173 The TaskResult has a .ns member, which is a property for access
457 The `TaskResult` has a `.ns` member, which is a property for access
174 458 to the results. If the Task had pull=['a', 'b'], then the
175 Task Result will have attributes tr.ns.a, tr.ns.b for those values.
176 Accessing tr.ns will raise the remote failure if the task failed.
459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
460 Accessing `tr.ns` will raise the remote failure if the task failed.
177 461
178 The engineid attribute should have the engineid of the engine
179 that ran the task. But, because engines can come and go in
180 the ipython task system, the engineid may not continue to be
462 The `engineid` attribute should have the `engineid` of the engine
463 that ran the task. But, because engines can come and go,
464 the `engineid` may not continue to be
181 465 valid or accurate.
182 466
183 The taskid attribute simply gives the taskid that the task
467 The `taskid` attribute simply gives the `taskid` that the task
184 468 is tracked under.
185 469 """
186 470 taskid = None
@@ -192,7 +476,7 b' class TaskResult(object):'
192 476 return self._ns
193 477
194 478 def _setNS(self, v):
195 raise Exception("I am protected!")
479 raise Exception("the ns attribute cannot be changed")
196 480
197 481 ns = property(_getNS, _setNS)
198 482
@@ -218,15 +502,19 b' class TaskResult(object):'
218 502
219 503 def __getitem__(self, key):
220 504 if self.failure is not None:
221 self.raiseException()
505 self.raise_exception()
222 506 return self.results[key]
223 507
224 def raiseException(self):
508 def raise_exception(self):
225 509 """Re-raise any remote exceptions in the local python session."""
226 510 if self.failure is not None:
227 511 self.failure.raiseException()
228 512
229 513
514 #-----------------------------------------------------------------------------
515 # The controller side of things
516 #-----------------------------------------------------------------------------
517
230 518 class IWorker(zi.Interface):
231 519 """The Basic Worker Interface.
232 520
@@ -241,12 +529,15 b' class IWorker(zi.Interface):'
241 529 :Parameters:
242 530 task : a `Task` object
243 531
244 :Returns: `Deferred` to a `TaskResult` object.
532 :Returns: `Deferred` to a tuple of (success, result) where
533 success if a boolean that signifies success or failure
534 and result is the task result.
245 535 """
246 536
247 537
248 538 class WorkerFromQueuedEngine(object):
249 539 """Adapt an `IQueuedEngine` to an `IWorker` object"""
540
250 541 zi.implements(IWorker)
251 542
252 543 def __init__(self, qe):
@@ -261,53 +552,27 b' class WorkerFromQueuedEngine(object):'
261 552 def run(self, task):
262 553 """Run task in worker's namespace.
263 554
555 This takes a task and calls methods on the task that actually
556 cause `self.queuedEngine` to do the task. See the methods of
557 `ITask` for more information about how these methods are called.
558
264 559 :Parameters:
265 560 task : a `Task` object
266 561
267 :Returns: `Deferred` to a `TaskResult` object.
562 :Returns: `Deferred` to a tuple of (success, result) where
563 success if a boolean that signifies success or failure
564 and result is the task result.
268 565 """
269 if task.clear_before:
270 d = self.queuedEngine.reset()
271 else:
272 d = defer.succeed(None)
273
274 if task.push is not None:
275 d.addCallback(lambda r: self.queuedEngine.push(task.push))
276
277 d.addCallback(lambda r: self.queuedEngine.execute(task.expression))
278
279 if task.pull is not None:
280 d.addCallback(lambda r: self.queuedEngine.pull(task.pull))
281 else:
282 d.addCallback(lambda r: None)
283
284 def reseter(result):
285 self.queuedEngine.reset()
286 return result
287
288 if task.clear_after:
289 d.addBoth(reseter)
290
291 return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime())
292
293 def _zipResults(self, result, names, start, start_struct):
294 """Callback for construting the TaskResult object."""
295 if isinstance(result, failure.Failure):
296 tr = TaskResult(result, self.queuedEngine.id)
297 else:
298 if names is None:
299 resultDict = {}
300 elif len(names) == 1:
301 resultDict = {names[0]:result}
302 else:
303 resultDict = dict(zip(names, result))
304 tr = TaskResult(resultDict, self.queuedEngine.id)
305 # the time info
306 tr.submitted = time.strftime(time_format, start_struct)
307 tr.completed = time.strftime(time_format)
308 tr.duration = time.time()-start
309 return tr
310
566 d = defer.succeed(None)
567 d.addCallback(task.start_time)
568 task.pre_task(d, self.queuedEngine)
569 task.submit_task(d, self.queuedEngine)
570 task.post_task(d, self.queuedEngine)
571 d.addBoth(task.stop_time)
572 d.addBoth(task.process_result, self.queuedEngine.id)
573 # At this point, there will be (success, result) coming down the line
574 return d
575
311 576
312 577 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
313 578
@@ -323,14 +588,14 b' class IScheduler(zi.Interface):'
323 588 """Add a task to the queue of the Scheduler.
324 589
325 590 :Parameters:
326 task : a `Task` object
591 task : an `ITask` implementer
327 592 The task to be queued.
328 593 flags : dict
329 594 General keywords for more sophisticated scheduling
330 595 """
331 596
332 597 def pop_task(id=None):
333 """Pops a Task object.
598 """Pops a task object from the queue.
334 599
335 600 This gets the next task to be run. If no `id` is requested, the highest priority
336 601 task is returned.
@@ -340,7 +605,7 b' class IScheduler(zi.Interface):'
340 605 The id of the task to be popped. The default (None) is to return
341 606 the highest priority task.
342 607
343 :Returns: a `Task` object
608 :Returns: an `ITask` implementer
344 609
345 610 :Exceptions:
346 611 IndexError : raised if no taskid in queue
@@ -350,8 +615,9 b' class IScheduler(zi.Interface):'
350 615 """Add a worker to the worker queue.
351 616
352 617 :Parameters:
353 worker : an IWorker implementing object
354 flags : General keywords for more sophisticated scheduling
618 worker : an `IWorker` implementer
619 flags : dict
620 General keywords for more sophisticated scheduling
355 621 """
356 622
357 623 def pop_worker(id=None):
@@ -374,15 +640,15 b' class IScheduler(zi.Interface):'
374 640 """Returns True if there is something to do, False otherwise"""
375 641
376 642 def schedule():
377 """Returns a tuple of the worker and task pair for the next
378 task to be run.
379 """
643 """Returns (worker,task) pair for the next task to be run."""
380 644
381 645
382 646 class FIFOScheduler(object):
383 """A basic First-In-First-Out (Queue) Scheduler.
384 This is the default Scheduler for the TaskController.
385 See the docstrings for IScheduler for interface details.
647 """
648 A basic First-In-First-Out (Queue) Scheduler.
649
650 This is the default Scheduler for the `TaskController`.
651 See the docstrings for `IScheduler` for interface details.
386 652 """
387 653
388 654 zi.implements(IScheduler)
@@ -439,7 +705,9 b' class FIFOScheduler(object):'
439 705 for t in self.tasks:
440 706 for w in self.workers:
441 707 try:# do not allow exceptions to break this
442 cando = t.depend is None or t.depend(w.properties)
708 # Allow the task to check itself using its
709 # check_depend method.
710 cando = t.check_depend(w.properties)
443 711 except:
444 712 cando = False
445 713 if cando:
@@ -449,9 +717,12 b' class FIFOScheduler(object):'
449 717
450 718
451 719 class LIFOScheduler(FIFOScheduler):
452 """A Last-In-First-Out (Stack) Scheduler. This scheduler should naively
453 reward fast engines by giving them more jobs. This risks starvation, but
454 only in cases with low load, where starvation does not really matter.
720 """
721 A Last-In-First-Out (Stack) Scheduler.
722
723 This scheduler should naively reward fast engines by giving
724 them more jobs. This risks starvation, but only in cases with
725 low load, where starvation does not really matter.
455 726 """
456 727
457 728 def add_task(self, task, **flags):
@@ -466,13 +737,15 b' class LIFOScheduler(FIFOScheduler):'
466 737
467 738
468 739 class ITaskController(cs.IControllerBase):
469 """The Task based interface to a `ControllerService` object
740 """
741 The Task based interface to a `ControllerService` object
470 742
471 743 This adapts a `ControllerService` to the ITaskController interface.
472 744 """
473 745
474 746 def run(task):
475 """Run a task.
747 """
748 Run a task.
476 749
477 750 :Parameters:
478 751 task : an IPython `Task` object
@@ -481,13 +754,14 b' class ITaskController(cs.IControllerBase):'
481 754 """
482 755
483 756 def get_task_result(taskid, block=False):
484 """Get the result of a task by its ID.
757 """
758 Get the result of a task by its ID.
485 759
486 760 :Parameters:
487 761 taskid : int
488 762 the id of the task whose result is requested
489 763
490 :Returns: `Deferred` to (taskid, actualResult) if the task is done, and None
764 :Returns: `Deferred` to the task result if the task is done, and None
491 765 if not.
492 766
493 767 :Exceptions:
@@ -512,23 +786,35 b' class ITaskController(cs.IControllerBase):'
512 786 """
513 787
514 788 def barrier(taskids):
515 """Block until the list of taskids are completed.
789 """
790 Block until the list of taskids are completed.
516 791
517 792 Returns None on success.
518 793 """
519 794
520 795 def spin():
521 """touch the scheduler, to resume scheduling without submitting
522 a task.
796 """
797 Touch the scheduler, to resume scheduling without submitting a task.
523 798 """
524 799
525 def queue_status(self, verbose=False):
526 """Get a dictionary with the current state of the task queue.
800 def queue_status(verbose=False):
801 """
802 Get a dictionary with the current state of the task queue.
527 803
528 804 If verbose is True, then return lists of taskids, otherwise,
529 805 return the number of tasks with each status.
530 806 """
531 807
808 def clear():
809 """
810 Clear all previously run tasks from the task controller.
811
812 This is needed because the task controller keep all task results
813 in memory. This can be a problem is there are many completed
814 tasks. Users should call this periodically to clean out these
815 cached task results.
816 """
817
532 818
533 819 class TaskController(cs.ControllerAdapterBase):
534 820 """The Task based interface to a Controller object.
@@ -565,7 +851,7 b' class TaskController(cs.ControllerAdapterBase):'
565 851 def registerWorker(self, id):
566 852 """Called by controller.register_engine."""
567 853 if self.workers.get(id):
568 raise "We already have one! This should not happen."
854 raise ValueError("worker with id %s already exists. This should not happen." % id)
569 855 self.workers[id] = IWorker(self.controller.engines[id])
570 856 self.workers[id].workerid = id
571 857 if not self.pendingTasks.has_key(id):# if not working
@@ -590,21 +876,25 b' class TaskController(cs.ControllerAdapterBase):'
590 876 #---------------------------------------------------------------------------
591 877
592 878 def run(self, task):
593 """Run a task and return `Deferred` to its taskid."""
879 """
880 Run a task and return `Deferred` to its taskid.
881 """
594 882 task.taskid = self.taskid
595 883 task.start = time.localtime()
596 884 self.taskid += 1
597 885 d = defer.Deferred()
598 886 self.scheduler.add_task(task)
599 # log.msg('Queuing task: %i' % task.taskid)
887 log.msg('Queuing task: %i' % task.taskid)
600 888
601 889 self.deferredResults[task.taskid] = []
602 890 self.distributeTasks()
603 891 return defer.succeed(task.taskid)
604 892
605 893 def get_task_result(self, taskid, block=False):
606 """Returns a `Deferred` to a TaskResult tuple or None."""
607 # log.msg("Getting task result: %i" % taskid)
894 """
895 Returns a `Deferred` to the task result, or None.
896 """
897 log.msg("Getting task result: %i" % taskid)
608 898 if self.finishedResults.has_key(taskid):
609 899 tr = self.finishedResults[taskid]
610 900 return defer.succeed(tr)
@@ -619,7 +909,9 b' class TaskController(cs.ControllerAdapterBase):'
619 909 return defer.fail(IndexError("task ID not registered: %r" % taskid))
620 910
621 911 def abort(self, taskid):
622 """Remove a task from the queue if it has not been run already."""
912 """
913 Remove a task from the queue if it has not been run already.
914 """
623 915 if not isinstance(taskid, int):
624 916 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
625 917 try:
@@ -678,8 +970,10 b' class TaskController(cs.ControllerAdapterBase):'
678 970 #---------------------------------------------------------------------------
679 971
680 972 def _doAbort(self, taskid):
681 """Helper function for aborting a pending task."""
682 # log.msg("Task aborted: %i" % taskid)
973 """
974 Helper function for aborting a pending task.
975 """
976 log.msg("Task aborted: %i" % taskid)
683 977 result = failure.Failure(error.TaskAborted())
684 978 self._finishTask(taskid, result)
685 979 if taskid in self.abortPending:
@@ -687,14 +981,16 b' class TaskController(cs.ControllerAdapterBase):'
687 981
688 982 def _finishTask(self, taskid, result):
689 983 dlist = self.deferredResults.pop(taskid)
690 result.taskid = taskid # The TaskResult should save the taskid
984 # result.taskid = taskid # The TaskResult should save the taskid
691 985 self.finishedResults[taskid] = result
692 986 for d in dlist:
693 987 d.callback(result)
694 988
695 989 def distributeTasks(self):
696 """Distribute tasks while self.scheduler has things to do."""
697 # log.msg("distributing Tasks")
990 """
991 Distribute tasks while self.scheduler has things to do.
992 """
993 log.msg("distributing Tasks")
698 994 worker, task = self.scheduler.schedule()
699 995 if not worker and not task:
700 996 if self.idleLater and self.idleLater.called:# we are inside failIdle
@@ -709,7 +1005,7 b' class TaskController(cs.ControllerAdapterBase):'
709 1005 self.pendingTasks[worker.workerid] = task
710 1006 # run/link callbacks
711 1007 d = worker.run(task)
712 # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
1008 log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
713 1009 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
714 1010 worker, task = self.scheduler.schedule()
715 1011 # check for idle timeout:
@@ -731,14 +1027,15 b' class TaskController(cs.ControllerAdapterBase):'
731 1027 t = self.scheduler.pop_task()
732 1028 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
733 1029 msg += " for %i seconds"%self.timeout
734 # log.msg("Task aborted by timeout: %i" % t.taskid)
1030 log.msg("Task aborted by timeout: %i" % t.taskid)
735 1031 f = failure.Failure(error.TaskTimeout(msg))
736 1032 self._finishTask(t.taskid, f)
737 1033 self.idleLater = None
738 1034
739 1035
740 def taskCompleted(self, result, taskid, workerid):
1036 def taskCompleted(self, success_and_result, taskid, workerid):
741 1037 """This is the err/callback for a completed task."""
1038 success, result = success_and_result
742 1039 try:
743 1040 task = self.pendingTasks.pop(workerid)
744 1041 except:
@@ -755,7 +1052,7 b' class TaskController(cs.ControllerAdapterBase):'
755 1052 aborted = True
756 1053
757 1054 if not aborted:
758 if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed
1055 if not success:
759 1056 log.msg("Task %i failed on worker %i"% (taskid, workerid))
760 1057 if task.retries > 0: # resubmit
761 1058 task.retries -= 1
@@ -763,7 +1060,7 b' class TaskController(cs.ControllerAdapterBase):'
763 1060 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
764 1061 log.msg(s)
765 1062 self.distributeTasks()
766 elif isinstance(task.recovery_task, Task) and \
1063 elif isinstance(task.recovery_task, BaseTask) and \
767 1064 task.recovery_task.retries > -1:
768 1065 # retries = -1 is to prevent infinite recovery_task loop
769 1066 task.retries = -1
@@ -779,17 +1076,18 b' class TaskController(cs.ControllerAdapterBase):'
779 1076 # it may have died, and not yet been unregistered
780 1077 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
781 1078 else: # we succeeded
782 # log.msg("Task completed: %i"% taskid)
1079 log.msg("Task completed: %i"% taskid)
783 1080 self._finishTask(taskid, result)
784 1081 self.readmitWorker(workerid)
785 else:# we aborted the task
786 if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker
1082 else: # we aborted the task
1083 if not success:
787 1084 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
788 1085 else:
789 1086 self.readmitWorker(workerid)
790 1087
791 1088 def readmitWorker(self, workerid):
792 """Readmit a worker to the scheduler.
1089 """
1090 Readmit a worker to the scheduler.
793 1091
794 1092 This is outside `taskCompleted` because of the `failurePenalty` being
795 1093 implemented through `reactor.callLater`.
@@ -798,6 +1096,18 b' class TaskController(cs.ControllerAdapterBase):'
798 1096 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
799 1097 self.scheduler.add_worker(self.workers[workerid])
800 1098 self.distributeTasks()
1099
1100 def clear(self):
1101 """
1102 Clear all previously run tasks from the task controller.
1103
1104 This is needed because the task controller keeps all task results
1105 in memory. This can be a problem if there are many completed
1106 tasks. Users should call this periodically to clean out these
1107 cached task results.
1108 """
1109 self.finishedResults = {}
1110 return defer.succeed(None)
801 1111
802 1112
803 1113 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -1,9 +1,8 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3 3
4 """The Generic Task Client object.
5
6 This must be subclassed based on your connection method.
4 """
5 A blocking version of the task client.
7 6 """
8 7
9 8 __docformat__ = "restructuredtext en"
@@ -24,119 +23,100 b' from twisted.python import components, log'
24 23
25 24 from IPython.kernel.twistedutil import blockingCallFromThread
26 25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 IMapper
30 )
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
33 ITaskParallelDecorator
34 )
27 35
28 36 #-------------------------------------------------------------------------------
29 # Connecting Task Client
37 # The task client
30 38 #-------------------------------------------------------------------------------
31 39
32 class InteractiveTaskClient(object):
33
34 def irun(self, *args, **kwargs):
35 """Run a task on the `TaskController`.
36
37 This method is a shorthand for run(task) and its arguments are simply
38 passed onto a `Task` object:
39
40 irun(*args, **kwargs) -> run(Task(*args, **kwargs))
41
42 :Parameters:
43 expression : str
44 A str that is valid python code that is the task.
45 pull : str or list of str
46 The names of objects to be pulled as results.
47 push : dict
48 A dict of objects to be pushed into the engines namespace before
49 execution of the expression.
50 clear_before : boolean
51 Should the engine's namespace be cleared before the task is run.
52 Default=False.
53 clear_after : boolean
54 Should the engine's namespace be cleared after the task is run.
55 Default=False.
56 retries : int
57 The number of times to resumbit the task if it fails. Default=0.
58 options : dict
59 Any other keyword options for more elaborate uses of tasks
60
61 :Returns: A `TaskResult` object.
62 """
63 block = kwargs.pop('block', False)
64 if len(args) == 1 and isinstance(args[0], task.Task):
65 t = args[0]
66 else:
67 t = task.Task(*args, **kwargs)
68 taskid = self.run(t)
69 print "TaskID = %i"%taskid
70 if block:
71 return self.get_task_result(taskid, block)
72 else:
73 return taskid
74
75 40 class IBlockingTaskClient(Interface):
76 41 """
77 An interface for blocking task clients.
42 A vague interface of the blocking task client
78 43 """
79 44 pass
80 45
81
82 class BlockingTaskClient(InteractiveTaskClient):
46 class BlockingTaskClient(object):
83 47 """
84 This class provides a blocking task client.
48 A blocking task client that adapts a non-blocking one.
85 49 """
86 50
87 implements(IBlockingTaskClient)
51 implements(
52 IBlockingTaskClient,
53 ITaskMapperFactory,
54 IMapper,
55 ITaskParallelDecorator
56 )
88 57
89 58 def __init__(self, task_controller):
90 59 self.task_controller = task_controller
91 60 self.block = True
92 61
93 def run(self, task):
94 """
95 Run a task and return a task id that can be used to get the task result.
62 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
64
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
96 67
97 68 :Parameters:
98 task : `Task`
99 The `Task` object to run
69 task : an `ITask` implementer
70
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
100 73 """
101 return blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
76 return self.get_task_result(tid, block=True)
77 else:
78 return tid
102 79
103 80 def get_task_result(self, taskid, block=False):
104 81 """
105 Get or poll for a task result.
82 Get a task result by taskid.
106 83
107 84 :Parameters:
108 85 taskid : int
109 The id of the task whose result to get
86 The taskid of the task to be retrieved.
110 87 block : boolean
111 If True, wait until the task is done and then result the
112 `TaskResult` object. If False, just poll for the result and
113 return None if the task is not done.
88 Should I block until the task is done?
89
90 :Returns: A `TaskResult` object that encapsulates the task result.
114 91 """
115 92 return blockingCallFromThread(self.task_controller.get_task_result,
116 93 taskid, block)
117 94
118 95 def abort(self, taskid):
119 96 """
120 Abort a task by task id if it has not been started.
97 Abort a task by taskid.
98
99 :Parameters:
100 taskid : int
101 The taskid of the task to be aborted.
121 102 """
122 103 return blockingCallFromThread(self.task_controller.abort, taskid)
123 104
124 105 def barrier(self, taskids):
125 """
126 Wait for a set of tasks to finish.
106 """Block until a set of tasks are completed.
127 107
128 108 :Parameters:
129 taskids : list of ints
130 A list of task ids to wait for.
109 taskids : list, tuple
110 A sequence of taskids to block on.
131 111 """
132 112 return blockingCallFromThread(self.task_controller.barrier, taskids)
133 113
134 114 def spin(self):
135 115 """
136 Cause the scheduler to schedule tasks.
116 Touch the scheduler, to resume scheduling without submitting a task.
137 117
138 118 This method only needs to be called in unusual situations where the
139 scheduler is idle for some reason.
119 scheduler is idle for some reason.
140 120 """
141 121 return blockingCallFromThread(self.task_controller.spin)
142 122
@@ -153,7 +133,46 b' class BlockingTaskClient(InteractiveTaskClient):'
153 133 A dict with the queue status.
154 134 """
155 135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
137 def clear(self):
138 """
139 Clear all previously run tasks from the task controller.
140
141 This is needed because the task controller keeps all task results
142 in memory. This can be a problem if there are many completed
143 tasks. Users should call this periodically to clean out these
144 cached task results.
145 """
146 return blockingCallFromThread(self.task_controller.clear)
147
148 def map(self, func, *sequences):
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
152 This version is load balanced.
153 """
154 return self.mapper().map(func, *sequences)
156 155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
158 """
159 Create an `IMapper` implementer with a given set of arguments.
160
161 The `IMapper` created using a task controller is load balanced.
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
175 return pf
157 176
158 177 components.registerAdapter(BlockingTaskClient,
159 178 task.ITaskController, IBlockingTaskClient)
@@ -34,6 +34,15 b' from IPython.kernel.clientinterfaces import ('
34 34 IFCClientInterfaceProvider,
35 35 IBlockingClientAdaptor
36 36 )
37 from IPython.kernel.mapper import (
38 TaskMapper,
39 ITaskMapperFactory,
40 IMapper
41 )
42 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
44 ITaskParallelDecorator
45 )
37 46
38 47 #-------------------------------------------------------------------------------
39 48 # The Controller side of things
@@ -43,32 +52,38 b' from IPython.kernel.clientinterfaces import ('
43 52 class IFCTaskController(Interface):
44 53 """Foolscap interface to task controller.
45 54
46 See the documentation of ITaskController for documentation about the methods.
55 See the documentation of `ITaskController` for more information.
47 56 """
48 def remote_run(request, binTask):
57 def remote_run(binTask):
49 58 """"""
50 59
51 def remote_abort(request, taskid):
60 def remote_abort(taskid):
52 61 """"""
53 62
54 def remote_get_task_result(request, taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
55 64 """"""
56 65
57 def remote_barrier(request, taskids):
66 def remote_barrier(taskids):
67 """"""
68
69 def remote_spin():
58 70 """"""
59 71
60 def remote_spin(request):
72 def remote_queue_status(verbose):
61 73 """"""
62 74
63 def remote_queue_status(request, verbose):
75 def remote_clear():
64 76 """"""
65 77
66 78
67 79 class FCTaskControllerFromTaskController(Referenceable):
68 """XML-RPC attachmeot for controller.
69
70 See IXMLRPCTaskController and ITaskController (and its children) for documentation.
71 80 """
81 Adapt a `TaskController` to an `IFCTaskController`
82
83 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
85 """
86
72 87 implements(IFCTaskController, IFCClientInterfaceProvider)
73 88
74 89 def __init__(self, taskController):
@@ -92,8 +107,8 b' class FCTaskControllerFromTaskController(Referenceable):'
92 107
93 108 def remote_run(self, ptask):
94 109 try:
95 ctask = pickle.loads(ptask)
96 task = taskmodule.uncanTask(ctask)
110 task = pickle.loads(ptask)
111 task.uncan_task()
97 112 except:
98 113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
99 114 else:
@@ -132,6 +147,9 b' class FCTaskControllerFromTaskController(Referenceable):'
132 147 d.addErrback(self.packageFailure)
133 148 return d
134 149
150 def remote_clear(self):
151 return self.taskController.clear()
152
135 153 def remote_get_client_name(self):
136 154 return 'IPython.kernel.taskfc.FCTaskClient'
137 155
@@ -144,13 +162,23 b' components.registerAdapter(FCTaskControllerFromTaskController,'
144 162 #-------------------------------------------------------------------------------
145 163
146 164 class FCTaskClient(object):
147 """XML-RPC based TaskController client that implements ITaskController.
148
149 :Parameters:
150 addr : (ip, port)
151 The ip (str) and port (int) tuple of the `TaskController`.
152 165 """
153 implements(taskmodule.ITaskController, IBlockingClientAdaptor)
166 Client class for Foolscap exposed `TaskController`.
167
168 This class is an adapter that makes a `RemoteReference` to a
169 `TaskController` look like an actual `ITaskController` on the client side.
170
171 This class also implements `IBlockingClientAdaptor` so that clients can
172 automatically get a blocking version of this class.
173 """
174
175 implements(
176 taskmodule.ITaskController,
177 IBlockingClientAdaptor,
178 ITaskMapperFactory,
179 IMapper,
180 ITaskParallelDecorator
181 )
154 182
155 183 def __init__(self, remote_reference):
156 184 self.remote_reference = remote_reference
@@ -168,48 +196,26 b' class FCTaskClient(object):'
168 196 def run(self, task):
169 197 """Run a task on the `TaskController`.
170 198
171 :Parameters:
172 task : a `Task` object
173
174 The Task object is created using the following signature:
175
176 Task(expression, pull=None, push={}, clear_before=False,
177 clear_after=False, retries=0, **options):)
199 See the documentation of the `MapTask` and `StringTask` classes for
200 details on how to build a task of different types.
178 201
179 The meaning of the arguments is as follows:
202 :Parameters:
203 task : an `ITask` implementer
180 204
181 :Task Parameters:
182 expression : str
183 A str that is valid python code that is the task.
184 pull : str or list of str
185 The names of objects to be pulled as results.
186 push : dict
187 A dict of objects to be pushed into the engines namespace before
188 execution of the expression.
189 clear_before : boolean
190 Should the engine's namespace be cleared before the task is run.
191 Default=False.
192 clear_after : boolean
193 Should the engine's namespace be cleared after the task is run.
194 Default=False.
195 retries : int
196 The number of times to resumbit the task if it fails. Default=0.
197 options : dict
198 Any other keyword options for more elaborate uses of tasks
199
200 205 :Returns: The int taskid of the submitted task. Pass this to
201 206 `get_task_result` to get the `TaskResult` object.
202 207 """
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
204 ctask = taskmodule.canTask(task) # handles arbitrary function in .depend
205 # as well as arbitrary recovery_task chains
206 ptask = pickle.dumps(ctask, 2)
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
209 task.can_task()
210 ptask = pickle.dumps(task, 2)
211 task.uncan_task()
207 212 d = self.remote_reference.callRemote('run', ptask)
208 213 d.addCallback(self.unpackage)
209 214 return d
210 215
211 216 def get_task_result(self, taskid, block=False):
212 """The task result by taskid.
217 """
218 Get a task result by taskid.
213 219
214 220 :Parameters:
215 221 taskid : int
@@ -224,20 +230,19 b' class FCTaskClient(object):'
224 230 return d
225 231
226 232 def abort(self, taskid):
227 """Abort a task by taskid.
233 """
234 Abort a task by taskid.
228 235
229 236 :Parameters:
230 237 taskid : int
231 238 The taskid of the task to be aborted.
232 block : boolean
233 Should I block until the task is aborted.
234 239 """
235 240 d = self.remote_reference.callRemote('abort', taskid)
236 241 d.addCallback(self.unpackage)
237 242 return d
238 243
239 244 def barrier(self, taskids):
240 """Block until all tasks are completed.
245 """Block until a set of tasks are completed.
241 246
242 247 :Parameters:
243 248 taskids : list, tuple
@@ -248,20 +253,77 b' class FCTaskClient(object):'
248 253 return d
249 254
250 255 def spin(self):
251 """touch the scheduler, to resume scheduling without submitting
252 a task.
256 """
257 Touch the scheduler, to resume scheduling without submitting a task.
258
259 This method only needs to be called in unusual situations where the
260 scheduler is idle for some reason.
253 261 """
254 262 d = self.remote_reference.callRemote('spin')
255 263 d.addCallback(self.unpackage)
256 264 return d
257 265
258 266 def queue_status(self, verbose=False):
259 """Return a dict with the status of the task queue."""
267 """
268 Get a dictionary with the current state of the task queue.
269
270 :Parameters:
271 verbose : boolean
272 If True, return a list of taskids. If False, simply give
273 the number of tasks with each status.
274
275 :Returns:
276 A dict with the queue status.
277 """
260 278 d = self.remote_reference.callRemote('queue_status', verbose)
261 279 d.addCallback(self.unpackage)
262 280 return d
263 281
282 def clear(self):
283 """
284 Clear all previously run tasks from the task controller.
285
286 This is needed because the task controller keeps all task results
287 in memory. This can be a problem if there are many completed
288 tasks. Users should call this periodically to clean out these
289 cached task results.
290 """
291 d = self.remote_reference.callRemote('clear')
292 return d
293
264 294 def adapt_to_blocking_client(self):
295 """
296 Wrap self in a blocking version that implements `IBlockingTaskClient`.
297 """
265 298 from IPython.kernel.taskclient import IBlockingTaskClient
266 299 return IBlockingTaskClient(self)
300
301 def map(self, func, *sequences):
302 """
303 Apply func to *sequences elementwise. Like Python's builtin map.
304
305 This version is load balanced.
306 """
307 return self.mapper().map(func, *sequences)
308
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
310 recovery_task=None, depend=None, block=True):
311 """
312 Create an `IMapper` implementer with a given set of arguments.
313
314 The `IMapper` created using a task controller is load balanced.
315
316 See the documentation for `IPython.kernel.task.BaseTask` for
317 documentation on the arguments to this method.
318 """
319 return TaskMapper(self, clear_before=clear_before,
320 clear_after=clear_after, retries=retries,
321 recovery_task=recovery_task, depend=depend, block=block)
322
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
324 recovery_task=None, depend=None, block=True):
325 mapper = self.mapper(clear_before, clear_after, retries,
326 recovery_task, depend, block)
327 pf = ParallelFunction(mapper)
328 return pf
267 329
@@ -163,7 +163,6 b' class IEngineCoreTestCase(object):'
163 163 try:
164 164 import numpy
165 165 except:
166 print 'no numpy, ',
167 166 return
168 167 a = numpy.random.random(1000)
169 168 d = self.engine.push(dict(a=a))
@@ -733,7 +733,7 b' class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase'
733 733 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
734 734 d.addCallback(lambda r: self.assertEquals(r, range(16)))
735 735 return d
736
736
737 737 def testScatterGatherNumpyNonblocking(self):
738 738 try:
739 739 import numpy
@@ -749,17 +749,7 b' class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase'
749 749 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
750 750 d.addCallback(lambda r: assert_array_equal(r, a))
751 751 return d
752
753 def testMapNonblocking(self):
754 self.addEngine(4)
755 def f(x):
756 return x**2
757 data = range(16)
758 d= self.multiengine.map(f, data, block=False)
759 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
760 d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
761 return d
762
752
763 753 def test_clear_pending_deferreds(self):
764 754 self.addEngine(4)
765 755 did_list = []
@@ -43,23 +43,23 b' class TaskTestBase(object):'
43 43
44 44 class ITaskControllerTestCase(TaskTestBase):
45 45
46 def testTaskIDs(self):
46 def test_task_ids(self):
47 47 self.addEngine(1)
48 d = self.tc.run(task.Task('a=5'))
48 d = self.tc.run(task.StringTask('a=5'))
49 49 d.addCallback(lambda r: self.assertEquals(r, 0))
50 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
50 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
51 51 d.addCallback(lambda r: self.assertEquals(r, 1))
52 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
52 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
53 53 d.addCallback(lambda r: self.assertEquals(r, 2))
54 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
54 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
55 55 d.addCallback(lambda r: self.assertEquals(r, 3))
56 56 return d
57 57
58 def testAbort(self):
58 def test_abort(self):
59 59 """Cannot do a proper abort test, because blocking execution prevents
60 60 abort from being called before task completes"""
61 61 self.addEngine(1)
62 t = task.Task('a=5')
62 t = task.StringTask('a=5')
63 63 d = self.tc.abort(0)
64 64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
65 65 d.addCallback(lambda _:self.tc.run(t))
@@ -67,15 +67,15 b' class ITaskControllerTestCase(TaskTestBase):'
67 67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
68 68 return d
69 69
70 def testAbortType(self):
70 def test_abort_type(self):
71 71 self.addEngine(1)
72 72 d = self.tc.abort('asdfadsf')
73 73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
74 74 return d
75 75
76 def testClears(self):
76 def test_clear_before_and_after(self):
77 77 self.addEngine(1)
78 t = task.Task('a=1', clear_before=True, pull='b', clear_after=True)
78 t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True)
79 79 d = self.multiengine.execute('b=1', targets=0)
80 80 d.addCallback(lambda _: self.tc.run(t))
81 81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
@@ -85,10 +85,10 b' class ITaskControllerTestCase(TaskTestBase):'
85 85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
86 86 return d
87 87
88 def testSimpleRetries(self):
88 def test_simple_retries(self):
89 89 self.addEngine(1)
90 t = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
90 t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
92 92 d = self.multiengine.execute('i=0', targets=0)
93 93 d.addCallback(lambda r: self.tc.run(t))
94 94 d.addCallback(self.tc.get_task_result, block=True)
@@ -101,10 +101,10 b' class ITaskControllerTestCase(TaskTestBase):'
101 101 d.addCallback(lambda r: self.assertEquals(r, 16))
102 102 return d
103 103
104 def testRecoveryTasks(self):
104 def test_recovery_tasks(self):
105 105 self.addEngine(1)
106 t = task.Task("i=16", pull='i')
107 t2 = task.Task("raise Exception", recovery_task=t, retries = 2)
106 t = task.StringTask("i=16", pull='i')
107 t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2)
108 108
109 109 d = self.tc.run(t2)
110 110 d.addCallback(self.tc.get_task_result, block=True)
@@ -112,47 +112,76 b' class ITaskControllerTestCase(TaskTestBase):'
112 112 d.addCallback(lambda r: self.assertEquals(r, 16))
113 113 return d
114 114
115 # def testInfiniteRecoveryLoop(self):
116 # self.addEngine(1)
117 # t = task.Task("raise Exception", retries = 5)
118 # t2 = task.Task("assert True", retries = 2, recovery_task = t)
119 # t.recovery_task = t2
120 #
121 # d = self.tc.run(t)
122 # d.addCallback(self.tc.get_task_result, block=True)
123 # d.addCallback(lambda tr: tr.ns.i)
124 # d.addBoth(printer)
125 # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException))
126 # return d
127 #
128 def testSetupNS(self):
115 def test_setup_ns(self):
129 116 self.addEngine(1)
130 117 d = self.multiengine.execute('a=0', targets=0)
131 118 ns = dict(a=1, b=0)
132 t = task.Task("", push=ns, pull=['a','b'])
119 t = task.StringTask("", push=ns, pull=['a','b'])
133 120 d.addCallback(lambda r: self.tc.run(t))
134 121 d.addCallback(self.tc.get_task_result, block=True)
135 122 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
136 123 d.addCallback(lambda r: self.assertEquals(r, ns))
137 124 return d
138 125
139 def testTaskResults(self):
126 def test_string_task_results(self):
140 127 self.addEngine(1)
141 t1 = task.Task('a=5', pull='a')
128 t1 = task.StringTask('a=5', pull='a')
142 129 d = self.tc.run(t1)
143 130 d.addCallback(self.tc.get_task_result, block=True)
144 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException()))
131 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception()))
145 132 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
146 133
147 t2 = task.Task('7=5')
134 t2 = task.StringTask('7=5')
148 135 d.addCallback(lambda r: self.tc.run(t2))
149 136 d.addCallback(self.tc.get_task_result, block=True)
150 137 d.addCallback(lambda tr: tr.ns)
151 138 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
152 139
153 t3 = task.Task('', pull='b')
140 t3 = task.StringTask('', pull='b')
154 141 d.addCallback(lambda r: self.tc.run(t3))
155 142 d.addCallback(self.tc.get_task_result, block=True)
156 143 d.addCallback(lambda tr: tr.ns)
157 144 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
158 145 return d
146
147 def test_map_task(self):
148 self.addEngine(1)
149 t1 = task.MapTask(lambda x: 2*x,(10,))
150 d = self.tc.run(t1)
151 d.addCallback(self.tc.get_task_result, block=True)
152 d.addCallback(lambda r: self.assertEquals(r,20))
153
154 t2 = task.MapTask(lambda : 20)
155 d.addCallback(lambda _: self.tc.run(t2))
156 d.addCallback(self.tc.get_task_result, block=True)
157 d.addCallback(lambda r: self.assertEquals(r,20))
158
159 t3 = task.MapTask(lambda x: x,(),{'x':20})
160 d.addCallback(lambda _: self.tc.run(t3))
161 d.addCallback(self.tc.get_task_result, block=True)
162 d.addCallback(lambda r: self.assertEquals(r,20))
163 return d
164
165 def test_map_task_failure(self):
166 self.addEngine(1)
167 t1 = task.MapTask(lambda x: 1/0,(10,))
168 d = self.tc.run(t1)
169 d.addCallback(self.tc.get_task_result, block=True)
170 d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException))
171 return d
172
173 def test_map_task_args(self):
174 self.assertRaises(TypeError, task.MapTask, 'asdfasdf')
175 self.assertRaises(TypeError, task.MapTask, lambda x: x, 10)
176 self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30)
177
178 def test_clear(self):
179 self.addEngine(1)
180 t1 = task.MapTask(lambda x: 2*x,(10,))
181 d = self.tc.run(t1)
182 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
183 d.addCallback(lambda r: self.assertEquals(r,20))
184 d.addCallback(lambda _: self.tc.clear())
185 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
186 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
187 return d
@@ -38,7 +38,7 b' try:'
38 38 IEngineQueuedTestCase
39 39 except ImportError:
40 40 print "we got an error!!!"
41 pass
41 raise
42 42 else:
43 43 class EngineFCTest(DeferredTestCase,
44 44 IEngineCoreTestCase,
@@ -26,9 +26,20 b' try:'
26 26 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
27 27 from IPython.kernel import multiengine as me
28 28 from IPython.kernel.clientconnector import ClientConnector
29 from IPython.kernel.parallelfunction import ParallelFunction
30 from IPython.kernel.error import CompositeError
31 from IPython.kernel.util import printer
29 32 except ImportError:
30 33 pass
31 34 else:
35
36 def _raise_it(f):
37 try:
38 f.raiseException()
39 except CompositeError, e:
40 e.raise_exception()
41
42
32 43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
33 44
34 45 def setUp(self):
@@ -68,3 +79,66 b' else:'
68 79 d.addBoth(lambda _: self.controller.stopService())
69 80 dlist.append(d)
70 81 return defer.DeferredList(dlist)
82
83 def test_mapper(self):
84 self.addEngine(4)
85 m = self.multiengine.mapper()
86 self.assertEquals(m.multiengine,self.multiengine)
87 self.assertEquals(m.dist,'b')
88 self.assertEquals(m.targets,'all')
89 self.assertEquals(m.block,True)
90
91 def test_map_default(self):
92 self.addEngine(4)
93 m = self.multiengine.mapper()
94 d = m.map(lambda x: 2*x, range(10))
95 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 return d
99
100 def test_map_noblock(self):
101 self.addEngine(4)
102 m = self.multiengine.mapper(block=False)
103 d = m.map(lambda x: 2*x, range(10))
104 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 return d
107
108 def test_mapper_fail(self):
109 self.addEngine(4)
110 m = self.multiengine.mapper()
111 d = m.map(lambda x: 1/0, range(10))
112 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 return d
114
115 def test_parallel(self):
116 self.addEngine(4)
117 p = self.multiengine.parallel()
118 self.assert_(isinstance(p, ParallelFunction))
119 @p
120 def f(x): return 2*x
121 d = f(range(10))
122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 return d
124
125 def test_parallel_noblock(self):
126 self.addEngine(1)
127 p = self.multiengine.parallel(block=False)
128 self.assert_(isinstance(p, ParallelFunction))
129 @p
130 def f(x): return 2*x
131 d = f(range(10))
132 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 return d
135
136 def test_parallel_fail(self):
137 self.addEngine(4)
138 p = self.multiengine.parallel()
139 self.assert_(isinstance(p, ParallelFunction))
140 @p
141 def f(x): return 1/0
142 d = f(range(10))
143 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 return d No newline at end of file
@@ -20,8 +20,6 b' try:'
20 20 from twisted.internet import defer
21 21 from twisted.python import failure
22 22
23 from IPython.testing import tcommon
24 #from IPython.testing.tcommon import *
25 23 from IPython.testing.util import DeferredTestCase
26 24 import IPython.kernel.pendingdeferred as pd
27 25 from IPython.kernel import error
@@ -29,6 +27,11 b' try:'
29 27 except ImportError:
30 28 pass
31 29 else:
30
31 class Foo(object):
32
33 def bar(self, bahz):
34 return defer.succeed('blahblah: %s' % bahz)
32 35
33 36 class TwoPhaseFoo(pd.PendingDeferredManager):
34 37
@@ -181,6 +184,3 b' else:'
181 184 d3 = self.pdm.get_pending_deferred(did,False)
182 185 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
183 186
184
185 # Global object expected by Twisted's trial
186 testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules)
@@ -30,6 +30,8 b' try:'
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 32 from IPython.kernel.clientconnector import ClientConnector
33 from IPython.kernel.error import CompositeError
34 from IPython.kernel.parallelfunction import ParallelFunction
33 35 except ImportError:
34 36 pass
35 37 else:
@@ -38,6 +40,12 b' else:'
38 40 # Tests
39 41 #-------------------------------------------------------------------------------
40 42
43 def _raise_it(f):
44 try:
45 f.raiseException()
46 except CompositeError, e:
47 e.raise_exception()
48
41 49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
42 50
43 51 def setUp(self):
@@ -87,4 +95,67 b' else:'
87 95 d.addBoth(lambda _: self.controller.stopService())
88 96 dlist.append(d)
89 97 return defer.DeferredList(dlist)
90
98
99 def test_mapper(self):
100 self.addEngine(1)
101 m = self.tc.mapper()
102 self.assertEquals(m.task_controller,self.tc)
103 self.assertEquals(m.clear_before,False)
104 self.assertEquals(m.clear_after,False)
105 self.assertEquals(m.retries,0)
106 self.assertEquals(m.recovery_task,None)
107 self.assertEquals(m.depend,None)
108 self.assertEquals(m.block,True)
109
110 def test_map_default(self):
111 self.addEngine(1)
112 m = self.tc.mapper()
113 d = m.map(lambda x: 2*x, range(10))
114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
117 return d
118
119 def test_map_noblock(self):
120 self.addEngine(1)
121 m = self.tc.mapper(block=False)
122 d = m.map(lambda x: 2*x, range(10))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
124 return d
125
126 def test_mapper_fail(self):
127 self.addEngine(1)
128 m = self.tc.mapper()
129 d = m.map(lambda x: 1/0, range(10))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
131 return d
132
133 def test_parallel(self):
134 self.addEngine(1)
135 p = self.tc.parallel()
136 self.assert_(isinstance(p, ParallelFunction))
137 @p
138 def f(x): return 2*x
139 d = f(range(10))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
141 return d
142
143 def test_parallel_noblock(self):
144 self.addEngine(1)
145 p = self.tc.parallel(block=False)
146 self.assert_(isinstance(p, ParallelFunction))
147 @p
148 def f(x): return 2*x
149 d = f(range(10))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
151 return d
152
153 def test_parallel_fail(self):
154 self.addEngine(1)
155 p = self.tc.parallel()
156 self.assert_(isinstance(p, ParallelFunction))
157 @p
158 def f(x): return 1/0
159 d = f(range(10))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
161 return d No newline at end of file
@@ -53,7 +53,7 b' class DistributedSpider(object):'
53 53 self.allLinks.append(url)
54 54 if url.startswith(self.site):
55 55 print ' ', url
56 self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
56 self.linksWorking[url] = self.tc.run(client.StringTask('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
57 57
58 58 def onVisitDone(self, result, url):
59 59 print url, ':'
@@ -8,7 +8,7 b' tc = client.TaskClient()'
8 8 mec = client.MultiEngineClient()
9 9
10 10 mec.execute('import time')
11 hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word')))
12 world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word')))
11 hello_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "Hello,"', pull=('word')))
12 world_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "World!"', pull=('word')))
13 13 print "Submitted tasks:", hello_taskid, world_taskid
14 14 print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word
@@ -31,7 +31,7 b' sigma_vals = N.linspace(0.0, 0.2,5)'
31 31 taskids = []
32 32 for K in K_vals:
33 33 for sigma in sigma_vals:
34 t = client.Task(task_string,
34 t = client.StringTask(task_string,
35 35 push=dict(sigma=sigma,K=K),
36 36 pull=('vp','ap','vc','ac','sigma','K'))
37 37 taskids.append(tc.run(t))
@@ -11,8 +11,8 b' b = 10*d'
11 11 c = a*b*d
12 12 """
13 13
14 t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
14 t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
15 15 tid1 = tc.run(t1)
16 16 tr1 = tc.get_task_result(tid1,block=True)
17 tr1.raiseException()
17 tr1.raise_exception()
18 18 print "a, b: ", tr1.ns.a, tr1.ns.b No newline at end of file
@@ -10,7 +10,7 b' mec = client.MultiEngineClient()'
10 10 mec.execute('import time')
11 11
12 12 for i in range(24):
13 tc.irun('time.sleep(1)')
13 tc.run(client.StringTask('time.sleep(1)'))
14 14
15 15 for i in range(6):
16 16 time.sleep(1.0)
@@ -18,7 +18,7 b' for i in range(6):'
18 18 print tc.queue_status()
19 19
20 20 for i in range(24):
21 tc.irun('time.sleep(1)')
21 tc.run(client.StringTask('time.sleep(1)'))
22 22
23 23 for i in range(6):
24 24 time.sleep(1.0)
@@ -26,7 +26,7 b' for i in range(6):'
26 26 print tc.queue_status(True)
27 27
28 28 for i in range(12):
29 tc.irun('time.sleep(2)')
29 tc.run(client.StringTask('time.sleep(2)'))
30 30
31 31 print "Queue status (vebose=True)"
32 32 print tc.queue_status(True)
@@ -55,7 +55,7 b' def main():'
55 55
56 56 # the jobs should take a random time within a range
57 57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
58 tasks = [client.Task("time.sleep(%f)"%t) for t in times]
58 tasks = [client.StringTask("time.sleep(%f)"%t) for t in times]
59 59 stime = sum(times)
60 60
61 61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
@@ -12,6 +12,22 b' Release 0.9'
12 12 New features
13 13 ------------
14 14
15 * The notion of a task has been completely reworked. An `ITask` interface has
16 been created. This interface defines the methods that tasks need to implement.
17 These methods are now responsible for things like submitting tasks and processing
18 results. There are two basic task types: :class:`IPython.kernel.task.StringTask`
19 (this is the old `Task` object, but renamed) and the new
20 :class:`IPython.kernel.task.MapTask`, which is based on a function.
21 * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to
22 standardize the idea of a `map` method. This interface has a single
23 `map` method that has the same syntax as the built-in `map`. We have also defined
24 a `mapper` factory interface that creates objects that implement
25 :class:`IPython.kernel.mapper.IMapper` for different controllers. Both
 26 the multiengine and task controller now have mapping capabilities.
 27 * The parallel function capabilities have been reworked. The major changes are that
28 i) there is now an `@parallel` magic that creates parallel functions, ii)
 29 the syntax for multiple variables follows that of `map`, iii) both the
30 multiengine and task controller now have a parallel function implementation.
15 31 * All of the parallel computing capabilities from `ipython1-dev` have been merged into
16 32 IPython proper. This resulted in the following new subpackages:
17 33 :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
@@ -38,11 +54,11 b' New features'
38 54 when ipcluster is able to start things on other hosts, we will put security
39 55 back.
40 56
41
42
43 57 Bug fixes
44 58 ---------
45 59
 60 * The color escapes in the multiengine client are now turned off on win32 as they
61 don't print correctly.
46 62 * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing mpi_import_statement
47 63 incorrectly, which was leading the engine to crash when mpi was enabled.
 48 64 * A few subpackages had missing `__init__.py` files.
@@ -52,6 +68,12 b' Bug fixes'
52 68 Backwards incompatible changes
53 69 ------------------------------
54 70
71 * :class:`IPython.kernel.client.Task` has been renamed
72 :class:`IPython.kernel.client.StringTask` to make way for new task types.
73 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
74 and `map`.
 75 * Renamed the values that the renamed `dist` keyword argument can have from
76 `'basic'` to `'b'`.
55 77 * IPython has a larger set of dependencies if you want all of its capabilities.
56 78 See the `setup.py` script for details.
57 79 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
@@ -1,3 +1,5 b''
1 .. _install_index:
2
1 3 ==================
2 4 Installation
3 5 ==================
@@ -4,18 +4,6 b''
4 4 Introduction
5 5 ============
6 6
7 This is the official documentation for IPython 0.x series (i.e. what
8 we are used to refer to just as "IPython"). The original text of the
9 manual (most of which is still in place) has been authored by Fernando
10 Perez, but as recommended usage patterns and new features have
11 emerged, this manual has been updated to reflect that fact. Most of
12 the additions have been authored by Ville M. Vainio.
13
14 The manual has been generated from reStructuredText source markup with
15 Sphinx, which should make it much easier to keep it up-to-date in the
16 future. Some reST artifacts and bugs may still be apparent in the
17 documentation, but this should improve as the toolchain matures.
18
19 7 Overview
20 8 ========
21 9
@@ -25,8 +13,19 b' creating test files as is typical in most programming languages.'
25 13 However, the interpreter supplied with the standard Python distribution
26 14 is somewhat limited for extended interactive use.
27 15
28 IPython is a free software project (released under the BSD license)
29 which tries to:
16 The goal of IPython is to create a comprehensive environment for
 17 interactive and exploratory computing. To support this goal, IPython
18 has two main components:
19
20 * An enhanced interactive Python shell.
21 * An architecture for interactive parallel computing.
22
23 All of IPython is open source (released under the revised BSD license).
24
25 Enhanced interactive Python shell
26 =================================
27
28 IPython's interactive shell (`ipython`), has the following goals:
30 29
31 30 1. Provide an interactive shell superior to Python's default. IPython
32 31 has many features for object introspection, system shell access,
@@ -50,140 +49,126 b' which tries to:'
50 49 WX applications via special threading flags. The normal Python
51 50 shell can only do this for Tkinter applications.
52 51
53
54 Main features
55 -------------
56
57 * Dynamic object introspection. One can access docstrings, function
58 definition prototypes, source code, source files and other details
59 of any object accessible to the interpreter with a single
60 keystroke ('?', and using '??' provides additional detail).
61 * Searching through modules and namespaces with '*' wildcards, both
62 when using the '?' system and via the %psearch command.
63 * Completion in the local namespace, by typing TAB at the prompt.
64 This works for keywords, modules, methods, variables and files in the
65 current directory. This is supported via the readline library, and
66 full access to configuring readline's behavior is provided.
67 Custom completers can be implemented easily for different purposes
68 (system commands, magic arguments etc.)
69 * Numbered input/output prompts with command history (persistent
70 across sessions and tied to each profile), full searching in this
71 history and caching of all input and output.
72 * User-extensible 'magic' commands. A set of commands prefixed with
73 % is available for controlling IPython itself and provides
74 directory control, namespace information and many aliases to
75 common system shell commands.
76 * Alias facility for defining your own system aliases.
77 * Complete system shell access. Lines starting with ! are passed
78 directly to the system shell, and using !! or var = !cmd
79 captures shell output into python variables for further use.
80 * Background execution of Python commands in a separate thread.
81 IPython has an internal job manager called jobs, and a
82 conveninence backgrounding magic function called %bg.
83 * The ability to expand python variables when calling the system
84 shell. In a shell command, any python variable prefixed with $ is
85 expanded. A double $$ allows passing a literal $ to the shell (for
86 access to shell and environment variables like $PATH).
87 * Filesystem navigation, via a magic %cd command, along with a
88 persistent bookmark system (using %bookmark) for fast access to
89 frequently visited directories.
90 * A lightweight persistence framework via the %store command, which
91 allows you to save arbitrary Python variables. These get restored
92 automatically when your session restarts.
93 * Automatic indentation (optional) of code as you type (through the
94 readline library).
95 * Macro system for quickly re-executing multiple lines of previous
96 input with a single name. Macros can be stored persistently via
97 %store and edited via %edit.
98 * Session logging (you can then later use these logs as code in your
99 programs). Logs can optionally timestamp all input, and also store
100 session output (marked as comments, so the log remains valid
101 Python source code).
102 * Session restoring: logs can be replayed to restore a previous
103 session to the state where you left it.
104 * Verbose and colored exception traceback printouts. Easier to parse
105 visually, and in verbose mode they produce a lot of useful
106 debugging information (basically a terminal version of the cgitb
107 module).
108 * Auto-parentheses: callable objects can be executed without
109 parentheses: 'sin 3' is automatically converted to 'sin(3)'.
110 * Auto-quoting: using ',' or ';' as the first character forces
111 auto-quoting of the rest of the line: ',my_function a b' becomes
112 automatically 'my_function("a","b")', while ';my_function a b'
113 becomes 'my_function("a b")'.
114 * Extensible input syntax. You can define filters that pre-process
115 user input to simplify input in special situations. This allows
116 for example pasting multi-line code fragments which start with
117 '>>>' or '...' such as those from other python sessions or the
118 standard Python documentation.
119 * Flexible configuration system. It uses a configuration file which
120 allows permanent setting of all command-line options, module
121 loading, code and file execution. The system allows recursive file
122 inclusion, so you can have a base file with defaults and layers
123 which load other customizations for particular projects.
124 * Embeddable. You can call IPython as a python shell inside your own
125 python programs. This can be used both for debugging code or for
126 providing interactive abilities to your programs with knowledge
127 about the local namespaces (very useful in debugging and data
128 analysis situations).
129 * Easy debugger access. You can set IPython to call up an enhanced
130 version of the Python debugger (pdb) every time there is an
131 uncaught exception. This drops you inside the code which triggered
132 the exception with all the data live and it is possible to
133 navigate the stack to rapidly isolate the source of a bug. The
134 %run magic command -with the -d option- can run any script under
135 pdb's control, automatically setting initial breakpoints for you.
136 This version of pdb has IPython-specific improvements, including
137 tab-completion and traceback coloring support. For even easier
138 debugger access, try %debug after seeing an exception. winpdb is
139 also supported, see ipy_winpdb extension.
140 * Profiler support. You can run single statements (similar to
141 profile.run()) or complete programs under the profiler's control.
142 While this is possible with standard cProfile or profile modules,
143 IPython wraps this functionality with magic commands (see '%prun'
144 and '%run -p') convenient for rapid interactive work.
145 * Doctest support. The special %doctest_mode command toggles a mode
146 that allows you to paste existing doctests (with leading '>>>'
147 prompts and whitespace) and uses doctest-compatible prompts and
148 output, so you can use IPython sessions as doctest code.
149
52 Main features of the interactive shell
53 --------------------------------------
54
55 * Dynamic object introspection. One can access docstrings, function
56 definition prototypes, source code, source files and other details
57 of any object accessible to the interpreter with a single
58 keystroke (:samp:`?`, and using :samp:`??` provides additional detail).
59 * Searching through modules and namespaces with :samp:`*` wildcards, both
60 when using the :samp:`?` system and via the :samp:`%psearch` command.
61 * Completion in the local namespace, by typing :kbd:`TAB` at the prompt.
62 This works for keywords, modules, methods, variables and files in the
63 current directory. This is supported via the readline library, and
64 full access to configuring readline's behavior is provided.
65 Custom completers can be implemented easily for different purposes
66 (system commands, magic arguments etc.)
67 * Numbered input/output prompts with command history (persistent
68 across sessions and tied to each profile), full searching in this
69 history and caching of all input and output.
70 * User-extensible 'magic' commands. A set of commands prefixed with
71 :samp:`%` is available for controlling IPython itself and provides
72 directory control, namespace information and many aliases to
73 common system shell commands.
74 * Alias facility for defining your own system aliases.
75 * Complete system shell access. Lines starting with :samp:`!` are passed
76 directly to the system shell, and using :samp:`!!` or :samp:`var = !cmd`
77 captures shell output into python variables for further use.
78 * Background execution of Python commands in a separate thread.
79 IPython has an internal job manager called jobs, and a
80 conveninence backgrounding magic function called :samp:`%bg`.
81 * The ability to expand python variables when calling the system
82 shell. In a shell command, any python variable prefixed with :samp:`$` is
83 expanded. A double :samp:`$$` allows passing a literal :samp:`$` to the shell (for
84 access to shell and environment variables like :envvar:`PATH`).
85 * Filesystem navigation, via a magic :samp:`%cd` command, along with a
86 persistent bookmark system (using :samp:`%bookmark`) for fast access to
87 frequently visited directories.
88 * A lightweight persistence framework via the :samp:`%store` command, which
89 allows you to save arbitrary Python variables. These get restored
90 automatically when your session restarts.
91 * Automatic indentation (optional) of code as you type (through the
92 readline library).
93 * Macro system for quickly re-executing multiple lines of previous
94 input with a single name. Macros can be stored persistently via
95 :samp:`%store` and edited via :samp:`%edit`.
96 * Session logging (you can then later use these logs as code in your
97 programs). Logs can optionally timestamp all input, and also store
98 session output (marked as comments, so the log remains valid
99 Python source code).
100 * Session restoring: logs can be replayed to restore a previous
101 session to the state where you left it.
102 * Verbose and colored exception traceback printouts. Easier to parse
103 visually, and in verbose mode they produce a lot of useful
104 debugging information (basically a terminal version of the cgitb
105 module).
106 * Auto-parentheses: callable objects can be executed without
107 parentheses: :samp:`sin 3` is automatically converted to :samp:`sin(3)`.
108 * Auto-quoting: using :samp:`,`, or :samp:`;` as the first character forces
109 auto-quoting of the rest of the line: :samp:`,my_function a b` becomes
110 automatically :samp:`my_function("a","b")`, while :samp:`;my_function a b`
111 becomes :samp:`my_function("a b")`.
112 * Extensible input syntax. You can define filters that pre-process
113 user input to simplify input in special situations. This allows
114 for example pasting multi-line code fragments which start with
115 :samp:`>>>` or :samp:`...` such as those from other python sessions or the
116 standard Python documentation.
117 * Flexible configuration system. It uses a configuration file which
118 allows permanent setting of all command-line options, module
119 loading, code and file execution. The system allows recursive file
120 inclusion, so you can have a base file with defaults and layers
121 which load other customizations for particular projects.
122 * Embeddable. You can call IPython as a python shell inside your own
123 python programs. This can be used both for debugging code or for
124 providing interactive abilities to your programs with knowledge
125 about the local namespaces (very useful in debugging and data
126 analysis situations).
127 * Easy debugger access. You can set IPython to call up an enhanced
128 version of the Python debugger (pdb) every time there is an
129 uncaught exception. This drops you inside the code which triggered
130 the exception with all the data live and it is possible to
131 navigate the stack to rapidly isolate the source of a bug. The
132 :samp:`%run` magic command (with the :samp:`-d` option) can run any script under
133 pdb's control, automatically setting initial breakpoints for you.
134 This version of pdb has IPython-specific improvements, including
135 tab-completion and traceback coloring support. For even easier
136 debugger access, try :samp:`%debug` after seeing an exception. winpdb is
137 also supported, see ipy_winpdb extension.
138 * Profiler support. You can run single statements (similar to
139 :samp:`profile.run()`) or complete programs under the profiler's control.
140 While this is possible with standard cProfile or profile modules,
141 IPython wraps this functionality with magic commands (see :samp:`%prun`
142 and :samp:`%run -p`) convenient for rapid interactive work.
143 * Doctest support. The special :samp:`%doctest_mode` command toggles a mode
144 that allows you to paste existing doctests (with leading :samp:`>>>`
145 prompts and whitespace) and uses doctest-compatible prompts and
146 output, so you can use IPython sessions as doctest code.
147
148 Interactive parallel computing
149 ==============================
150
151 Increasingly, parallel computer hardware, such as multicore CPUs, clusters and supercomputers, is becoming ubiquitous. Over the last 3 years, we have developed an
152 architecture within IPython that allows such hardware to be used quickly and easily
153 from Python. Moreover, this architecture is designed to support interactive and
154 collaborative parallel computing.
155
156 For more information, see our :ref:`overview <parallel_index>` of using IPython for
157 parallel computing.
150 158
151 159 Portability and Python requirements
152 160 -----------------------------------
153 161
154 Python requirements: IPython requires with Python version 2.3 or newer.
155 If you are still using Python 2.2 and can not upgrade, the last version
156 of IPython which worked with Python 2.2 was 0.6.15, so you will have to
157 use that.
158
159 IPython is developed under Linux, but it should work in any reasonable
160 Unix-type system (tested OK under Solaris and the BSD family, for which
161 a port exists thanks to Dryice Liu).
162
163 Mac OS X: it works, apparently without any problems (thanks to Jim Boyle
164 at Lawrence Livermore for the information). Thanks to Andrea Riciputi,
165 Fink support is available.
166
167 CygWin: it works mostly OK, though some users have reported problems
168 with prompt coloring. No satisfactory solution to this has been found so
169 far, you may want to disable colors permanently in the ipythonrc
170 configuration file if you experience problems. If you have proper color
171 support under cygwin, please post to the IPython mailing list so this
172 issue can be resolved for all users.
173
174 Windows: it works well under Windows Vista/XP/2k, and I suspect NT should
175 behave similarly. Section "Installation under windows" describes
176 installation details for Windows, including some additional tools needed
177 on this platform.
178
179 Windows 9x support is present, and has been reported to work fine (at
180 least on WinME).
181
182 Location
183 --------
184
185 IPython is generously hosted at http://ipython.scipy.org by the
186 Enthought, Inc and the SciPy project. This site offers downloads,
187 subversion access, mailing lists and a bug tracking system. I am very
188 grateful to Enthought (http://www.enthought.com) and all of the SciPy
189 team for their contribution. No newline at end of file
162 As of the 0.9 release, IPython requires Python 2.4 or greater. We have
163 not begun to test IPython on Python 2.6 or 3.0, but we expect it will
164 work with some minor changes.
165
166 IPython is known to work on the following operating systems:
167
168 * Linux
169 * AIX
170 * Most other Unix-like OSs (Solaris, BSD, etc.)
171 * Mac OS X
172 * Windows (CygWin, XP, Vista, etc.)
173
174 See :ref:`here <install_index>` for instructions on how to install IPython. No newline at end of file
@@ -1,3 +1,5 b''
1 .. _parallel_index:
2
1 3 ====================================
2 4 Using IPython for Parallel computing
3 5 ====================================
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now