Updated the multiengine and task interface documentation.
Brian Granger
@@ -0,0 +1,240 b''
1 .. _paralleltask:
2
3 ==========================
4 The IPython task interface
5 ==========================
6
7 .. contents::
8
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system of workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user has no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all, the user can use both of these interfaces at the same time to take advantage of both of their strengths. When the user can break their work up into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly.
10
11 Starting the IPython controller and engines
12 ===========================================
13
14 To follow along with this tutorial, the user will need to start the IPython
15 controller and four IPython engines. The simplest way of doing this is to
16 use the ``ipcluster`` command::
17
18 $ ipcluster -n 4
19
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
21
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
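
For instance (a quick sketch, assuming the ``ipcluster`` session above is still running), nothing prevents the user from creating both clients against the same cluster::

    In [1]: from IPython.kernel import client

    In [2]: mec = client.MultiEngineClient()

    In [3]: tc = client.TaskClient()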
23
24 QuickStart Task Farming
25 =======================
26
27 First, a quick example of how to start running the most basic Tasks.
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
29
30 In [1]: from IPython.kernel import client
31
32 In [2]: tc = client.TaskClient()
33
34 Then the user wraps the commands they want to run in Tasks::
35
36 In [3]: tasklist = []
37 In [4]: for n in range(1000):
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
39
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
41
42 Next, the user needs to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
43
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
45
46 This will give the user a list of the taskIDs used by the controller to keep track of the Tasks and their results. At some point the user is going to want to get those results back, and the ``barrier`` method allows the user to wait for the Tasks to finish running::
47
48 In [6]: tc.barrier(taskids)
49
50 This command will block until all the Tasks in ``taskids`` have finished. Now the user will probably want to look at the results::
51
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
53
54 Now the user has a list of ``TaskResult`` objects, which hold the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
55
56 In [8]: tr = task_results[73]
57 
58 In [9]: tr
59 Out[9]: TaskResult[ID:73]:{'a':73}
60
61 In [10]: tr.engineid
62 Out[10]: 1
63
64 In [11]: tr.submitted, tr.completed, tr.duration
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
66
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
68
69 In [12]: tr.results['a']
70 Out[12]: 73
71
72 In [13]: tr.ns.a
73 Out[13]: 73
74
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks, covered later. Probably the most useful is using a ``MultiEngineClient`` to initialize all the engines with the import dependencies necessary to run the Tasks, as sketched below.
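
For example (a sketch, assuming the same four-engine cluster and the ``tc`` instance from above)::

    In [14]: mec = client.MultiEngineClient()

    In [15]: mec.execute('import numpy')  # run the import on every engine

    In [16]: tid = tc.run(client.Task("a = numpy.random.rand()", pull="a"))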
76
77 There are many options for running and managing Tasks. The best way to learn more about the ``Task`` interface is to study the examples in ``docs/examples``. If the user does so and learns a lot about this interface, we encourage them to expand this documentation about the ``Task`` system.
78
79 Overview of the Task System
80 ===========================
81
82 The user's view of the ``Task`` system has three basic objects: the ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects indicate their roles well.
83
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskController`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines; it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few methods for the user to manage.
85
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy: just implement your own priority system.
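
As a rough illustration only (a sketch - the ``add_task``/``pop_task`` names below are hypothetical, not the actual ``IPython.kernel`` Scheduler API), a priority scheduler could swap the FIFO queue for a heap::

    import heapq

    class PriorityScheduler(object):
        """Sketch: hand out the highest-priority pending task first."""

        def __init__(self):
            self._heap = []
            self._count = 0  # tie-breaker keeps equal priorities FIFO

        def add_task(self, task, priority=0):
            # heapq is a min-heap, so negate priority to pop highest first
            heapq.heappush(self._heap, (-priority, self._count, task))
            self._count += 1

        def pop_task(self):
            if not self._heap:
                raise IndexError("no pending tasks")
            return heapq.heappop(self._heap)[-1]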
87
88 The TaskClient
89 ==============
90
91 The ``TaskClient`` is the object the user uses to connect to the ``Controller`` that is managing their Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython ``client`` module::
92
93 In [1]: from IPython.kernel import client
94
95 Just as with the ``MultiEngineClient``, the user creates the ``TaskClient`` with a tuple containing the IP address and port of the ``Controller``. The ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this::
96
97 In [2]: tc = client.TaskClient(client.default_task_address)
98
99 or, if the user wants to specify a non-default location of the ``Controller``, it can be given explicitly::
100
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
102
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
104
105 * ``tc.run(task)``
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the taskID by which the ``Task`` and its results can be tracked and retrieved::
107
108 In [4]: taskid = tc.run(task)
109
110 * ``tc.get_task_result(taskid, block=False)``
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the taskID of the result the user wishes to retrieve. ``get_task_result`` also takes a keyword argument ``block``, which specifies whether to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately: if the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``; if it has not completed, it will return ``None``. If the user specifies ``block=True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``.
112 * ``tc.barrier(taskids)``
113 ``barrier`` is a synchronization method. It takes exactly one argument, a taskID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
114
115 In [5]: taskIDs = [ tc.run(t) for t in myTasks ]
116 
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
118 Out[6]: True
119 
120 In [7]: tc.barrier(taskIDs)
121 
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
123
124 * ``tc.queue_status(verbose=False)``
125 ``queue_status`` is a method for querying the state of the ``TaskController``. ``queue_status`` returns a dict of the form::
126
127 {'scheduled': Tasks that have been submitted but not yet run
128 'pending' : Tasks that are currently running
129 'succeeded': Tasks that have completed successfully
130 'failed' : Tasks that have finished with a failure
131 }
132
133 If ``verbose`` is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. If ``verbose`` is ``True``, then each element in the dict is a list of the taskIDs in that state::
134
135 In [8]: tc.queue_status()
136 Out[8]: {'scheduled': 4,
137 'pending' : 2,
138 'succeeded': 5,
139 'failed' : 1
140 }
141
142 In [9]: tc.queue_status(verbose=True)
143 Out[9]: {'scheduled': [8,9,10,11],
144 'pending' : [6,7],
145 'succeeded': [0,1,2,4,5],
146 'failed' : [3]
147 }
148
149 * ``tc.abort(taskid)``
150 ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has completed, ``abort`` will raise an ``IndexError: Task Already Completed``. An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see the `Task`_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user finds the solution, they would want to abort all remaining Tasks to prevent wasted work; a sketch of this pattern follows this list.
151 * ``tc.spin()``
152 ``spin`` simply triggers the scheduler in the ``TaskController``. Under most normal circumstances, this will do nothing. The primary known use case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MultiEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence:
153 * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
154 * ``engine`` gets the necessary dependencies while no new Tasks are submitted or completed.
155 * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskController`` to try scheduling ``Task`` again.
156
157 ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances.
158
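As a sketch of the search-type pattern mentioned under ``abort`` above (``search`` is a hypothetical user function that returns ``None`` on failure)::

    In [10]: tids = [ tc.run(client.Task("r = search(%i)" % i, pull="r"))
       ....:          for i in range(100) ]

    In [11]: while tids:
       ....:     tr = tc.get_task_result(tids.pop(0), block=True)
       ....:     if tr.results['r'] is not None:  # solution found
       ....:         for tid in tids:
       ....:             try:
       ....:                 tc.abort(tid)  # cancel the remaining Tasks
       ....:             except IndexError:
       ....:                 pass           # that Task already completed
       ....:         break
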
159 That covers the ``TaskClient``, a simple interface to the cluster. With this, the user can submit jobs (and abort them if necessary), request their results, and synchronize on arbitrary subsets of jobs.
160
161 .. _task:
162
163 The Task Object
164 ===============
165
166 The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as that of the ``execute`` method of the multiengine interface (in fact, ``execute`` is called to run the code)::
167
168 In [1]: t = client.Task("a = str(id)")
169
170 This ``Task`` would run, and store the string representation of the engine's ``id`` in ``a`` in the worker's namespace, but it is fairly useless because the user does not know anything about the state of the worker on which it ran when retrieving the results. It is important that each ``Task`` not expect the state of the worker to persist after the ``Task`` is completed.
171 There are many different situations for using ``Task`` farming, and the ``Task`` object has many attributes for use in customizing ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after construction through attribute assignment, as sketched below.
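
For instance (a sketch), the following two Tasks end up identical::

    In [2]: t1 = client.Task("a = str(id)", pull="a", retries=1)

    In [3]: t2 = client.Task("a = str(id)")

    In [4]: t2.pull = "a"

    In [5]: t2.retries = 1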
172
173 Data Attributes
174 ***************
175 It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and specifying what data to bring back as the ``Task``'s results.
176
177 * ``pull = []``
178 The obvious case is as above: ``t`` would execute and store a result in ``a``, and it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::
179
180 In [2]: t = client.Task("a = str(id)", pull = "a")
181
182 In [3]: t = client.Task("a = str(id)", pull = ["a", "id"])
183
184 * ``push = {}``
185 A user might also want to initialize some data in the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's namespace immediately before execution::
186
187 In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")
188
189 ``push`` and ``pull`` result directly in calls to an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution, respectively, and thus their API is the same.
190
191 Namespace Cleaning
192 ******************
193 When a user is running a large number of Tasks, it is likely that the workers' namespaces could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.
194
195 * ``clear_after``
196 If ``clear_after`` is ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful both for Tasks that produce clutter and for Tasks whose intermediate data one might wish to keep private::
197
198 In [5]: t = client.Task("a = range(10000000)", pull="a", clear_after=True)
199
200
201 * ``clear_before``
202 As one might guess, ``clear_before`` is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::
203
204 In [6]: t = client.Task("a = globals()", pull="a", clear_before=True)
205
206 Of course, a user can use both at the same time, ensuring that all workers are clear except while they are running a job. Both of these default to ``False``.
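
A quick sketch with both flags set, so the ``Task`` neither sees nor leaves any worker state::

    In [7]: t = client.Task("import os; a = os.getpid()", pull="a",
       ...:                 clear_before=True, clear_after=True)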
207
208 Fault Tolerance
209 ***************
210 It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.
211
212 * ``retries``
213 ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task``. The canonical bad example::
214
215 In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99)
216
217 This would actually take down 100 workers.
218
219 * ``recovery_task``
220 ``recovery_task`` is another ``Task`` object, to be run in the event that the original ``Task`` still fails after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!).
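
For example (a sketch - ``flaky_io`` and ``fallback`` stand in for the user's own functions)::

    In [8]: backup = client.Task("r = fallback()", pull="r")

    In [9]: t = client.Task("r = flaky_io()", pull="r", retries=2,
       ...:                 recovery_task=backup)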
221
222 Dependencies
223 ************
224 Dependencies are the most powerful part of the ``Task`` farming system, because they allow the user to do some classification of the workers and guide the ``Task`` distribution without meddling with the controller directly. They make use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties API exists for extending IPython, allowing conditional execution and new controllers that make decisions based on the properties of their engines. Currently the ``Task`` dependency is the only internal use of the properties API.
225
226 .. _MultiEngine: ./parallel_multiengine
227
228 The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and treated as functionally equivalent to always returning ``False``. Tasks with invalid ``depend`` functions will never be assigned to a worker::
229
230 In [8]: def dep(properties):
231 ... return properties["RAM"] >= 2**32 # have at least 4 GB of RAM
232 In [9]: t = client.Task("a = bigfunc()", depend=dep)
233
234 It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods.
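
Tagging engines and submitting a dependent ``Task`` might then look like this (a sketch - check the `MultiEngine`_ reference for the exact ``set_properties`` signature)::

    In [10]: mec.set_properties(dict(RAM=2**34), targets=[0, 1])

    In [11]: tid = tc.run(client.Task("a = bigfunc()", depend=dep))  # runs only where dep() is True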
235
236
237
238
239
240
@@ -1,8 +1,8 b''
1 .. _ip1par:
1 .. _ip1par:
2
2
3 ======================================
3 ============================
4 Using IPython for parallel computing
4 Overview and getting started
5 ======================================
5 ============================
6
6
7 .. contents::
7 .. contents::
8
8
@@ -1,57 +1,115 b''
1 .. _parallelmultiengine:
1 .. _parallelmultiengine:
2
2
3 =================================
3 ===============================
4 IPython's MultiEngine interface
4 IPython's multiengine interface
5 =================================
5 ===============================
6
6
7 .. contents::
7 .. contents::
8
8
9 The MultiEngine interface represents one possible way of working with a
9 The multiengine interface represents one possible way of working with a set of
10 set of IPython engines. The basic idea behind the MultiEngine interface is
10 IPython engines. The basic idea behind the multiengine interface is that the
11 that the capabilities of each engine are explicitly exposed to the user.
11 capabilities of each engine are directly and explicitly exposed to the user.
12 Thus, in the MultiEngine interface, each engine is given an id that is
12 Thus, in the multiengine interface, each engine is given an id that is used to
13 used to identify the engine and give it work to do. This interface is very
13 identify the engine and give it work to do. This interface is very intuitive
14 intuitive and is designed with interactive usage in mind, and is thus the
14 and is designed with interactive usage in mind, and is thus the best place for
15 best place for new users of IPython to begin.
15 new users of IPython to begin.
16
16
17 Starting the IPython controller and engines
17 Starting the IPython controller and engines
18 ===========================================
18 ===========================================
19
19
20 To follow along with this tutorial, you will need to start the IPython
20 To follow along with this tutorial, you will need to start the IPython
21 controller and four IPython engines. The simplest way of doing this is to
21 controller and four IPython engines. The simplest way of doing this is to use
22 use the ``ipcluster`` command::
22 the :command:`ipcluster` command::
23
23
24 $ ipcluster -n 4
24 $ ipcluster -n 4
25
25
26 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26 For more detailed information about starting the controller and engines, see
27 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
27
28
28 Creating a ``MultiEngineClient`` instance
29 Creating a ``MultiEngineClient`` instance
29 =========================================
30 =========================================
30
31
31 The first step is to import the IPython ``client`` module and then create a ``MultiEngineClient`` instance::
32 The first step is to import the IPython :mod:`IPython.kernel.client` module
33 and then create a :class:`MultiEngineClient` instance::
32
34
33 In [1]: from IPython.kernel import client
35 In [1]: from IPython.kernel import client
34
36
35 In [2]: mec = client.MultiEngineClient()
37 In [2]: mec = client.MultiEngineClient()
36
38
37 To make sure there are engines connected to the controller, use can get a list of engine ids::
39 This form assumes that the :file:`ipcontroller-mec.furl` is in the
40 :file:`~/.ipython/security` directory on the client's host. If not, the
41 location of the ``.furl`` file must be given as an argument to the
42 constructor::
43
44 In [2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl')
45
46 To make sure there are engines connected to the controller, you can get a list
47 of engine ids::
38
48
39 In [3]: mec.get_ids()
49 In [3]: mec.get_ids()
40 Out[3]: [0, 1, 2, 3]
50 Out[3]: [0, 1, 2, 3]
41
51
42 Here we see that there are four engines ready to do work for us.
52 Here we see that there are four engines ready to do work for us.
43
53
54 Quick and easy parallelism
55 ==========================
56
57 In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. The multiengine interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and a ``@parallel`` function decorator.
58
59 Parallel map
60 ------------
61
62 Python's builtin :func:`map` function allows a function to be applied to a
63 sequence element-by-element. This type of code is typically trivial to
64 parallelize. In fact, the multiengine interface in IPython already has a
65 parallel version of :meth:`map` that works just like its serial counterpart::
66
67 In [63]: serial_result = map(lambda x:x**10, range(32))
68
69 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
70
71 In [65]: serial_result==parallel_result
72 Out[65]: True
73
74 .. note::
75
76 The multiengine interface version of :meth:`map` does not do any load
77 balancing. For a load balanced version, see the task interface.
78
79 .. seealso::
80
81 The :meth:`map` method has a number of options that can be controlled by
82 the :meth:`mapper` method. See its docstring for more information.
83
84 Parallel function decorator
85 ---------------------------
86
87 Parallel functions are just like normal functions, but they can be called on sequences and *in parallel*. The multiengine interface provides a decorator that turns any Python function into a parallel function::
88
89 In [10]: @mec.parallel()
90 ....: def f(x):
91 ....: return 10.0*x**4
92 ....:
93
94 In [11]: f(range(32)) # this is done in parallel
95 Out[11]:
96 [0.0,10.0,160.0,...]
97
98 See the docstring for the :meth:`parallel` decorator for options.
99
44 Running Python commands
100 Running Python commands
45 =======================
101 =======================
46
102
47 The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method.
103 The most basic type of operation that can be performed on the engines is to
104 execute Python code. Executing Python code can be done in blocking or
105 non-blocking mode (blocking is default) using the :meth:`execute` method.
48
106
49 Blocking execution
107 Blocking execution
50 ------------------
108 ------------------
51
109
52 In blocking mode, the ``MultiEngineClient`` object (called ``mec`` in
110 In blocking mode, the :class:`MultiEngineClient` object (called ``mec`` in
53 these examples) submits the command to the controller, which places the
111 these examples) submits the command to the controller, which places the
54 command in the engines' queues for execution. The ``execute`` call then
112 command in the engines' queues for execution. The :meth:`execute` call then
55 blocks until the engines are done executing the command::
113 blocks until the engines are done executing the command::
56
114
57 # The default is to run on all engines
115 # The default is to run on all engines
@@ -71,7 +129,8 b' blocks until the engines are done executing the command::'
71 [2] In [2]: b=10
129 [2] In [2]: b=10
72 [3] In [2]: b=10
130 [3] In [2]: b=10
73
131
74 Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument::
132 Python commands can be executed on specific engines by calling execute using
133 the ``targets`` keyword argument::
75
134
76 In [6]: mec.execute('c=a+b',targets=[0,2])
135 In [6]: mec.execute('c=a+b',targets=[0,2])
77 Out[6]:
136 Out[6]:
@@ -102,7 +161,9 b' Python commands can be executed on specific engines by calling execute using the'
102 [3] In [4]: print c
161 [3] In [4]: print c
103 [3] Out[4]: -5
162 [3] Out[4]: -5
104
163
105 This example also shows one of the most important things about the IPython engines: they have a persistent user namespaces. The ``execute`` method returns a Python ``dict`` that contains useful information::
164 This example also shows one of the most important things about the IPython
165 engines: they have persistent user namespaces. The :meth:`execute` method
166 returns a Python ``dict`` that contains useful information::
106
167
107 In [9]: result_dict = mec.execute('d=10; print d')
168 In [9]: result_dict = mec.execute('d=10; print d')
108
169
@@ -118,10 +179,12 b' This example also shows one of the most important things about the IPython engin'
118 Non-blocking execution
179 Non-blocking execution
119 ----------------------
180 ----------------------
120
181
121 In non-blocking mode, ``execute`` submits the command to be executed and then returns a
182 In non-blocking mode, :meth:`execute` submits the command to be executed and
122 ``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a
183 then returns a :class:`PendingResult` object immediately. The
123 result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to
184 :class:`PendingResult` object gives you a way of getting a result at a later
124 quickly submit long running commands without blocking your local Python/IPython session::
185 time through its :meth:`get_result` method or :attr:`r` attribute. This allows
186 you to quickly submit long running commands without blocking your local
187 Python/IPython session::
125
188
126 # In blocking mode
189 # In blocking mode
127 In [6]: mec.execute('import time')
190 In [6]: mec.execute('import time')
@@ -159,7 +222,10 b' quickly submit long running commands without blocking your local Python/IPython '
159 [2] In [3]: time.sleep(10)
222 [2] In [3]: time.sleep(10)
160 [3] In [3]: time.sleep(10)
223 [3] In [3]: time.sleep(10)
161
224
162 Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is a the method ``barrier``. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready::
225 Often, it is desirable to wait until a set of :class:`PendingResult` objects
226 are done. For this, there is the method :meth:`barrier`. This method takes a
227 tuple of :class:`PendingResult` objects and blocks until all of the associated
228 results are ready::
163
229
164 In [72]: mec.block=False
230 In [72]: mec.block=False
165
231
@@ -182,14 +248,16 b' Often, it is desirable to wait until a set of ``PendingResult`` objects are done'
182 The ``block`` and ``targets`` keyword arguments and attributes
248 The ``block`` and ``targets`` keyword arguments and attributes
183 --------------------------------------------------------------
249 --------------------------------------------------------------
184
250
185 Most commands in the multiengine interface (like ``execute``) accept ``block`` and ``targets``
251 Most methods in the multiengine interface (like :meth:`execute`) accept
186 as keyword arguments. As we have seen above, these keyword arguments control the blocking mode
252 ``block`` and ``targets`` as keyword arguments. As we have seen above, these
187 and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block``
253 keyword arguments control the blocking mode and which engines the command is
188 and ``targets`` attributes that control the default behavior when the keyword arguments are not
254 applied to. The :class:`MultiEngineClient` class also has :attr:`block` and
189 provided. Thus the following logic is used for ``block`` and ``targets``:
255 :attr:`targets` attributes that control the default behavior when the keyword
256 arguments are not provided. Thus the following logic is used for :attr:`block`
257 and :attr:`targets`:
190
258
191 * If no keyword argument is provided, the instance attributes are used.
259 * If no keyword argument is provided, the instance attributes are used.
192 * Keyword argument, if provided override the instance attributes.
260 * Keyword arguments, if provided, override the instance attributes.
193
261
194 The following examples demonstrate how to use the instance attributes::
262 The following examples demonstrate how to use the instance attributes::
195
263
@@ -225,14 +293,19 b' The following examples demonstrate how to use the instance attributes::'
225 [3] In [6]: b=10; print b
293 [3] In [6]: b=10; print b
226 [3] Out[6]: 10
294 [3] Out[6]: 10
227
295
228 The ``block`` and ``targets`` instance attributes also determine the behavior of the parallel
296 The :attr:`block` and :attr:`targets` instance attributes also determine the
229 magic commands...
297 behavior of the parallel magic commands.
230
298
231
299
232 Parallel magic commands
300 Parallel magic commands
233 -----------------------
301 -----------------------
234
302
235 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the `magicTargets``targets` attribute of the ``MultiEngineClient`` instance (by default this is 'all')::
303 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
304 that make it more pleasant to execute Python commands on the engines
305 interactively. These are simply shortcuts to :meth:`execute` and
306 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
307 engines specified by the :attr:`targets` attribute of the
308 :class:`MultiEngineClient` instance (by default this is ``'all'``)::
236
309
237 # Make this MultiEngineClient active for parallel magic commands
310 # Make this MultiEngineClient active for parallel magic commands
238 In [23]: mec.activate()
311 In [23]: mec.activate()
@@ -277,7 +350,9 b' We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) t'
277 [3] In [9]: print numpy.linalg.eigvals(a)
350 [3] In [9]: print numpy.linalg.eigvals(a)
278 [3] Out[9]: [ 0.83664764 -0.25602658]
351 [3] Out[9]: [ 0.83664764 -0.25602658]
279
352
280 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the ``get_result`` method::
353 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
354 command executed on each engine. It is simply a shortcut to the
355 :meth:`get_result` method::
281
356
282 In [29]: %result
357 In [29]: %result
283 Out[29]:
358 Out[29]:
@@ -294,7 +369,8 b' The ``%result`` magic gets and prints the stdin/stdout/stderr of the last comman'
294 [3] In [9]: print numpy.linalg.eigvals(a)
369 [3] In [9]: print numpy.linalg.eigvals(a)
295 [3] Out[9]: [ 0.83664764 -0.25602658]
370 [3] Out[9]: [ 0.83664764 -0.25602658]
296
371
297 The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the ``targets`` attribute::
372 The ``%autopx`` magic switches to a mode where everything you type is executed
373 on the engines given by the :attr:`targets` attribute::
298
374
299 In [30]: mec.block=False
375 In [30]: mec.block=False
300
376
@@ -335,51 +411,19 b' The ``%autopx`` magic switches to a mode where everything you type is executed o'
335 [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
411 [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
336 [3] Out[12]: Average max eigenvalue is: 10.1158837784
412 [3] Out[12]: Average max eigenvalue is: 10.1158837784
337
413
338 Using the ``with`` statement of Python 2.5
339 ------------------------------------------
340
414
341 Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute::
415 Moving Python objects around
416 ============================
342
417
343 In [3]: with mec:
418 In addition to executing code on engines, you can transfer Python objects to
344 ...: client.remote() # Required so the following code is not run locally
419 and from your IPython session and the engines. In IPython, these operations
345 ...: a = 10
420 are called :meth:`push` (sending an object to the engines) and :meth:`pull`
346 ...: b = 30
421 (getting an object from the engines).
347 ...: c = a+b
348 ...:
349 ...:
350
351 In [4]: mec.get_result()
352 Out[4]:
353 <Results List>
354 [0] In [1]: a = 10
355 b = 30
356 c = a+b
357
358 [1] In [1]: a = 10
359 b = 30
360 c = a+b
361
362 [2] In [1]: a = 10
363 b = 30
364 c = a+b
365
366 [3] In [1]: a = 10
367 b = 30
368 c = a+b
369
370 This is basically another way of calling execute, but one with allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``.
371
372 Moving Python object around
373 ===========================
374
375 In addition to executing code on engines, you can transfer Python objects to and from your
376 IPython session and the engines. In IPython, these operations are called ``push`` (sending an
377 object to the engines) and ``pull`` (getting an object from the engines).
378
422
379 Basic push and pull
423 Basic push and pull
380 -------------------
424 -------------------
381
425
382 Here are some examples of how you use ``push`` and ``pull``::
426 Here are some examples of how you use :meth:`push` and :meth:`pull`::
383
427
384 In [38]: mec.push(dict(a=1.03234,b=3453))
428 In [38]: mec.push(dict(a=1.03234,b=3453))
385 Out[38]: [None, None, None, None]
429 Out[38]: [None, None, None, None]
@@ -415,7 +459,8 b' Here are some examples of how you use ``push`` and ``pull``::'
415 [3] In [13]: print c
459 [3] In [13]: print c
416 [3] Out[13]: speed
460 [3] Out[13]: speed
417
461
418 In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects::
462 In non-blocking mode :meth:`push` and :meth:`pull` also return
463 :class:`PendingResult` objects::
419
464
420 In [47]: mec.block=False
465 In [47]: mec.block=False
421
466
@@ -428,7 +473,11 b' In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects'
428 Push and pull for functions
473 Push and pull for functions
429 ---------------------------
474 ---------------------------
430
475
431 Functions can also be pushed and pulled using ``push_function`` and ``pull_function``::
476 Functions can also be pushed and pulled using :meth:`push_function` and
477 :meth:`pull_function`::
478
479
480 In [52]: mec.block=True
432
481
433 In [53]: def f(x):
482 In [53]: def f(x):
434 ....: return 2.0*x**4
483 ....: return 2.0*x**4
@@ -466,7 +515,10 b' Functions can also be pushed and pulled using ``push_function`` and ``pull_funct'
466 Dictionary interface
515 Dictionary interface
467 --------------------
516 --------------------
468
517
469 As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This make the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``::
518 As a shorthand to :meth:`push` and :meth:`pull`, the
519 :class:`MultiEngineClient` class implements some of the Python dictionary
520 interface. This make the remote namespaces of the engines appear as a local
521 dictionary. Underneath, this uses :meth:`push` and :meth:`pull`::
470
522
471 In [50]: mec.block=True
523 In [50]: mec.block=True
472
524
@@ -478,11 +530,13 b' As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class impleme'
478 Scatter and gather
530 Scatter and gather
479 ------------------
531 ------------------
480
532
481 Sometimes it is useful to partition a sequence and push the partitions to different engines. In
533 Sometimes it is useful to partition a sequence and push the partitions to
482 MPI language, this is know as scatter/gather and we follow that terminology. However, it is
534 different engines. In MPI language, this is known as scatter/gather and we
483 important to remember that in IPython ``scatter`` is from the interactive IPython session to
535 follow that terminology. However, it is important to remember that in
484 the engines and ``gather`` is from the engines back to the interactive IPython session. For
536 IPython's :class:`MultiEngineClient` class, :meth:`scatter` is from the
485 scatter/gather operations between engines, MPI should be used::
537 interactive IPython session to the engines and :meth:`gather` is from the
538 engines back to the interactive IPython session. For scatter/gather operations
539 between engines, MPI should be used::
486
540
487 In [58]: mec.scatter('a',range(16))
541 In [58]: mec.scatter('a',range(16))
488 Out[58]: [None, None, None, None]
542 Out[58]: [None, None, None, None]
@@ -510,24 +564,12 b' scatter/gather operations between engines, MPI should be used::'
510 Other things to look at
564 Other things to look at
511 =======================
565 =======================
512
566
513 Parallel map
514 ------------
515
516 Python's builtin ``map`` functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart::
517
518 In [63]: serial_result = map(lambda x:x**10, range(32))
519
520 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
521
522 In [65]: serial_result==parallel_result
523 Out[65]: True
524
525 As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes.
526
527 How to do parallel list comprehensions
567 How to do parallel list comprehensions
528 --------------------------------------
568 --------------------------------------
529
569
530 In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``::
570 In many cases list comprehensions are nicer than using the map function. While
571 we don't have fully parallel list comprehensions, it is simple to get the
572 basic effect using :meth:`scatter` and :meth:`gather`::
531
573
532 In [66]: mec.scatter('x',range(64))
574 In [66]: mec.scatter('x',range(64))
533 Out[66]: [None, None, None, None]
575 Out[66]: [None, None, None, None]
@@ -547,10 +589,16 b' In many cases list comprehensions are nicer than using the map function. While '
547 In [69]: print y
589 In [69]: print y
548 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
590 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
549
591
550 Parallel Exceptions
592 Parallel exceptions
551 -------------------
593 -------------------
552
594
553 In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works::
595 In the multiengine interface, parallel commands can raise Python exceptions,
596 just like serial commands. But, it is a little subtle, because a single
597 parallel command can actually raise multiple exceptions (one for each engine
598 the command was run on). To express this idea, the MultiEngine interface has a
599 :exc:`CompositeError` exception class that will be raised in most cases. The
600 :exc:`CompositeError` class is a special type of exception that wraps one or
601 more other types of exceptions. Here is how it works::
554
602
555 In [76]: mec.block=True
603 In [76]: mec.block=True
556
604
@@ -580,7 +628,7 b' In the MultiEngine interface, parallel commands can raise Python exceptions, jus'
580 [2:execute]: ZeroDivisionError: integer division or modulo by zero
628 [2:execute]: ZeroDivisionError: integer division or modulo by zero
581 [3:execute]: ZeroDivisionError: integer division or modulo by zero
629 [3:execute]: ZeroDivisionError: integer division or modulo by zero
582
630
583 Notice how the error message printed when ``CompositeError`` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
631 Notice how the error message printed when :exc:`CompositeError` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
584
632
585 In [80]: try:
633 In [80]: try:
586 ....: mec.execute('1/0')
634 ....: mec.execute('1/0')
@@ -602,7 +650,9 b' Notice how the error message printed when ``CompositeError`` is raised has infor'
602
650
603 ZeroDivisionError: integer division or modulo by zero
651 ZeroDivisionError: integer division or modulo by zero
604
652
605 If you are working in IPython, you can simple type ``%debug`` after one of these ``CompositeError`` is raised, and inspect the exception instance::
653 If you are working in IPython, you can simply type ``%debug`` after one of
654 these :exc:`CompositeError` exceptions is raised, and inspect the exception
655 instance::
606
656
607 In [81]: mec.execute('1/0')
657 In [81]: mec.execute('1/0')
608 ---------------------------------------------------------------------------
658 ---------------------------------------------------------------------------
@@ -679,6 +729,11 b' If you are working in IPython, you can simple type ``%debug`` after one of these'
679
729
680 ZeroDivisionError: integer division or modulo by zero
730 ZeroDivisionError: integer division or modulo by zero
681
731
732 .. note::
733
734 The above example appears to be broken right now because of a change in
735 how we are using Twisted.
736
682 All of this same error handling magic even works in non-blocking mode::
737 All of this same error handling magic even works in non-blocking mode::
683
738
684 In [83]: mec.block=False
739 In [83]: mec.block=False
@@ -1,240 +1,93 b''
1 .. _paralleltask:
1 .. _paralleltask:
2
2
3 =================================
3 ==========================
4 The IPython Task interface
4 The IPython task interface
5 =================================
5 ==========================
6
6
7 .. contents::
7 .. contents::
8
8
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all the user can use both of these interfaces at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly.
9 The task interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system of workers. Unlike the multiengine interface, in the task interface, the user has no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful.
10
11 Best of all, the user can use both of these interfaces running at the same time to take advantage of both of their strengths. When the user can break their work up into segments that do not depend on previous execution, the task interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign tasks to engines explicitly.
10
12
11 Starting the IPython controller and engines
13 Starting the IPython controller and engines
12 ===========================================
14 ===========================================
13
15
14 To follow along with this tutorial, the user will need to start the IPython
16 To follow along with this tutorial, you will need to start the IPython
15 controller and four IPython engines. The simplest way of doing this is to
17 controller and four IPython engines. The simplest way of doing this is to use
16 use the ``ipcluster`` command::
18 the :command:`ipcluster` command::
17
19
18 $ ipcluster -n 4
20 $ ipcluster -n 4
19
21
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
22 For more detailed information about starting the controller and engines, see
23 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
24
25 Creating a ``TaskClient`` instance
26 ==================================
27
28 The first step is to import the IPython :mod:`IPython.kernel.client` module
29 and then create a :class:`TaskClient` instance::
30
31 In [1]: from IPython.kernel import client
32
33 In [2]: tc = client.TaskClient()
34
35 This form assumes that the :file:`ipcontroller-tc.furl` is in the
36 :file:`~/.ipython/security` directory on the client's host. If not, the
37 location of the ``.furl`` file must be given as an argument to the
38 constructor::
39
40 In [2]: tc = client.TaskClient('/path/to/my/ipcontroller-tc.furl')
41
42 Quick and easy parallelism
43 ==========================
44
45 In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. Like the multiengine interface, the task interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and a ``@parallel`` function decorator. However, the versions in the task interface have one important difference: they are dynamically load balanced. Thus, if the execution time per item varies significantly, you should use the versions in the task interface.
46
47 Parallel map
48 ------------
49
50 The parallel :meth:`map` in the task interface is similar to that in the multiengine interface::
51
52 In [63]: serial_result = map(lambda x:x**10, range(32))
53
54 In [64]: parallel_result = tc.map(lambda x:x**10, range(32))
55
56 In [65]: serial_result==parallel_result
57 Out[65]: True
58
59 Parallel function decorator
60 ---------------------------
61
62 Parallel functions are just like normal functions, but they can be called on sequences and *in parallel*. The task interface provides a decorator that turns any Python function into a parallel function::
21
63
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
64 In [10]: @tc.parallel()
65 ....: def f(x):
66 ....: return 10.0*x**4
67 ....:
23
68
24 QuickStart Task Farming
69 In [11]: f(range(32)) # this is done in parallel
25 =======================
70 Out[11]:
71 [0.0,10.0,160.0,...]
26
72
27 First, a quick example of how to start running the most basic Tasks.
73 More details
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
74 ============
29
30 In [1]: from IPython.kernel import client
31
32 In [2]: tc = client.TaskClient()
33
75
34 Then the user wrap the commands the user want to run in Tasks::
76 The :class:`TaskClient` has many more powerful features that allow quite a bit of flexibility in how tasks are defined and run. The next places to look are in the following classes:
35
77
36 In [3]: tasklist = []
78 * :class:`IPython.kernel.client.TaskClient`
37 In [4]: for n in range(1000):
79 * :class:`IPython.kernel.client.StringTask`
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
80 * :class:`IPython.kernel.client.MapTask`
39
81
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
82 The following is an overview of how to use these classes together:
41
83
42 Next, the user need to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
84 1. Create a :class:`TaskClient`.
85 2. Create one or more instances of :class:`StringTask` or :class:`MapTask`
86 to define your tasks.
87 3. Submit your tasks using the :meth:`run` method of your
88 :class:`TaskClient` instance.
89 4. Use :meth:`TaskClient.get_task_result` to get the results of the
90 tasks.
43
91
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
92 We are in the process of developing more detailed information about the task interface. For now, the docstrings of the :class:`TaskClient`, :class:`StringTask` and :class:`MapTask` classes should be consulted.
45
93
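
A minimal sketch of that workflow (the ``StringTask``/``MapTask`` arguments shown here should be verified against those docstrings)::

    In [1]: from IPython.kernel import client

    In [2]: tc = client.TaskClient()

    In [3]: st = client.StringTask("a = 2*b", push=dict(b=21), pull="a")

    In [4]: mt = client.MapTask(lambda x: 2*x, args=(21,))

    In [5]: tids = [ tc.run(st), tc.run(mt) ]

    In [6]: tc.barrier(tids)

    In [7]: tc.get_task_result(tids[0]).results['a']
    Out[7]: 42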
46 This will give the user a list of the TaskIDs used by the controller to keep track of the Tasks and their results. Now at some point the user are going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running::
47
48 In [6]: tc.barrier(taskids)
49
50 This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably want to look at the user's results::
51
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
53
54 Now the user have a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
55
56 In [8]: tr = ``Task``_results[73]
57
58 In [9]: tr
59 Out[9]: ``TaskResult``[ID:73]:{'a':73}
60
61 In [10]: tr.engineid
62 Out[10]: 1
63
64 In [11]: tr.submitted, tr.completed, tr.duration
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
66
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
68
69 In [12]: tr.results['a']
70 Out[12]: 73
71
72 In [13]: tr.ns.a
73 Out[13]: 73
74
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks covered later. The most useful probably being using a ``MutiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run the user's Tasks.
76
77 There are many options for running and managing Tasks. The best way to learn further about the ``Task`` interface is to study the examples in ``docs/examples``. If the user do so and learn a lots about this interface, we encourage the user to expand this documentation about the ``Task`` system.
78
79 Overview of the Task System
80 ===========================
81
82 The user's view of the ``Task`` system has three basic objects: The ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects well indicate their role.
83
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskControler`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines, it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage.
85
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy, just implementing your own priority system.
87
88 The TaskClient
89 ==============
90
91 The ``TaskClient`` is the object the user use to connect to the ``Controller`` that is managing the user's Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython Client Module::
92
93 In [1]: from IPython.kernel import client
94
95 Just as with the ``MultiEngineClient``, the user create the ``TaskClient`` with a tuple, containing the ip-address and port of the ``Controller``. the ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this::
96
97 In [2]: tc = client.TaskClient(client.default_task_address)
98
99 or, if the user want to specify a non default location of the ``Controller``, the user can specify explicitly::
100
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
102
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
104
105 * ``tc.run(task)``
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the ``Task``ID by which the ``Task`` and its results can be tracked and retrieved::
107
108 In [4]: ``Task``ID = tc.run(``Task``)
109
110 * ``tc.get_task_result(taskid, block=``False``)``
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the ``Task``ID`` of the result the user wish to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually want to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specify ``block=``True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``.
112 * ``tc.barrier(taskid(s))``
113 ``barrier`` is a synchronization method. It takes exactly one argument, a ``Task``ID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
114
115 In [5]: taskIDs = [ tc.run(``Task``) for ``Task`` in myTasks ]
116
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
118 Out[6]: ``True``
119
120 In [7]: tc.barrier(``Task``ID)
121
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
123
* ``tc.queue_status(verbose=False)``
  ``queue_status`` is a method for querying the state of the ``TaskController``. It returns a dict of the form::

    {'scheduled': Tasks that have been submitted but not yet run
     'pending'  : Tasks that are currently running
     'succeeded': Tasks that have completed successfully
     'failed'   : Tasks that have finished with a failure
    }

  If ``verbose`` is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. If ``verbose`` is ``True``, then each element in the dict is a list of the taskids in that state::

    In [9]: tc.queue_status()
    Out[9]: {'scheduled': 4,
             'pending'  : 2,
             'succeeded': 5,
             'failed'   : 1
            }

    In [10]: tc.queue_status(verbose=True)
    Out[10]: {'scheduled': [8,9,10,11],
              'pending'  : [6,7],
              'succeeded': [0,1,2,4,5],
              'failed'   : [3]
             }

* ``tc.abort(taskid)``
  ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately; if the ``Task`` has already completed, ``abort`` will raise an ``IndexError`` (``Task Already Completed``). An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see the `Task`_ section for how to specify retries) in an interactive session, but then realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution; once the user finds the solution, all remaining Tasks can be aborted to prevent wasted work.

* ``tc.spin()``
  ``spin`` simply triggers the scheduler in the ``TaskController``. Under most normal circumstances, this will do nothing. The primary known use case involves ``Task`` dependencies (see `Dependencies`_). A dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MultiEngineClient`` does not trigger a reschedule event. The main example case requires the following event sequence:

  * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
  * ``engine`` gets the necessary dependencies while no new Tasks are submitted or completed.
  * now ``engine`` could run ``Task``, but a ``Task`` event is required for the ``TaskController`` to try scheduling ``Task`` again.

  ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks; it should not be needed under most normal circumstances. A sketch of this scenario appears below.
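
  For concreteness, here is a minimal sketch of that sequence, assuming a hypothetical boolean engine property named ``"flag"``::

    # a Task that may only run on engines where "flag" is set
    t = client.Task("a = f()", pull="a",
                    depend=lambda props: props.get("flag", False))
    taskid = tc.run(t)   # no engine has "flag" yet, so t stays queued

    # ... "flag" is later set on some engine, with no other Task
    # submissions or completions to wake the scheduler ...

    tc.spin()            # empty ping: the scheduler retries t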

That covers the ``TaskClient``, a simple interface to the cluster. With it, the user can submit jobs (and abort them if necessary), request their results, and synchronize on arbitrary subsets of jobs.
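
Putting these methods together, a typical session might look like the following sketch (plain code rather than an interactive transcript; ``tr.ns`` is the attribute-access namespace of a ``TaskResult``)::

    from IPython.kernel import client

    tc = client.TaskClient()

    # submit a batch of independent jobs
    tasklist = [ client.Task("a = %i**2" % n, pull="a") for n in range(100) ]
    taskids = [ tc.run(t) for t in tasklist ]

    # check on the queue without blocking
    print tc.queue_status()

    # wait for everything to finish, then collect the results
    tc.barrier(taskids)
    results = [ tc.get_task_result(tid) for tid in taskids ]
    squares = [ tr.ns.a for tr in results ]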

.. _task:

The Task Object
===============

The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code)::

    In [1]: t = client.Task("a = str(id)")

This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in the worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time the results are retrieved. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed.

There are many different situations for using ``Task`` farming, and the ``Task`` object has many attributes for customizing ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction, through attribute assignment, as shown below.
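
For example, the following two Tasks end up configured identically - one entirely through constructor keywords, the other through attribute assignment afterwards (``retries`` is discussed under `Fault Tolerance`_ below)::

    t1 = client.Task("a = str(id)", pull="a", retries=1)

    t2 = client.Task("a = str(id)")
    t2.pull = "a"
    t2.retries = 1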

Data Attributes
***************

It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and of specifying what data to bring back as the ``Task``'s results.

* ``pull = []``
  The obvious case is as above, where ``t`` would execute and store the result of ``str(id)`` in ``a``; it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object returned by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::

    In [2]: t = client.Task("a = str(id)", pull="a")

    In [3]: t = client.Task("a = str(id)", pull=["a", "id"])

* ``push = {}``
  A user might also want to initialize some data in the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's immediately before execution::

    In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")

``push`` and ``pull`` result directly in calls to an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, so their API is the same.
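
In other words, the ``Task`` in ``In [4]`` behaves roughly like this manual sequence on a single engine (a sketch; it assumes a ``MultiEngineClient`` connected to the same controller, and that ``f`` is already defined on the engine)::

    import time
    from IPython.kernel import client

    mec = client.MultiEngineClient()

    mec.push(dict(submitted=time.time()))   # the Task's push step
    mec.execute("a = f(submitted)")         # the Task's code string
    a = mec.pull("a")                       # the Task's pull step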

Namespace Cleaning
******************

When a user is running a large number of Tasks, the workers' namespaces are likely to become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.

* ``clear_after``
  If ``clear_after`` is set to ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful both for Tasks that produce clutter and for Tasks whose intermediate data one might wish to keep private::

    In [5]: t = client.Task("a = range(1e10)", pull="a", clear_after=True)

* ``clear_before``
  As one might guess, ``clear_before`` is identical to ``clear_after``, except that it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::

    In [6]: t = client.Task("a = globals()", pull="a", clear_before=True)

Of course, a user can use both at the same time, ensuring that all workers are clean except when they are currently running a job. Both of these attributes default to ``False``.
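
A sketch combining the two, so the worker is wiped both before and after the job (``sensitive_computation`` is a hypothetical function)::

    t = client.Task("a = sensitive_computation()",
                    pull="a", clear_before=True, clear_after=True)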

Fault Tolerance
***************

It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.

* ``retries``
  ``retries`` is an integer specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` against disconnecting engines, but it should not be a large number: if a ``Task`` is failing 100 times, there is probably something wrong with the ``Task`` itself. The canonical bad example::

    In [7]: t = client.Task("import os; os.kill(os.getpid(), 9)", retries=99)

  This would actually take down 100 workers.

* ``recovery_task``
  ``recovery_task`` is another ``Task`` object, to be run in the event that the original ``Task`` still fails after running out of retries. Since ``recovery_task`` is itself a ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks can be arbitrarily long, except that loops are not allowed (that would be bad!).
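
  A sketch of a ``Task`` with a fallback: if ``expensive_estimate`` still fails after two retries, a cheaper computation runs instead (both function names are hypothetical)::

    fallback = client.Task("a = cheap_estimate()", pull="a")

    t = client.Task("a = expensive_estimate()", pull="a",
                    retries=2, recovery_task=fallback)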

Dependencies
************

Dependencies are the most powerful part of the ``Task`` farming system, because they allow the user to do some classification of the workers, and to guide the ``Task`` distribution without meddling with the controller directly. They make use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties API exists for extending IPython, allowing conditional execution and new controllers that make decisions based on the properties of their engines. Currently the ``Task`` dependency is the only internal use of the properties API.

.. _MultiEngine: ./parallel_multiengine

The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and treated as functionally equivalent to always returning ``False``. Tasks with invalid ``depend`` functions will never be assigned to a worker::

    In [8]: def dep(properties):
       ...:     return properties["RAM"] >= 2**32  # require at least 4GB of RAM

    In [9]: t = client.Task("a = bigfunc()", depend=dep)


It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine), using the EngineAPI, or remotely, through the ``MultiEngineClient``'s ``get_properties``/``set_properties`` methods.
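
For instance, the ``"RAM"`` property assumed by ``dep`` above could be published from the client side roughly as follows (a sketch; consult the `MultiEngine`_ reference for the exact signatures of the properties methods)::

    mec = client.MultiEngineClient()

    # advertise 8GB of RAM for engine 0 (hypothetical value)
    mec.set_properties(dict(RAM=2**33), targets=[0])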