##// END OF EJS Templates
Updated the multiengine and task interface documentation....
Brian Granger -
Show More
@@ -0,0 +1,240 b''
1 .. _paralleltask:
2
3 ==========================
4 The IPython task interface
5 ==========================
6
7 .. contents::
8
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all the user can use both of these interfaces at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly.
10
11 Starting the IPython controller and engines
12 ===========================================
13
14 To follow along with this tutorial, the user will need to start the IPython
15 controller and four IPython engines. The simplest way of doing this is to
16 use the ``ipcluster`` command::
17
18 $ ipcluster -n 4
19
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
21
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
23
24 QuickStart Task Farming
25 =======================
26
27 First, a quick example of how to start running the most basic Tasks.
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
29
30 In [1]: from IPython.kernel import client
31
32 In [2]: tc = client.TaskClient()
33
34 Then the user wrap the commands the user want to run in Tasks::
35
36 In [3]: tasklist = []
37 In [4]: for n in range(1000):
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
39
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
41
42 Next, the user need to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
43
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
45
46 This will give the user a list of the TaskIDs used by the controller to keep track of the Tasks and their results. Now at some point the user are going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running::
47
48 In [6]: tc.barrier(taskids)
49
50 This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably want to look at the user's results::
51
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
53
54 Now the user have a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
55
56 In [8]: tr = ``Task``_results[73]
57
58 In [9]: tr
59 Out[9]: ``TaskResult``[ID:73]:{'a':73}
60
61 In [10]: tr.engineid
62 Out[10]: 1
63
64 In [11]: tr.submitted, tr.completed, tr.duration
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
66
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
68
69 In [12]: tr.results['a']
70 Out[12]: 73
71
72 In [13]: tr.ns.a
73 Out[13]: 73
74
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks covered later. The most useful probably being using a ``MutiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run the user's Tasks.
76
77 There are many options for running and managing Tasks. The best way to learn further about the ``Task`` interface is to study the examples in ``docs/examples``. If the user do so and learn a lots about this interface, we encourage the user to expand this documentation about the ``Task`` system.
78
79 Overview of the Task System
80 ===========================
81
82 The user's view of the ``Task`` system has three basic objects: The ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects well indicate their role.
83
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskControler`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines, it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage.
85
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy, just implementing your own priority system.
87
88 The TaskClient
89 ==============
90
91 The ``TaskClient`` is the object the user use to connect to the ``Controller`` that is managing the user's Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython Client Module::
92
93 In [1]: from IPython.kernel import client
94
95 Just as with the ``MultiEngineClient``, the user create the ``TaskClient`` with a tuple, containing the ip-address and port of the ``Controller``. the ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this::
96
97 In [2]: tc = client.TaskClient(client.default_task_address)
98
99 or, if the user want to specify a non default location of the ``Controller``, the user can specify explicitly::
100
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
102
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
104
105 * ``tc.run(task)``
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the ``Task``ID by which the ``Task`` and its results can be tracked and retrieved::
107
108 In [4]: ``Task``ID = tc.run(``Task``)
109
110 * ``tc.get_task_result(taskid, block=``False``)``
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the ``Task``ID`` of the result the user wish to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually want to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specify ``block=``True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``.
112 * ``tc.barrier(taskid(s))``
113 ``barrier`` is a synchronization method. It takes exactly one argument, a ``Task``ID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
114
115 In [5]: taskIDs = [ tc.run(``Task``) for ``Task`` in myTasks ]
116
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
118 Out[6]: ``True``
119
120 In [7]: tc.barrier(``Task``ID)
121
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
123
124 * ``tc.queue_status(verbose=``False``)``
125 ``queue_status`` is a method for querying the state of the ``TaskControler``. ``queue_status`` returns a dict of the form::
126
127 {'scheduled': Tasks that have been submitted but yet run
128 'pending' : Tasks that are currently running
129 'succeeded': Tasks that have completed successfully
130 'failed' : Tasks that have finished with a failure
131 }
132
133 if @verbose is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. if @verbose is ``True``, then each element in the dict is a list of the taskIDs in that state::
134
135 In [8]: tc.queue_status()
136 Out[8]: {'scheduled': 4,
137 'pending' : 2,
138 'succeeded': 5,
139 'failed' : 1
140 }
141
142 In [9]: tc.queue_status(verbose=True)
143 Out[9]: {'scheduled': [8,9,10,11],
144 'pending' : [6,7],
145 'succeeded': [0,1,2,4,5],
146 'failed' : [3]
147 }
148
149 * ``tc.abort(taskid)``
150 ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has completed, ``abort`` will raise an ``IndexError ``Task`` Already Completed``. An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see ``Task``_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user find the solution, the user would want to abort all remaining Tasks to prevent wasted work.
151 * ``tc.spin()``
152 ``spin`` simply triggers the scheduler in the ``TaskControler``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MutliEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence:
153 * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
154 * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed.
155 * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskControler`` to try scheduling ``Task`` again.
156
157 ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances.
158
159 That covers the ``TaskClient``, a simple interface to the cluster. With this, the user can submit jobs (and abort if necessary), request their results, synchronize on arbitrary subsets of jobs.
160
161 .. _task: The Task Object
162
163 The Task Object
164 ===============
165
166 The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code)::
167
168 In [1]: t = client.Task("a = str(id)")
169
170 This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed.
171 There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment.
172
173 Data Attributes
174 ***************
175 It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and specifying what data to bring back as the ``Task``'s results.
176
177 * pull = []
178 The obvious case is as above, where ``t`` would execute and store the result of ``myfunc`` in ``a``, it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::
179
180 In [2]: t = client.Task("a = str(id)", pull = "a")
181
182 In [3]: t = client.Task("a = str(id)", pull = ["a", "id"])
183
184 * push = {}
185 A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's immediately before execution::
186
187 In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")
188
189 push and pull result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their api is the same.
190
191 Namespace Cleaning
192 ******************
193 When a user is running a large number of Tasks, it is likely that the namespace of the worker's could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.
194
195 * ``clear_after``
196 if clear_after is specified ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful for both Tasks that produce clutter or Tasks whose intermediate data one might wish to be kept private::
197
198 In [5]: t = client.Task("a = range(1e10)", pull = "a",clear_after=True)
199
200
201 * ``clear_before``
202 as one might guess, clear_before is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::
203
204 In [6]: t = client.Task("a = globals()", pull = "a",clear_before=True)
205
206 Of course, a user can both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``.
207
208 Fault Tolerance
209 ***************
210 It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.
211
212 * ``retries``
213 ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task``. The canonical bad example:
214
215 In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99)
216
217 This would actually take down 100 workers.
218
219 * ``recovery_task``
220 ``recovery_task`` is another ``Task`` object, to be run in the event of the original ``Task`` still failing after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!).
221
222 Dependencies
223 ************
224 Dependencies are the most powerful part of the ``Task`` farming system, because it allows the user to do some classification of the workers, and guide the ``Task`` distribution without meddling with the controller directly. It makes use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties api exists for extending IPython, allowing conditional execution and new controllers that make decisions based on properties of its engines. Currently the ``Task`` dependency is the only internal use of the properties api.
225
226 .. _MultiEngine: ./parallel_multiengine
227
228 The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and functionally equivalent to always returning ``False``. Tasks`` with invalid ``depend`` functions will never be assigned to a worker::
229
230 In [8]: def dep(properties):
231 ... return properties["RAM"] > 2**32 # have at least 4GB
232 In [9]: t = client.Task("a = bigfunc()", depend=dep)
233
234 It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods.
235
236
237
238
239
240
@@ -1,8 +1,8 b''
1 1 .. _ip1par:
2 2
3 ======================================
4 Using IPython for parallel computing
5 ======================================
3 ============================
4 Overview and getting started
5 ============================
6 6
7 7 .. contents::
8 8
@@ -1,57 +1,115 b''
1 1 .. _parallelmultiengine:
2 2
3 =================================
4 IPython's MultiEngine interface
5 =================================
3 ===============================
4 IPython's multiengine interface
5 ===============================
6 6
7 7 .. contents::
8 8
9 The MultiEngine interface represents one possible way of working with a
10 set of IPython engines. The basic idea behind the MultiEngine interface is
11 that the capabilities of each engine are explicitly exposed to the user.
12 Thus, in the MultiEngine interface, each engine is given an id that is
13 used to identify the engine and give it work to do. This interface is very
14 intuitive and is designed with interactive usage in mind, and is thus the
15 best place for new users of IPython to begin.
9 The multiengine interface represents one possible way of working with a set of
10 IPython engines. The basic idea behind the multiengine interface is that the
11 capabilities of each engine are directly and explicitly exposed to the user.
12 Thus, in the multiengine interface, each engine is given an id that is used to
13 identify the engine and give it work to do. This interface is very intuitive
14 and is designed with interactive usage in mind, and is thus the best place for
15 new users of IPython to begin.
16 16
17 17 Starting the IPython controller and engines
18 18 ===========================================
19 19
20 20 To follow along with this tutorial, you will need to start the IPython
21 controller and four IPython engines. The simplest way of doing this is to
22 use the ``ipcluster`` command::
21 controller and four IPython engines. The simplest way of doing this is to use
22 the :command:`ipcluster` command::
23 23
24 24 $ ipcluster -n 4
25 25
26 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26 For more detailed information about starting the controller and engines, see
27 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
27 28
28 29 Creating a ``MultiEngineClient`` instance
29 30 =========================================
30 31
31 The first step is to import the IPython ``client`` module and then create a ``MultiEngineClient`` instance::
32 The first step is to import the IPython :mod:`IPython.kernel.client` module
33 and then create a :class:`MultiEngineClient` instance::
32 34
33 35 In [1]: from IPython.kernel import client
34 36
35 37 In [2]: mec = client.MultiEngineClient()
36 38
37 To make sure there are engines connected to the controller, use can get a list of engine ids::
39 This form assumes that the :file:`ipcontroller-mec.furl` is in the
40 :file:`~./ipython/security` directory on the client's host. If not, the
41 location of the ``.furl`` file must be given as an argument to the
42 constructor::
43
44 In[2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl')
45
46 To make sure there are engines connected to the controller, use can get a list
47 of engine ids::
38 48
39 49 In [3]: mec.get_ids()
40 50 Out[3]: [0, 1, 2, 3]
41 51
42 52 Here we see that there are four engines ready to do work for us.
43 53
54 Quick and easy parallelism
55 ==========================
56
57 In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. The multiengine interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and ``@parallel`` function decorator.
58
59 Parallel map
60 ------------
61
62 Python's builtin :func:`map` functions allows a function to be applied to a
63 sequence element-by-element. This type of code is typically trivial to
64 parallelize. In fact, the multiengine interface in IPython already has a
65 parallel version of :meth:`map` that works just like its serial counterpart::
66
67 In [63]: serial_result = map(lambda x:x**10, range(32))
68
69 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
70
71 In [65]: serial_result==parallel_result
72 Out[65]: True
73
74 .. note::
75
76 The multiengine interface version of :meth:`map` does not do any load
77 balancing. For a load balanced version, see the task interface.
78
79 .. seealso::
80
81 The :meth:`map` method has a number of options that can be controlled by
82 the :meth:`mapper` method. See its docstring for more information.
83
84 Parallel function decorator
85 ---------------------------
86
87 Parallel functions are just like normal function, but they can be called on sequences and *in parallel*. The multiengine interface provides a decorator that turns any Python function into a parallel function::
88
89 In [10]: @mec.parallel()
90 ....: def f(x):
91 ....: return 10.0*x**4
92 ....:
93
94 In [11]: f(range(32)) # this is done in parallel
95 Out[11]:
96 [0.0,10.0,160.0,...]
97
98 See the docstring for the :meth:`parallel` decorator for options.
99
44 100 Running Python commands
45 101 =======================
46 102
47 The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method.
103 The most basic type of operation that can be performed on the engines is to
104 execute Python code. Executing Python code can be done in blocking or
105 non-blocking mode (blocking is default) using the :meth:`execute` method.
48 106
49 107 Blocking execution
50 108 ------------------
51 109
52 In blocking mode, the ``MultiEngineClient`` object (called ``mec`` in
110 In blocking mode, the :class:`MultiEngineClient` object (called ``mec`` in
53 111 these examples) submits the command to the controller, which places the
54 command in the engines' queues for execution. The ``execute`` call then
112 command in the engines' queues for execution. The :meth:`execute` call then
55 113 blocks until the engines are done executing the command::
56 114
57 115 # The default is to run on all engines
@@ -71,7 +129,8 b' blocks until the engines are done executing the command::'
71 129 [2] In [2]: b=10
72 130 [3] In [2]: b=10
73 131
74 Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument::
132 Python commands can be executed on specific engines by calling execute using
133 the ``targets`` keyword argument::
75 134
76 135 In [6]: mec.execute('c=a+b',targets=[0,2])
77 136 Out[6]:
@@ -102,7 +161,9 b' Python commands can be executed on specific engines by calling execute using the'
102 161 [3] In [4]: print c
103 162 [3] Out[4]: -5
104 163
105 This example also shows one of the most important things about the IPython engines: they have a persistent user namespaces. The ``execute`` method returns a Python ``dict`` that contains useful information::
164 This example also shows one of the most important things about the IPython
165 engines: they have a persistent user namespaces. The :meth:`execute` method
166 returns a Python ``dict`` that contains useful information::
106 167
107 168 In [9]: result_dict = mec.execute('d=10; print d')
108 169
@@ -118,10 +179,12 b' This example also shows one of the most important things about the IPython engin'
118 179 Non-blocking execution
119 180 ----------------------
120 181
121 In non-blocking mode, ``execute`` submits the command to be executed and then returns a
122 ``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a
123 result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to
124 quickly submit long running commands without blocking your local Python/IPython session::
182 In non-blocking mode, :meth:`execute` submits the command to be executed and
183 then returns a :class:`PendingResult` object immediately. The
184 :class:`PendingResult` object gives you a way of getting a result at a later
185 time through its :meth:`get_result` method or :attr:`r` attribute. This allows
186 you to quickly submit long running commands without blocking your local
187 Python/IPython session::
125 188
126 189 # In blocking mode
127 190 In [6]: mec.execute('import time')
@@ -159,7 +222,10 b' quickly submit long running commands without blocking your local Python/IPython '
159 222 [2] In [3]: time.sleep(10)
160 223 [3] In [3]: time.sleep(10)
161 224
162 Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is a the method ``barrier``. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready::
225 Often, it is desirable to wait until a set of :class:`PendingResult` objects
226 are done. For this, there is a the method :meth:`barrier`. This method takes a
227 tuple of :class:`PendingResult` objects and blocks until all of the associated
228 results are ready::
163 229
164 230 In [72]: mec.block=False
165 231
@@ -182,14 +248,16 b' Often, it is desirable to wait until a set of ``PendingResult`` objects are done'
182 248 The ``block`` and ``targets`` keyword arguments and attributes
183 249 --------------------------------------------------------------
184 250
185 Most commands in the multiengine interface (like ``execute``) accept ``block`` and ``targets``
186 as keyword arguments. As we have seen above, these keyword arguments control the blocking mode
187 and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block``
188 and ``targets`` attributes that control the default behavior when the keyword arguments are not
189 provided. Thus the following logic is used for ``block`` and ``targets``:
251 Most methods in the multiengine interface (like :meth:`execute`) accept
252 ``block`` and ``targets`` as keyword arguments. As we have seen above, these
253 keyword arguments control the blocking mode and which engines the command is
254 applied to. The :class:`MultiEngineClient` class also has :attr:`block` and
255 :attr:`targets` attributes that control the default behavior when the keyword
256 arguments are not provided. Thus the following logic is used for :attr:`block`
257 and :attr:`targets`:
190 258
191 * If no keyword argument is provided, the instance attributes are used.
192 * Keyword argument, if provided override the instance attributes.
259 * If no keyword argument is provided, the instance attributes are used.
260 * Keyword argument, if provided override the instance attributes.
193 261
194 262 The following examples demonstrate how to use the instance attributes::
195 263
@@ -225,14 +293,19 b' The following examples demonstrate how to use the instance attributes::'
225 293 [3] In [6]: b=10; print b
226 294 [3] Out[6]: 10
227 295
228 The ``block`` and ``targets`` instance attributes also determine the behavior of the parallel
229 magic commands...
296 The :attr:`block` and :attr:`targets` instance attributes also determine the
297 behavior of the parallel magic commands.
230 298
231 299
232 300 Parallel magic commands
233 301 -----------------------
234 302
235 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the `magicTargets``targets` attribute of the ``MultiEngineClient`` instance (by default this is 'all')::
303 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
304 that make it more pleasant to execute Python commands on the engines
305 interactively. These are simply shortcuts to :meth:`execute` and
306 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
307 engines specified by the :attr:`targets` attribute of the
308 :class:`MultiEngineClient` instance (by default this is ``'all'``)::
236 309
237 310 # Make this MultiEngineClient active for parallel magic commands
238 311 In [23]: mec.activate()
@@ -277,7 +350,9 b' We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) t'
277 350 [3] In [9]: print numpy.linalg.eigvals(a)
278 351 [3] Out[9]: [ 0.83664764 -0.25602658]
279 352
280 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the ``get_result`` method::
353 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
354 command executed on each engine. It is simply a shortcut to the
355 :meth:`get_result` method::
281 356
282 357 In [29]: %result
283 358 Out[29]:
@@ -294,7 +369,8 b' The ``%result`` magic gets and prints the stdin/stdout/stderr of the last comman'
294 369 [3] In [9]: print numpy.linalg.eigvals(a)
295 370 [3] Out[9]: [ 0.83664764 -0.25602658]
296 371
297 The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the ``targets`` attribute::
372 The ``%autopx`` magic switches to a mode where everything you type is executed
373 on the engines given by the :attr:`targets` attribute::
298 374
299 375 In [30]: mec.block=False
300 376
@@ -335,51 +411,19 b' The ``%autopx`` magic switches to a mode where everything you type is executed o'
335 411 [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
336 412 [3] Out[12]: Average max eigenvalue is: 10.1158837784
337 413
338 Using the ``with`` statement of Python 2.5
339 ------------------------------------------
340 414
341 Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute::
415 Moving Python objects around
416 ============================
342 417
343 In [3]: with mec:
344 ...: client.remote() # Required so the following code is not run locally
345 ...: a = 10
346 ...: b = 30
347 ...: c = a+b
348 ...:
349 ...:
350
351 In [4]: mec.get_result()
352 Out[4]:
353 <Results List>
354 [0] In [1]: a = 10
355 b = 30
356 c = a+b
357
358 [1] In [1]: a = 10
359 b = 30
360 c = a+b
361
362 [2] In [1]: a = 10
363 b = 30
364 c = a+b
365
366 [3] In [1]: a = 10
367 b = 30
368 c = a+b
369
370 This is basically another way of calling execute, but one with allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``.
371
372 Moving Python object around
373 ===========================
374
375 In addition to executing code on engines, you can transfer Python objects to and from your
376 IPython session and the engines. In IPython, these operations are called ``push`` (sending an
377 object to the engines) and ``pull`` (getting an object from the engines).
418 In addition to executing code on engines, you can transfer Python objects to
419 and from your IPython session and the engines. In IPython, these operations
420 are called :meth:`push` (sending an object to the engines) and :meth:`pull`
421 (getting an object from the engines).
378 422
379 423 Basic push and pull
380 424 -------------------
381 425
382 Here are some examples of how you use ``push`` and ``pull``::
426 Here are some examples of how you use :meth:`push` and :meth:`pull`::
383 427
384 428 In [38]: mec.push(dict(a=1.03234,b=3453))
385 429 Out[38]: [None, None, None, None]
@@ -415,7 +459,8 b' Here are some examples of how you use ``push`` and ``pull``::'
415 459 [3] In [13]: print c
416 460 [3] Out[13]: speed
417 461
418 In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects::
462 In non-blocking mode :meth:`push` and :meth:`pull` also return
463 :class:`PendingResult` objects::
419 464
420 465 In [47]: mec.block=False
421 466
@@ -428,7 +473,11 b' In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects'
428 473 Push and pull for functions
429 474 ---------------------------
430 475
431 Functions can also be pushed and pulled using ``push_function`` and ``pull_function``::
476 Functions can also be pushed and pulled using :meth:`push_function` and
477 :meth:`pull_function`::
478
479
480 In [52]: mec.block=True
432 481
433 482 In [53]: def f(x):
434 483 ....: return 2.0*x**4
@@ -466,7 +515,10 b' Functions can also be pushed and pulled using ``push_function`` and ``pull_funct'
466 515 Dictionary interface
467 516 --------------------
468 517
469 As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This make the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``::
518 As a shorthand to :meth:`push` and :meth:`pull`, the
519 :class:`MultiEngineClient` class implements some of the Python dictionary
520 interface. This make the remote namespaces of the engines appear as a local
521 dictionary. Underneath, this uses :meth:`push` and :meth:`pull`::
470 522
471 523 In [50]: mec.block=True
472 524
@@ -478,11 +530,13 b' As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class impleme'
478 530 Scatter and gather
479 531 ------------------
480 532
481 Sometimes it is useful to partition a sequence and push the partitions to different engines. In
482 MPI language, this is know as scatter/gather and we follow that terminology. However, it is
483 important to remember that in IPython ``scatter`` is from the interactive IPython session to
484 the engines and ``gather`` is from the engines back to the interactive IPython session. For
485 scatter/gather operations between engines, MPI should be used::
533 Sometimes it is useful to partition a sequence and push the partitions to
534 different engines. In MPI language, this is know as scatter/gather and we
535 follow that terminology. However, it is important to remember that in
536 IPython's :class:`MultiEngineClient` class, :meth:`scatter` is from the
537 interactive IPython session to the engines and :meth:`gather` is from the
538 engines back to the interactive IPython session. For scatter/gather operations
539 between engines, MPI should be used::
486 540
487 541 In [58]: mec.scatter('a',range(16))
488 542 Out[58]: [None, None, None, None]
@@ -510,24 +564,12 b' scatter/gather operations between engines, MPI should be used::'
510 564 Other things to look at
511 565 =======================
512 566
513 Parallel map
514 ------------
515
516 Python's builtin ``map`` functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart::
517
518 In [63]: serial_result = map(lambda x:x**10, range(32))
519
520 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
521
522 In [65]: serial_result==parallel_result
523 Out[65]: True
524
525 As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes.
526
527 567 How to do parallel list comprehensions
528 568 --------------------------------------
529 569
530 In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``::
570 In many cases list comprehensions are nicer than using the map function. While
571 we don't have fully parallel list comprehensions, it is simple to get the
572 basic effect using :meth:`scatter` and :meth:`gather`::
531 573
532 574 In [66]: mec.scatter('x',range(64))
533 575 Out[66]: [None, None, None, None]
@@ -547,10 +589,16 b' In many cases list comprehensions are nicer than using the map function. While '
547 589 In [69]: print y
548 590 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
549 591
550 Parallel Exceptions
592 Parallel exceptions
551 593 -------------------
552 594
553 In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works::
595 In the multiengine interface, parallel commands can raise Python exceptions,
596 just like serial commands. But, it is a little subtle, because a single
597 parallel command can actually raise multiple exceptions (one for each engine
598 the command was run on). To express this idea, the MultiEngine interface has a
599 :exc:`CompositeError` exception class that will be raised in most cases. The
600 :exc:`CompositeError` class is a special type of exception that wraps one or
601 more other types of exceptions. Here is how it works::
554 602
555 603 In [76]: mec.block=True
556 604
@@ -580,7 +628,7 b' In the MultiEngine interface, parallel commands can raise Python exceptions, jus'
580 628 [2:execute]: ZeroDivisionError: integer division or modulo by zero
581 629 [3:execute]: ZeroDivisionError: integer division or modulo by zero
582 630
583 Notice how the error message printed when ``CompositeError`` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
631 Notice how the error message printed when :exc:`CompositeError` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
584 632
585 633 In [80]: try:
586 634 ....: mec.execute('1/0')
@@ -602,7 +650,9 b' Notice how the error message printed when ``CompositeError`` is raised has infor'
602 650
603 651 ZeroDivisionError: integer division or modulo by zero
604 652
605 If you are working in IPython, you can simple type ``%debug`` after one of these ``CompositeError`` is raised, and inspect the exception instance::
653 If you are working in IPython, you can simple type ``%debug`` after one of
654 these :exc:`CompositeError` exceptions is raised, and inspect the exception
655 instance::
606 656
607 657 In [81]: mec.execute('1/0')
608 658 ---------------------------------------------------------------------------
@@ -679,6 +729,11 b' If you are working in IPython, you can simple type ``%debug`` after one of these'
679 729
680 730 ZeroDivisionError: integer division or modulo by zero
681 731
732 .. note::
733
734 The above example appears to be broken right now because of a change in
735 how we are using Twisted.
736
682 737 All of this same error handling magic even works in non-blocking mode::
683 738
684 739 In [83]: mec.block=False
@@ -1,240 +1,93 b''
1 1 .. _paralleltask:
2 2
3 =================================
4 The IPython Task interface
5 =================================
3 ==========================
4 The IPython task interface
5 ==========================
6 6
7 7 .. contents::
8 8
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all the user can use both of these interfaces at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly.
9 The task interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the multiengine interface, in the task interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful.
10
11 Best of all the user can use both of these interfaces running at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the task interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign tasks to engines explicitly.
10 12
11 13 Starting the IPython controller and engines
12 14 ===========================================
13 15
14 To follow along with this tutorial, the user will need to start the IPython
15 controller and four IPython engines. The simplest way of doing this is to
16 use the ``ipcluster`` command::
16 To follow along with this tutorial, you will need to start the IPython
17 controller and four IPython engines. The simplest way of doing this is to use
18 the :command:`ipcluster` command::
17 19
18 20 $ ipcluster -n 4
19 21
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
22 For more detailed information about starting the controller and engines, see
23 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
24
25 Creating a ``TaskClient`` instance
26 =========================================
27
28 The first step is to import the IPython :mod:`IPython.kernel.client` module
29 and then create a :class:`TaskClient` instance::
30
31 In [1]: from IPython.kernel import client
32
33 In [2]: tc = client.TaskClient()
34
35 This form assumes that the :file:`ipcontroller-tc.furl` is in the
36 :file:`~./ipython/security` directory on the client's host. If not, the
37 location of the ``.furl`` file must be given as an argument to the
38 constructor::
39
40 In[2]: mec = client.TaskClient('/path/to/my/ipcontroller-tc.furl')
41
42 Quick and easy parallelism
43 ==========================
44
45 In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. Like the multiengine interface, the task interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and ``@parallel`` function decorator. However, the verions in the task interface have one important difference: they are dynamically load balanced. Thus, if the execution time per item varies significantly, you should use the versions in the task interface.
46
47 Parallel map
48 ------------
49
50 The parallel :meth:`map` in the task interface is similar to that in the multiengine interface::
51
52 In [63]: serial_result = map(lambda x:x**10, range(32))
53
54 In [64]: parallel_result = tc.map(lambda x:x**10, range(32))
55
56 In [65]: serial_result==parallel_result
57 Out[65]: True
58
59 Parallel function decorator
60 ---------------------------
61
62 Parallel functions are just like normal function, but they can be called on sequences and *in parallel*. The multiengine interface provides a decorator that turns any Python function into a parallel function::
21 63
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
64 In [10]: @tc.parallel()
65 ....: def f(x):
66 ....: return 10.0*x**4
67 ....:
23 68
24 QuickStart Task Farming
25 =======================
69 In [11]: f(range(32)) # this is done in parallel
70 Out[11]:
71 [0.0,10.0,160.0,...]
26 72
27 First, a quick example of how to start running the most basic Tasks.
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
29
30 In [1]: from IPython.kernel import client
31
32 In [2]: tc = client.TaskClient()
73 More details
74 ============
33 75
34 Then the user wrap the commands the user want to run in Tasks::
76 The :class:`TaskClient` has many more powerful features that allow quite a bit of flexibility in how tasks are defined and run. The next places to look are in the following classes:
35 77
36 In [3]: tasklist = []
37 In [4]: for n in range(1000):
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
78 * :class:`IPython.kernel.client.TaskClient`
79 * :class:`IPython.kernel.client.StringTask`
80 * :class:`IPython.kernel.client.MapTask`
39 81
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
82 The following is an overview of how to use these classes together:
41 83
42 Next, the user need to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
84 1. Create a :class:`TaskClient`.
85 2. Create one or more instances of :class:`StringTask` or :class:`MapTask`
86 to define your tasks.
87 3. Submit your tasks to using the :meth:`run` method of your
88 :class:`TaskClient` instance.
89 4. Use :meth:`TaskClient.get_task_result` to get the results of the
90 tasks.
43 91
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
92 We are in the process of developing more detailed information about the task interface. For now, the docstrings of the :class:`TaskClient`, :class:`StringTask` and :class:`MapTask` classes should be consulted.
45 93
46 This will give the user a list of the TaskIDs used by the controller to keep track of the Tasks and their results. Now at some point the user are going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running::
47
48 In [6]: tc.barrier(taskids)
49
50 This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably want to look at the user's results::
51
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
53
54 Now the user have a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
55
56 In [8]: tr = ``Task``_results[73]
57
58 In [9]: tr
59 Out[9]: ``TaskResult``[ID:73]:{'a':73}
60
61 In [10]: tr.engineid
62 Out[10]: 1
63
64 In [11]: tr.submitted, tr.completed, tr.duration
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
66
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
68
69 In [12]: tr.results['a']
70 Out[12]: 73
71
72 In [13]: tr.ns.a
73 Out[13]: 73
74
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks covered later. The most useful probably being using a ``MutiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run the user's Tasks.
76
77 There are many options for running and managing Tasks. The best way to learn further about the ``Task`` interface is to study the examples in ``docs/examples``. If the user do so and learn a lots about this interface, we encourage the user to expand this documentation about the ``Task`` system.
78
79 Overview of the Task System
80 ===========================
81
82 The user's view of the ``Task`` system has three basic objects: The ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects well indicate their role.
83
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskControler`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines, it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage.
85
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy, just implementing your own priority system.
87
88 The TaskClient
89 ==============
90
91 The ``TaskClient`` is the object the user use to connect to the ``Controller`` that is managing the user's Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython Client Module::
92
93 In [1]: from IPython.kernel import client
94
95 Just as with the ``MultiEngineClient``, the user create the ``TaskClient`` with a tuple, containing the ip-address and port of the ``Controller``. the ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this::
96
97 In [2]: tc = client.TaskClient(client.default_task_address)
98
99 or, if the user want to specify a non default location of the ``Controller``, the user can specify explicitly::
100
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
102
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
104
105 * ``tc.run(task)``
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the ``Task``ID by which the ``Task`` and its results can be tracked and retrieved::
107
108 In [4]: ``Task``ID = tc.run(``Task``)
109
110 * ``tc.get_task_result(taskid, block=``False``)``
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the ``Task``ID`` of the result the user wish to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually want to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specify ``block=``True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``.
112 * ``tc.barrier(taskid(s))``
113 ``barrier`` is a synchronization method. It takes exactly one argument, a ``Task``ID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
114
115 In [5]: taskIDs = [ tc.run(``Task``) for ``Task`` in myTasks ]
116
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
118 Out[6]: ``True``
119
120 In [7]: tc.barrier(``Task``ID)
121
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
123
124 * ``tc.queue_status(verbose=``False``)``
125 ``queue_status`` is a method for querying the state of the ``TaskControler``. ``queue_status`` returns a dict of the form::
126
127 {'scheduled': Tasks that have been submitted but yet run
128 'pending' : Tasks that are currently running
129 'succeeded': Tasks that have completed successfully
130 'failed' : Tasks that have finished with a failure
131 }
132
133 if @verbose is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. if @verbose is ``True``, then each element in the dict is a list of the taskIDs in that state::
134
135 In [8]: tc.queue_status()
136 Out[8]: {'scheduled': 4,
137 'pending' : 2,
138 'succeeded': 5,
139 'failed' : 1
140 }
141
142 In [9]: tc.queue_status(verbose=True)
143 Out[9]: {'scheduled': [8,9,10,11],
144 'pending' : [6,7],
145 'succeeded': [0,1,2,4,5],
146 'failed' : [3]
147 }
148
149 * ``tc.abort(taskid)``
150 ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has completed, ``abort`` will raise an ``IndexError ``Task`` Already Completed``. An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see ``Task``_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user find the solution, the user would want to abort all remaining Tasks to prevent wasted work.
151 * ``tc.spin()``
152 ``spin`` simply triggers the scheduler in the ``TaskControler``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MutliEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence:
153 * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
154 * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed.
155 * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskControler`` to try scheduling ``Task`` again.
156
157 ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances.
158
159 That covers the ``TaskClient``, a simple interface to the cluster. With this, the user can submit jobs (and abort if necessary), request their results, synchronize on arbitrary subsets of jobs.
160
161 .. _task: The Task Object
162
163 The Task Object
164 ===============
165
166 The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code)::
167
168 In [1]: t = client.Task("a = str(id)")
169
170 This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed.
171 There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment.
172
173 Data Attributes
174 ***************
175 It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and specifying what data to bring back as the ``Task``'s results.
176
177 * pull = []
178 The obvious case is as above, where ``t`` would execute and store the result of ``myfunc`` in ``a``, it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::
179
180 In [2]: t = client.Task("a = str(id)", pull = "a")
181
182 In [3]: t = client.Task("a = str(id)", pull = ["a", "id"])
183
184 * push = {}
185 A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's immediately before execution::
186
187 In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")
188
189 push and pull result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their api is the same.
190
191 Namespace Cleaning
192 ******************
193 When a user is running a large number of Tasks, it is likely that the namespace of the worker's could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.
194
195 * ``clear_after``
196 if clear_after is specified ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful for both Tasks that produce clutter or Tasks whose intermediate data one might wish to be kept private::
197
198 In [5]: t = client.Task("a = range(1e10)", pull = "a",clear_after=True)
199
200
201 * ``clear_before``
202 as one might guess, clear_before is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::
203
204 In [6]: t = client.Task("a = globals()", pull = "a",clear_before=True)
205
206 Of course, a user can both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``.
207
208 Fault Tolerance
209 ***************
210 It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.
211
212 * ``retries``
213 ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task``. The canonical bad example:
214
215 In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99)
216
217 This would actually take down 100 workers.
218
219 * ``recovery_task``
220 ``recovery_task`` is another ``Task`` object, to be run in the event of the original ``Task`` still failing after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!).
221
222 Dependencies
223 ************
224 Dependencies are the most powerful part of the ``Task`` farming system, because it allows the user to do some classification of the workers, and guide the ``Task`` distribution without meddling with the controller directly. It makes use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties api exists for extending IPython, allowing conditional execution and new controllers that make decisions based on properties of its engines. Currently the ``Task`` dependency is the only internal use of the properties api.
225
226 .. _MultiEngine: ./parallel_multiengine
227
228 The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and functionally equivalent to always returning ``False``. Tasks`` with invalid ``depend`` functions will never be assigned to a worker::
229
230 In [8]: def dep(properties):
231 ... return properties["RAM"] > 2**32 # have at least 4GB
232 In [9]: t = client.Task("a = bigfunc()", depend=dep)
233
234 It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods.
235
236
237
238
239
240
General Comments 0
You need to be logged in to leave comments. Login now