Show More
@@ -0,0 +1,240 b'' | |||||
|
1 | .. _paralleltask: | |||
|
2 | ||||
|
3 | ========================== | |||
|
4 | The IPython task interface | |||
|
5 | ========================== | |||
|
6 | ||||
|
7 | .. contents:: | |||
|
8 | ||||
|
9 | The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all the user can use both of these interfaces at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly. | |||
|
10 | ||||
|
11 | Starting the IPython controller and engines | |||
|
12 | =========================================== | |||
|
13 | ||||
|
14 | To follow along with this tutorial, the user will need to start the IPython | |||
|
15 | controller and four IPython engines. The simplest way of doing this is to | |||
|
16 | use the ``ipcluster`` command:: | |||
|
17 | ||||
|
18 | $ ipcluster -n 4 | |||
|
19 | ||||
|
20 | For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing. | |||
|
21 | ||||
|
22 | The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously. | |||
|
23 | ||||
|
24 | QuickStart Task Farming | |||
|
25 | ======================= | |||
|
26 | ||||
|
27 | First, a quick example of how to start running the most basic Tasks. | |||
|
28 | The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance:: | |||
|
29 | ||||
|
30 | In [1]: from IPython.kernel import client | |||
|
31 | ||||
|
32 | In [2]: tc = client.TaskClient() | |||
|
33 | ||||
|
34 | Then the user wrap the commands the user want to run in Tasks:: | |||
|
35 | ||||
|
36 | In [3]: tasklist = [] | |||
|
37 | In [4]: for n in range(1000): | |||
|
38 | ... tasklist.append(client.Task("a = %i"%n, pull="a")) | |||
|
39 | ||||
|
40 | The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``. | |||
|
41 | ||||
|
42 | Next, the user need to submit the Tasks to the ``TaskController`` with the ``TaskClient``:: | |||
|
43 | ||||
|
44 | In [5]: taskids = [ tc.run(t) for t in tasklist ] | |||
|
45 | ||||
|
46 | This will give the user a list of the TaskIDs used by the controller to keep track of the Tasks and their results. Now at some point the user are going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running:: | |||
|
47 | ||||
|
48 | In [6]: tc.barrier(taskids) | |||
|
49 | ||||
|
50 | This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably want to look at the user's results:: | |||
|
51 | ||||
|
52 | In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ] | |||
|
53 | ||||
|
54 | Now the user have a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``:: | |||
|
55 | ||||
|
56 | In [8]: tr = ``Task``_results[73] | |||
|
57 | ||||
|
58 | In [9]: tr | |||
|
59 | Out[9]: ``TaskResult``[ID:73]:{'a':73} | |||
|
60 | ||||
|
61 | In [10]: tr.engineid | |||
|
62 | Out[10]: 1 | |||
|
63 | ||||
|
64 | In [11]: tr.submitted, tr.completed, tr.duration | |||
|
65 | Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345) | |||
|
66 | ||||
|
67 | The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute:: | |||
|
68 | ||||
|
69 | In [12]: tr.results['a'] | |||
|
70 | Out[12]: 73 | |||
|
71 | ||||
|
72 | In [13]: tr.ns.a | |||
|
73 | Out[13]: 73 | |||
|
74 | ||||
|
75 | That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks covered later. The most useful probably being using a ``MutiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run the user's Tasks. | |||
|
76 | ||||
|
77 | There are many options for running and managing Tasks. The best way to learn further about the ``Task`` interface is to study the examples in ``docs/examples``. If the user do so and learn a lots about this interface, we encourage the user to expand this documentation about the ``Task`` system. | |||
|
78 | ||||
|
79 | Overview of the Task System | |||
|
80 | =========================== | |||
|
81 | ||||
|
82 | The user's view of the ``Task`` system has three basic objects: The ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects well indicate their role. | |||
|
83 | ||||
|
84 | The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskControler`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines, it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage. | |||
|
85 | ||||
|
86 | Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy, just implementing your own priority system. | |||
|
87 | ||||
|
88 | The TaskClient | |||
|
89 | ============== | |||
|
90 | ||||
|
91 | The ``TaskClient`` is the object the user use to connect to the ``Controller`` that is managing the user's Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython Client Module:: | |||
|
92 | ||||
|
93 | In [1]: from IPython.kernel import client | |||
|
94 | ||||
|
95 | Just as with the ``MultiEngineClient``, the user create the ``TaskClient`` with a tuple, containing the ip-address and port of the ``Controller``. the ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this:: | |||
|
96 | ||||
|
97 | In [2]: tc = client.TaskClient(client.default_task_address) | |||
|
98 | ||||
|
99 | or, if the user want to specify a non default location of the ``Controller``, the user can specify explicitly:: | |||
|
100 | ||||
|
101 | In [3]: tc = client.TaskClient(("192.168.1.1", 10113)) | |||
|
102 | ||||
|
103 | As discussed earlier, the ``TaskClient`` only has a few basic methods. | |||
|
104 | ||||
|
105 | * ``tc.run(task)`` | |||
|
106 | ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the ``Task``ID by which the ``Task`` and its results can be tracked and retrieved:: | |||
|
107 | ||||
|
108 | In [4]: ``Task``ID = tc.run(``Task``) | |||
|
109 | ||||
|
110 | * ``tc.get_task_result(taskid, block=``False``)`` | |||
|
111 | ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the ``Task``ID`` of the result the user wish to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually want to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specify ``block=``True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``. | |||
|
112 | * ``tc.barrier(taskid(s))`` | |||
|
113 | ``barrier`` is a synchronization method. It takes exactly one argument, a ``Task``ID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section:: | |||
|
114 | ||||
|
115 | In [5]: taskIDs = [ tc.run(``Task``) for ``Task`` in myTasks ] | |||
|
116 | ||||
|
117 | In [6]: tc.get_task_result(taskIDs[-1]) is None | |||
|
118 | Out[6]: ``True`` | |||
|
119 | ||||
|
120 | In [7]: tc.barrier(``Task``ID) | |||
|
121 | ||||
|
122 | In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ] | |||
|
123 | ||||
|
124 | * ``tc.queue_status(verbose=``False``)`` | |||
|
125 | ``queue_status`` is a method for querying the state of the ``TaskControler``. ``queue_status`` returns a dict of the form:: | |||
|
126 | ||||
|
127 | {'scheduled': Tasks that have been submitted but yet run | |||
|
128 | 'pending' : Tasks that are currently running | |||
|
129 | 'succeeded': Tasks that have completed successfully | |||
|
130 | 'failed' : Tasks that have finished with a failure | |||
|
131 | } | |||
|
132 | ||||
|
133 | if @verbose is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. if @verbose is ``True``, then each element in the dict is a list of the taskIDs in that state:: | |||
|
134 | ||||
|
135 | In [8]: tc.queue_status() | |||
|
136 | Out[8]: {'scheduled': 4, | |||
|
137 | 'pending' : 2, | |||
|
138 | 'succeeded': 5, | |||
|
139 | 'failed' : 1 | |||
|
140 | } | |||
|
141 | ||||
|
142 | In [9]: tc.queue_status(verbose=True) | |||
|
143 | Out[9]: {'scheduled': [8,9,10,11], | |||
|
144 | 'pending' : [6,7], | |||
|
145 | 'succeeded': [0,1,2,4,5], | |||
|
146 | 'failed' : [3] | |||
|
147 | } | |||
|
148 | ||||
|
149 | * ``tc.abort(taskid)`` | |||
|
150 | ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has completed, ``abort`` will raise an ``IndexError ``Task`` Already Completed``. An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see ``Task``_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user find the solution, the user would want to abort all remaining Tasks to prevent wasted work. | |||
|
151 | * ``tc.spin()`` | |||
|
152 | ``spin`` simply triggers the scheduler in the ``TaskControler``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MutliEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence: | |||
|
153 | * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies. | |||
|
154 | * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed. | |||
|
155 | * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskControler`` to try scheduling ``Task`` again. | |||
|
156 | ||||
|
157 | ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances. | |||
|
158 | ||||
|
159 | That covers the ``TaskClient``, a simple interface to the cluster. With this, the user can submit jobs (and abort if necessary), request their results, synchronize on arbitrary subsets of jobs. | |||
|
160 | ||||
|
161 | .. _task: The Task Object | |||
|
162 | ||||
|
163 | The Task Object | |||
|
164 | =============== | |||
|
165 | ||||
|
166 | The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code):: | |||
|
167 | ||||
|
168 | In [1]: t = client.Task("a = str(id)") | |||
|
169 | ||||
|
170 | This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed. | |||
|
171 | There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment. | |||
|
172 | ||||
|
173 | Data Attributes | |||
|
174 | *************** | |||
|
175 | It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and specifying what data to bring back as the ``Task``'s results. | |||
|
176 | ||||
|
177 | * pull = [] | |||
|
178 | The obvious case is as above, where ``t`` would execute and store the result of ``myfunc`` in ``a``, it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it:: | |||
|
179 | ||||
|
180 | In [2]: t = client.Task("a = str(id)", pull = "a") | |||
|
181 | ||||
|
182 | In [3]: t = client.Task("a = str(id)", pull = ["a", "id"]) | |||
|
183 | ||||
|
184 | * push = {} | |||
|
185 | A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's immediately before execution:: | |||
|
186 | ||||
|
187 | In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a") | |||
|
188 | ||||
|
189 | push and pull result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their api is the same. | |||
|
190 | ||||
|
191 | Namespace Cleaning | |||
|
192 | ****************** | |||
|
193 | When a user is running a large number of Tasks, it is likely that the namespace of the worker's could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace. | |||
|
194 | ||||
|
195 | * ``clear_after`` | |||
|
196 | if clear_after is specified ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful for both Tasks that produce clutter or Tasks whose intermediate data one might wish to be kept private:: | |||
|
197 | ||||
|
198 | In [5]: t = client.Task("a = range(1e10)", pull = "a",clear_after=True) | |||
|
199 | ||||
|
200 | ||||
|
201 | * ``clear_before`` | |||
|
202 | as one might guess, clear_before is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker:: | |||
|
203 | ||||
|
204 | In [6]: t = client.Task("a = globals()", pull = "a",clear_before=True) | |||
|
205 | ||||
|
206 | Of course, a user can both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``. | |||
|
207 | ||||
|
208 | Fault Tolerance | |||
|
209 | *************** | |||
|
210 | It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds. | |||
|
211 | ||||
|
212 | * ``retries`` | |||
|
213 | ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task``. The canonical bad example: | |||
|
214 | ||||
|
215 | In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99) | |||
|
216 | ||||
|
217 | This would actually take down 100 workers. | |||
|
218 | ||||
|
219 | * ``recovery_task`` | |||
|
220 | ``recovery_task`` is another ``Task`` object, to be run in the event of the original ``Task`` still failing after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!). | |||
|
221 | ||||
|
222 | Dependencies | |||
|
223 | ************ | |||
|
224 | Dependencies are the most powerful part of the ``Task`` farming system, because it allows the user to do some classification of the workers, and guide the ``Task`` distribution without meddling with the controller directly. It makes use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties api exists for extending IPython, allowing conditional execution and new controllers that make decisions based on properties of its engines. Currently the ``Task`` dependency is the only internal use of the properties api. | |||
|
225 | ||||
|
226 | .. _MultiEngine: ./parallel_multiengine | |||
|
227 | ||||
|
228 | The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and functionally equivalent to always returning ``False``. Tasks`` with invalid ``depend`` functions will never be assigned to a worker:: | |||
|
229 | ||||
|
230 | In [8]: def dep(properties): | |||
|
231 | ... return properties["RAM"] > 2**32 # have at least 4GB | |||
|
232 | In [9]: t = client.Task("a = bigfunc()", depend=dep) | |||
|
233 | ||||
|
234 | It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods. | |||
|
235 | ||||
|
236 | ||||
|
237 | ||||
|
238 | ||||
|
239 | ||||
|
240 |
@@ -1,8 +1,8 b'' | |||||
1 | .. _ip1par: |
|
1 | .. _ip1par: | |
2 |
|
2 | |||
3 |
============================ |
|
3 | ============================ | |
4 | Using IPython for parallel computing |
|
4 | Overview and getting started | |
5 |
============================ |
|
5 | ============================ | |
6 |
|
6 | |||
7 | .. contents:: |
|
7 | .. contents:: | |
8 |
|
8 |
@@ -1,57 +1,115 b'' | |||||
1 | .. _parallelmultiengine: |
|
1 | .. _parallelmultiengine: | |
2 |
|
2 | |||
3 |
=============================== |
|
3 | =============================== | |
4 |
IPython's |
|
4 | IPython's multiengine interface | |
5 |
=============================== |
|
5 | =============================== | |
6 |
|
6 | |||
7 | .. contents:: |
|
7 | .. contents:: | |
8 |
|
8 | |||
9 |
The |
|
9 | The multiengine interface represents one possible way of working with a set of | |
10 |
|
|
10 | IPython engines. The basic idea behind the multiengine interface is that the | |
11 |
|
|
11 | capabilities of each engine are directly and explicitly exposed to the user. | |
12 |
Thus, in the |
|
12 | Thus, in the multiengine interface, each engine is given an id that is used to | |
13 |
|
|
13 | identify the engine and give it work to do. This interface is very intuitive | |
14 |
|
|
14 | and is designed with interactive usage in mind, and is thus the best place for | |
15 |
|
|
15 | new users of IPython to begin. | |
16 |
|
16 | |||
17 | Starting the IPython controller and engines |
|
17 | Starting the IPython controller and engines | |
18 | =========================================== |
|
18 | =========================================== | |
19 |
|
19 | |||
20 | To follow along with this tutorial, you will need to start the IPython |
|
20 | To follow along with this tutorial, you will need to start the IPython | |
21 | controller and four IPython engines. The simplest way of doing this is to |
|
21 | controller and four IPython engines. The simplest way of doing this is to use | |
22 |
|
|
22 | the :command:`ipcluster` command:: | |
23 |
|
23 | |||
24 | $ ipcluster -n 4 |
|
24 | $ ipcluster -n 4 | |
25 |
|
25 | |||
26 |
For more detailed information about starting the controller and engines, see |
|
26 | For more detailed information about starting the controller and engines, see | |
|
27 | our :ref:`introduction <ip1par>` to using IPython for parallel computing. | |||
27 |
|
28 | |||
28 | Creating a ``MultiEngineClient`` instance |
|
29 | Creating a ``MultiEngineClient`` instance | |
29 | ========================================= |
|
30 | ========================================= | |
30 |
|
31 | |||
31 |
The first step is to import the IPython |
|
32 | The first step is to import the IPython :mod:`IPython.kernel.client` module | |
|
33 | and then create a :class:`MultiEngineClient` instance:: | |||
32 |
|
34 | |||
33 | In [1]: from IPython.kernel import client |
|
35 | In [1]: from IPython.kernel import client | |
34 |
|
36 | |||
35 | In [2]: mec = client.MultiEngineClient() |
|
37 | In [2]: mec = client.MultiEngineClient() | |
36 |
|
38 | |||
37 | To make sure there are engines connected to the controller, use can get a list of engine ids:: |
|
39 | This form assumes that the :file:`ipcontroller-mec.furl` is in the | |
|
40 | :file:`~./ipython/security` directory on the client's host. If not, the | |||
|
41 | location of the ``.furl`` file must be given as an argument to the | |||
|
42 | constructor:: | |||
|
43 | ||||
|
44 | In[2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl') | |||
|
45 | ||||
|
46 | To make sure there are engines connected to the controller, use can get a list | |||
|
47 | of engine ids:: | |||
38 |
|
48 | |||
39 | In [3]: mec.get_ids() |
|
49 | In [3]: mec.get_ids() | |
40 | Out[3]: [0, 1, 2, 3] |
|
50 | Out[3]: [0, 1, 2, 3] | |
41 |
|
51 | |||
42 | Here we see that there are four engines ready to do work for us. |
|
52 | Here we see that there are four engines ready to do work for us. | |
43 |
|
53 | |||
|
54 | Quick and easy parallelism | |||
|
55 | ========================== | |||
|
56 | ||||
|
57 | In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. The multiengine interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and ``@parallel`` function decorator. | |||
|
58 | ||||
|
59 | Parallel map | |||
|
60 | ------------ | |||
|
61 | ||||
|
62 | Python's builtin :func:`map` functions allows a function to be applied to a | |||
|
63 | sequence element-by-element. This type of code is typically trivial to | |||
|
64 | parallelize. In fact, the multiengine interface in IPython already has a | |||
|
65 | parallel version of :meth:`map` that works just like its serial counterpart:: | |||
|
66 | ||||
|
67 | In [63]: serial_result = map(lambda x:x**10, range(32)) | |||
|
68 | ||||
|
69 | In [64]: parallel_result = mec.map(lambda x:x**10, range(32)) | |||
|
70 | ||||
|
71 | In [65]: serial_result==parallel_result | |||
|
72 | Out[65]: True | |||
|
73 | ||||
|
74 | .. note:: | |||
|
75 | ||||
|
76 | The multiengine interface version of :meth:`map` does not do any load | |||
|
77 | balancing. For a load balanced version, see the task interface. | |||
|
78 | ||||
|
79 | .. seealso:: | |||
|
80 | ||||
|
81 | The :meth:`map` method has a number of options that can be controlled by | |||
|
82 | the :meth:`mapper` method. See its docstring for more information. | |||
|
83 | ||||
|
84 | Parallel function decorator | |||
|
85 | --------------------------- | |||
|
86 | ||||
|
87 | Parallel functions are just like normal function, but they can be called on sequences and *in parallel*. The multiengine interface provides a decorator that turns any Python function into a parallel function:: | |||
|
88 | ||||
|
89 | In [10]: @mec.parallel() | |||
|
90 | ....: def f(x): | |||
|
91 | ....: return 10.0*x**4 | |||
|
92 | ....: | |||
|
93 | ||||
|
94 | In [11]: f(range(32)) # this is done in parallel | |||
|
95 | Out[11]: | |||
|
96 | [0.0,10.0,160.0,...] | |||
|
97 | ||||
|
98 | See the docstring for the :meth:`parallel` decorator for options. | |||
|
99 | ||||
44 | Running Python commands |
|
100 | Running Python commands | |
45 | ======================= |
|
101 | ======================= | |
46 |
|
102 | |||
47 | The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method. |
|
103 | The most basic type of operation that can be performed on the engines is to | |
|
104 | execute Python code. Executing Python code can be done in blocking or | |||
|
105 | non-blocking mode (blocking is default) using the :meth:`execute` method. | |||
48 |
|
106 | |||
49 | Blocking execution |
|
107 | Blocking execution | |
50 | ------------------ |
|
108 | ------------------ | |
51 |
|
109 | |||
52 |
In blocking mode, the |
|
110 | In blocking mode, the :class:`MultiEngineClient` object (called ``mec`` in | |
53 | these examples) submits the command to the controller, which places the |
|
111 | these examples) submits the command to the controller, which places the | |
54 |
command in the engines' queues for execution. The |
|
112 | command in the engines' queues for execution. The :meth:`execute` call then | |
55 | blocks until the engines are done executing the command:: |
|
113 | blocks until the engines are done executing the command:: | |
56 |
|
114 | |||
57 | # The default is to run on all engines |
|
115 | # The default is to run on all engines | |
@@ -71,7 +129,8 b' blocks until the engines are done executing the command::' | |||||
71 | [2] In [2]: b=10 |
|
129 | [2] In [2]: b=10 | |
72 | [3] In [2]: b=10 |
|
130 | [3] In [2]: b=10 | |
73 |
|
131 | |||
74 |
Python commands can be executed on specific engines by calling execute using |
|
132 | Python commands can be executed on specific engines by calling execute using | |
|
133 | the ``targets`` keyword argument:: | |||
75 |
|
134 | |||
76 | In [6]: mec.execute('c=a+b',targets=[0,2]) |
|
135 | In [6]: mec.execute('c=a+b',targets=[0,2]) | |
77 | Out[6]: |
|
136 | Out[6]: | |
@@ -102,7 +161,9 b' Python commands can be executed on specific engines by calling execute using the' | |||||
102 | [3] In [4]: print c |
|
161 | [3] In [4]: print c | |
103 | [3] Out[4]: -5 |
|
162 | [3] Out[4]: -5 | |
104 |
|
163 | |||
105 | This example also shows one of the most important things about the IPython engines: they have a persistent user namespaces. The ``execute`` method returns a Python ``dict`` that contains useful information:: |
|
164 | This example also shows one of the most important things about the IPython | |
|
165 | engines: they have a persistent user namespaces. The :meth:`execute` method | |||
|
166 | returns a Python ``dict`` that contains useful information:: | |||
106 |
|
167 | |||
107 | In [9]: result_dict = mec.execute('d=10; print d') |
|
168 | In [9]: result_dict = mec.execute('d=10; print d') | |
108 |
|
169 | |||
@@ -118,10 +179,12 b' This example also shows one of the most important things about the IPython engin' | |||||
118 | Non-blocking execution |
|
179 | Non-blocking execution | |
119 | ---------------------- |
|
180 | ---------------------- | |
120 |
|
181 | |||
121 |
In non-blocking mode, |
|
182 | In non-blocking mode, :meth:`execute` submits the command to be executed and | |
122 | ``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a |
|
183 | then returns a :class:`PendingResult` object immediately. The | |
123 | result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to |
|
184 | :class:`PendingResult` object gives you a way of getting a result at a later | |
124 | quickly submit long running commands without blocking your local Python/IPython session:: |
|
185 | time through its :meth:`get_result` method or :attr:`r` attribute. This allows | |
|
186 | you to quickly submit long running commands without blocking your local | |||
|
187 | Python/IPython session:: | |||
125 |
|
188 | |||
126 | # In blocking mode |
|
189 | # In blocking mode | |
127 | In [6]: mec.execute('import time') |
|
190 | In [6]: mec.execute('import time') | |
@@ -159,7 +222,10 b' quickly submit long running commands without blocking your local Python/IPython ' | |||||
159 | [2] In [3]: time.sleep(10) |
|
222 | [2] In [3]: time.sleep(10) | |
160 | [3] In [3]: time.sleep(10) |
|
223 | [3] In [3]: time.sleep(10) | |
161 |
|
224 | |||
162 | Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is a the method ``barrier``. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready:: |
|
225 | Often, it is desirable to wait until a set of :class:`PendingResult` objects | |
|
226 | are done. For this, there is a the method :meth:`barrier`. This method takes a | |||
|
227 | tuple of :class:`PendingResult` objects and blocks until all of the associated | |||
|
228 | results are ready:: | |||
163 |
|
229 | |||
164 | In [72]: mec.block=False |
|
230 | In [72]: mec.block=False | |
165 |
|
231 | |||
@@ -182,11 +248,13 b' Often, it is desirable to wait until a set of ``PendingResult`` objects are done' | |||||
182 | The ``block`` and ``targets`` keyword arguments and attributes |
|
248 | The ``block`` and ``targets`` keyword arguments and attributes | |
183 | -------------------------------------------------------------- |
|
249 | -------------------------------------------------------------- | |
184 |
|
250 | |||
185 |
Most |
|
251 | Most methods in the multiengine interface (like :meth:`execute`) accept | |
186 | as keyword arguments. As we have seen above, these keyword arguments control the blocking mode |
|
252 | ``block`` and ``targets`` as keyword arguments. As we have seen above, these | |
187 | and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block`` |
|
253 | keyword arguments control the blocking mode and which engines the command is | |
188 | and ``targets`` attributes that control the default behavior when the keyword arguments are not |
|
254 | applied to. The :class:`MultiEngineClient` class also has :attr:`block` and | |
189 | provided. Thus the following logic is used for ``block`` and ``targets``: |
|
255 | :attr:`targets` attributes that control the default behavior when the keyword | |
|
256 | arguments are not provided. Thus the following logic is used for :attr:`block` | |||
|
257 | and :attr:`targets`: | |||
190 |
|
258 | |||
191 |
|
|
259 | * If no keyword argument is provided, the instance attributes are used. | |
192 |
|
|
260 | * Keyword argument, if provided override the instance attributes. | |
@@ -225,14 +293,19 b' The following examples demonstrate how to use the instance attributes::' | |||||
225 | [3] In [6]: b=10; print b |
|
293 | [3] In [6]: b=10; print b | |
226 | [3] Out[6]: 10 |
|
294 | [3] Out[6]: 10 | |
227 |
|
295 | |||
228 |
The |
|
296 | The :attr:`block` and :attr:`targets` instance attributes also determine the | |
229 | magic commands... |
|
297 | behavior of the parallel magic commands. | |
230 |
|
298 | |||
231 |
|
299 | |||
232 | Parallel magic commands |
|
300 | Parallel magic commands | |
233 | ----------------------- |
|
301 | ----------------------- | |
234 |
|
302 | |||
235 | We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the `magicTargets``targets` attribute of the ``MultiEngineClient`` instance (by default this is 'all'):: |
|
303 | We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) | |
|
304 | that make it more pleasant to execute Python commands on the engines | |||
|
305 | interactively. These are simply shortcuts to :meth:`execute` and | |||
|
306 | :meth:`get_result`. The ``%px`` magic executes a single Python command on the | |||
|
307 | engines specified by the :attr:`targets` attribute of the | |||
|
308 | :class:`MultiEngineClient` instance (by default this is ``'all'``):: | |||
236 |
|
309 | |||
237 | # Make this MultiEngineClient active for parallel magic commands |
|
310 | # Make this MultiEngineClient active for parallel magic commands | |
238 | In [23]: mec.activate() |
|
311 | In [23]: mec.activate() | |
@@ -277,7 +350,9 b' We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) t' | |||||
277 | [3] In [9]: print numpy.linalg.eigvals(a) |
|
350 | [3] In [9]: print numpy.linalg.eigvals(a) | |
278 | [3] Out[9]: [ 0.83664764 -0.25602658] |
|
351 | [3] Out[9]: [ 0.83664764 -0.25602658] | |
279 |
|
352 | |||
280 |
The ``%result`` magic gets and prints the stdin/stdout/stderr of the last |
|
353 | The ``%result`` magic gets and prints the stdin/stdout/stderr of the last | |
|
354 | command executed on each engine. It is simply a shortcut to the | |||
|
355 | :meth:`get_result` method:: | |||
281 |
|
356 | |||
282 | In [29]: %result |
|
357 | In [29]: %result | |
283 | Out[29]: |
|
358 | Out[29]: | |
@@ -294,7 +369,8 b' The ``%result`` magic gets and prints the stdin/stdout/stderr of the last comman' | |||||
294 | [3] In [9]: print numpy.linalg.eigvals(a) |
|
369 | [3] In [9]: print numpy.linalg.eigvals(a) | |
295 | [3] Out[9]: [ 0.83664764 -0.25602658] |
|
370 | [3] Out[9]: [ 0.83664764 -0.25602658] | |
296 |
|
371 | |||
297 |
The ``%autopx`` magic switches to a mode where everything you type is executed |
|
372 | The ``%autopx`` magic switches to a mode where everything you type is executed | |
|
373 | on the engines given by the :attr:`targets` attribute:: | |||
298 |
|
374 | |||
299 | In [30]: mec.block=False |
|
375 | In [30]: mec.block=False | |
300 |
|
376 | |||
@@ -335,51 +411,19 b' The ``%autopx`` magic switches to a mode where everything you type is executed o' | |||||
335 | [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals) |
|
411 | [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals) | |
336 | [3] Out[12]: Average max eigenvalue is: 10.1158837784 |
|
412 | [3] Out[12]: Average max eigenvalue is: 10.1158837784 | |
337 |
|
413 | |||
338 | Using the ``with`` statement of Python 2.5 |
|
|||
339 | ------------------------------------------ |
|
|||
340 |
|
||||
341 | Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute:: |
|
|||
342 |
|
||||
343 | In [3]: with mec: |
|
|||
344 | ...: client.remote() # Required so the following code is not run locally |
|
|||
345 | ...: a = 10 |
|
|||
346 | ...: b = 30 |
|
|||
347 | ...: c = a+b |
|
|||
348 | ...: |
|
|||
349 | ...: |
|
|||
350 |
|
||||
351 | In [4]: mec.get_result() |
|
|||
352 | Out[4]: |
|
|||
353 | <Results List> |
|
|||
354 | [0] In [1]: a = 10 |
|
|||
355 | b = 30 |
|
|||
356 | c = a+b |
|
|||
357 |
|
||||
358 | [1] In [1]: a = 10 |
|
|||
359 | b = 30 |
|
|||
360 | c = a+b |
|
|||
361 |
|
414 | |||
362 | [2] In [1]: a = 10 |
|
415 | Moving Python objects around | |
363 | b = 30 |
|
416 | ============================ | |
364 | c = a+b |
|
|||
365 |
|
417 | |||
366 | [3] In [1]: a = 10 |
|
418 | In addition to executing code on engines, you can transfer Python objects to | |
367 | b = 30 |
|
419 | and from your IPython session and the engines. In IPython, these operations | |
368 | c = a+b |
|
420 | are called :meth:`push` (sending an object to the engines) and :meth:`pull` | |
369 |
|
421 | (getting an object from the engines). | ||
370 | This is basically another way of calling execute, but one with allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``. |
|
|||
371 |
|
||||
372 | Moving Python object around |
|
|||
373 | =========================== |
|
|||
374 |
|
||||
375 | In addition to executing code on engines, you can transfer Python objects to and from your |
|
|||
376 | IPython session and the engines. In IPython, these operations are called ``push`` (sending an |
|
|||
377 | object to the engines) and ``pull`` (getting an object from the engines). |
|
|||
378 |
|
422 | |||
379 | Basic push and pull |
|
423 | Basic push and pull | |
380 | ------------------- |
|
424 | ------------------- | |
381 |
|
425 | |||
382 |
Here are some examples of how you use |
|
426 | Here are some examples of how you use :meth:`push` and :meth:`pull`:: | |
383 |
|
427 | |||
384 | In [38]: mec.push(dict(a=1.03234,b=3453)) |
|
428 | In [38]: mec.push(dict(a=1.03234,b=3453)) | |
385 | Out[38]: [None, None, None, None] |
|
429 | Out[38]: [None, None, None, None] | |
@@ -415,7 +459,8 b' Here are some examples of how you use ``push`` and ``pull``::' | |||||
415 | [3] In [13]: print c |
|
459 | [3] In [13]: print c | |
416 | [3] Out[13]: speed |
|
460 | [3] Out[13]: speed | |
417 |
|
461 | |||
418 |
In non-blocking mode |
|
462 | In non-blocking mode :meth:`push` and :meth:`pull` also return | |
|
463 | :class:`PendingResult` objects:: | |||
419 |
|
464 | |||
420 | In [47]: mec.block=False |
|
465 | In [47]: mec.block=False | |
421 |
|
466 | |||
@@ -428,7 +473,11 b' In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects' | |||||
428 | Push and pull for functions |
|
473 | Push and pull for functions | |
429 | --------------------------- |
|
474 | --------------------------- | |
430 |
|
475 | |||
431 |
Functions can also be pushed and pulled using |
|
476 | Functions can also be pushed and pulled using :meth:`push_function` and | |
|
477 | :meth:`pull_function`:: | |||
|
478 | ||||
|
479 | ||||
|
480 | In [52]: mec.block=True | |||
432 |
|
481 | |||
433 | In [53]: def f(x): |
|
482 | In [53]: def f(x): | |
434 | ....: return 2.0*x**4 |
|
483 | ....: return 2.0*x**4 | |
@@ -466,7 +515,10 b' Functions can also be pushed and pulled using ``push_function`` and ``pull_funct' | |||||
466 | Dictionary interface |
|
515 | Dictionary interface | |
467 | -------------------- |
|
516 | -------------------- | |
468 |
|
517 | |||
469 | As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This make the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``:: |
|
518 | As a shorthand to :meth:`push` and :meth:`pull`, the | |
|
519 | :class:`MultiEngineClient` class implements some of the Python dictionary | |||
|
520 | interface. This make the remote namespaces of the engines appear as a local | |||
|
521 | dictionary. Underneath, this uses :meth:`push` and :meth:`pull`:: | |||
470 |
|
522 | |||
471 | In [50]: mec.block=True |
|
523 | In [50]: mec.block=True | |
472 |
|
524 | |||
@@ -478,11 +530,13 b' As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class impleme' | |||||
478 | Scatter and gather |
|
530 | Scatter and gather | |
479 | ------------------ |
|
531 | ------------------ | |
480 |
|
532 | |||
481 |
Sometimes it is useful to partition a sequence and push the partitions to |
|
533 | Sometimes it is useful to partition a sequence and push the partitions to | |
482 |
MPI language, this is know as scatter/gather and we |
|
534 | different engines. In MPI language, this is know as scatter/gather and we | |
483 | important to remember that in IPython ``scatter`` is from the interactive IPython session to |
|
535 | follow that terminology. However, it is important to remember that in | |
484 | the engines and ``gather`` is from the engines back to the interactive IPython session. For |
|
536 | IPython's :class:`MultiEngineClient` class, :meth:`scatter` is from the | |
485 | scatter/gather operations between engines, MPI should be used:: |
|
537 | interactive IPython session to the engines and :meth:`gather` is from the | |
|
538 | engines back to the interactive IPython session. For scatter/gather operations | |||
|
539 | between engines, MPI should be used:: | |||
486 |
|
540 | |||
487 | In [58]: mec.scatter('a',range(16)) |
|
541 | In [58]: mec.scatter('a',range(16)) | |
488 | Out[58]: [None, None, None, None] |
|
542 | Out[58]: [None, None, None, None] | |
@@ -510,24 +564,12 b' scatter/gather operations between engines, MPI should be used::' | |||||
510 | Other things to look at |
|
564 | Other things to look at | |
511 | ======================= |
|
565 | ======================= | |
512 |
|
566 | |||
513 | Parallel map |
|
|||
514 | ------------ |
|
|||
515 |
|
||||
516 | Python's builtin ``map`` functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart:: |
|
|||
517 |
|
||||
518 | In [63]: serial_result = map(lambda x:x**10, range(32)) |
|
|||
519 |
|
||||
520 | In [64]: parallel_result = mec.map(lambda x:x**10, range(32)) |
|
|||
521 |
|
||||
522 | In [65]: serial_result==parallel_result |
|
|||
523 | Out[65]: True |
|
|||
524 |
|
||||
525 | As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes. |
|
|||
526 |
|
||||
527 | How to do parallel list comprehensions |
|
567 | How to do parallel list comprehensions | |
528 | -------------------------------------- |
|
568 | -------------------------------------- | |
529 |
|
569 | |||
530 | In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``:: |
|
570 | In many cases list comprehensions are nicer than using the map function. While | |
|
571 | we don't have fully parallel list comprehensions, it is simple to get the | |||
|
572 | basic effect using :meth:`scatter` and :meth:`gather`:: | |||
531 |
|
573 | |||
532 | In [66]: mec.scatter('x',range(64)) |
|
574 | In [66]: mec.scatter('x',range(64)) | |
533 | Out[66]: [None, None, None, None] |
|
575 | Out[66]: [None, None, None, None] | |
@@ -547,10 +589,16 b' In many cases list comprehensions are nicer than using the map function. While ' | |||||
547 | In [69]: print y |
|
589 | In [69]: print y | |
548 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] |
|
590 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] | |
549 |
|
591 | |||
550 |
Parallel |
|
592 | Parallel exceptions | |
551 | ------------------- |
|
593 | ------------------- | |
552 |
|
594 | |||
553 | In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works:: |
|
595 | In the multiengine interface, parallel commands can raise Python exceptions, | |
|
596 | just like serial commands. But, it is a little subtle, because a single | |||
|
597 | parallel command can actually raise multiple exceptions (one for each engine | |||
|
598 | the command was run on). To express this idea, the MultiEngine interface has a | |||
|
599 | :exc:`CompositeError` exception class that will be raised in most cases. The | |||
|
600 | :exc:`CompositeError` class is a special type of exception that wraps one or | |||
|
601 | more other types of exceptions. Here is how it works:: | |||
554 |
|
602 | |||
555 | In [76]: mec.block=True |
|
603 | In [76]: mec.block=True | |
556 |
|
604 | |||
@@ -580,7 +628,7 b' In the MultiEngine interface, parallel commands can raise Python exceptions, jus' | |||||
580 | [2:execute]: ZeroDivisionError: integer division or modulo by zero |
|
628 | [2:execute]: ZeroDivisionError: integer division or modulo by zero | |
581 | [3:execute]: ZeroDivisionError: integer division or modulo by zero |
|
629 | [3:execute]: ZeroDivisionError: integer division or modulo by zero | |
582 |
|
630 | |||
583 |
Notice how the error message printed when |
|
631 | Notice how the error message printed when :exc:`CompositeError` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions:: | |
584 |
|
632 | |||
585 | In [80]: try: |
|
633 | In [80]: try: | |
586 | ....: mec.execute('1/0') |
|
634 | ....: mec.execute('1/0') | |
@@ -602,7 +650,9 b' Notice how the error message printed when ``CompositeError`` is raised has infor' | |||||
602 |
|
650 | |||
603 | ZeroDivisionError: integer division or modulo by zero |
|
651 | ZeroDivisionError: integer division or modulo by zero | |
604 |
|
652 | |||
605 |
If you are working in IPython, you can simple type ``%debug`` after one of |
|
653 | If you are working in IPython, you can simple type ``%debug`` after one of | |
|
654 | these :exc:`CompositeError` exceptions is raised, and inspect the exception | |||
|
655 | instance:: | |||
606 |
|
656 | |||
607 | In [81]: mec.execute('1/0') |
|
657 | In [81]: mec.execute('1/0') | |
608 | --------------------------------------------------------------------------- |
|
658 | --------------------------------------------------------------------------- | |
@@ -679,6 +729,11 b' If you are working in IPython, you can simple type ``%debug`` after one of these' | |||||
679 |
|
729 | |||
680 | ZeroDivisionError: integer division or modulo by zero |
|
730 | ZeroDivisionError: integer division or modulo by zero | |
681 |
|
731 | |||
|
732 | .. note:: | |||
|
733 | ||||
|
734 | The above example appears to be broken right now because of a change in | |||
|
735 | how we are using Twisted. | |||
|
736 | ||||
682 | All of this same error handling magic even works in non-blocking mode:: |
|
737 | All of this same error handling magic even works in non-blocking mode:: | |
683 |
|
738 | |||
684 | In [83]: mec.block=False |
|
739 | In [83]: mec.block=False |
@@ -1,240 +1,93 b'' | |||||
1 | .. _paralleltask: |
|
1 | .. _paralleltask: | |
2 |
|
2 | |||
3 |
========================== |
|
3 | ========================== | |
4 |
The IPython |
|
4 | The IPython task interface | |
5 |
========================== |
|
5 | ========================== | |
6 |
|
6 | |||
7 | .. contents:: |
|
7 | .. contents:: | |
8 |
|
8 | |||
9 | The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all the user can use both of these interfaces at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign Tasks to engines explicitly. |
|
9 | The task interface to the controller presents the engines as a fault tolerant, dynamic load-balanced system or workers. Unlike the multiengine interface, in the task interface, the user have no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. | |
|
10 | ||||
|
11 | Best of all the user can use both of these interfaces running at the same time to take advantage or both of their strengths. When the user can break up the user's work into segments that do not depend on previous execution, the task interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs, without having to assign tasks to engines explicitly. | |||
10 |
|
12 | |||
11 | Starting the IPython controller and engines |
|
13 | Starting the IPython controller and engines | |
12 | =========================================== |
|
14 | =========================================== | |
13 |
|
15 | |||
14 |
To follow along with this tutorial, |
|
16 | To follow along with this tutorial, you will need to start the IPython | |
15 | controller and four IPython engines. The simplest way of doing this is to |
|
17 | controller and four IPython engines. The simplest way of doing this is to use | |
16 |
|
|
18 | the :command:`ipcluster` command:: | |
17 |
|
19 | |||
18 | $ ipcluster -n 4 |
|
20 | $ ipcluster -n 4 | |
19 |
|
21 | |||
20 |
For more detailed information about starting the controller and engines, see |
|
22 | For more detailed information about starting the controller and engines, see | |
21 |
|
23 | our :ref:`introduction <ip1par>` to using IPython for parallel computing. | ||
22 | The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously. |
|
|||
23 |
|
24 | |||
24 | QuickStart Task Farming |
|
25 | Creating a ``TaskClient`` instance | |
25 | ======================= |
|
26 | ========================================= | |
26 |
|
27 | |||
27 | First, a quick example of how to start running the most basic Tasks. |
|
28 | The first step is to import the IPython :mod:`IPython.kernel.client` module | |
28 | The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance:: |
|
29 | and then create a :class:`TaskClient` instance:: | |
29 |
|
30 | |||
30 |
|
|
31 | In [1]: from IPython.kernel import client | |
31 |
|
32 | |||
32 |
|
|
33 | In [2]: tc = client.TaskClient() | |
33 |
|
34 | |||
34 | Then the user wrap the commands the user want to run in Tasks:: |
|
35 | This form assumes that the :file:`ipcontroller-tc.furl` is in the | |
35 |
|
36 | :file:`~./ipython/security` directory on the client's host. If not, the | ||
36 | In [3]: tasklist = [] |
|
37 | location of the ``.furl`` file must be given as an argument to the | |
37 | In [4]: for n in range(1000): |
|
38 | constructor:: | |
38 | ... tasklist.append(client.Task("a = %i"%n, pull="a")) |
|
|||
39 |
|
||||
40 | The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``. |
|
|||
41 |
|
||||
42 | Next, the user need to submit the Tasks to the ``TaskController`` with the ``TaskClient``:: |
|
|||
43 |
|
||||
44 | In [5]: taskids = [ tc.run(t) for t in tasklist ] |
|
|||
45 |
|
||||
46 | This will give the user a list of the TaskIDs used by the controller to keep track of the Tasks and their results. Now at some point the user are going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running:: |
|
|||
47 |
|
||||
48 | In [6]: tc.barrier(taskids) |
|
|||
49 |
|
||||
50 | This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably want to look at the user's results:: |
|
|||
51 |
|
||||
52 | In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ] |
|
|||
53 |
|
||||
54 | Now the user have a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``:: |
|
|||
55 |
|
||||
56 | In [8]: tr = ``Task``_results[73] |
|
|||
57 |
|
||||
58 | In [9]: tr |
|
|||
59 | Out[9]: ``TaskResult``[ID:73]:{'a':73} |
|
|||
60 |
|
||||
61 | In [10]: tr.engineid |
|
|||
62 | Out[10]: 1 |
|
|||
63 |
|
||||
64 | In [11]: tr.submitted, tr.completed, tr.duration |
|
|||
65 | Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345) |
|
|||
66 |
|
||||
67 | The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute:: |
|
|||
68 |
|
||||
69 | In [12]: tr.results['a'] |
|
|||
70 | Out[12]: 73 |
|
|||
71 |
|
||||
72 | In [13]: tr.ns.a |
|
|||
73 | Out[13]: 73 |
|
|||
74 |
|
||||
75 | That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks covered later. The most useful probably being using a ``MutiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run the user's Tasks. |
|
|||
76 |
|
||||
77 | There are many options for running and managing Tasks. The best way to learn further about the ``Task`` interface is to study the examples in ``docs/examples``. If the user do so and learn a lots about this interface, we encourage the user to expand this documentation about the ``Task`` system. |
|
|||
78 |
|
||||
79 | Overview of the Task System |
|
|||
80 | =========================== |
|
|||
81 |
|
||||
82 | The user's view of the ``Task`` system has three basic objects: The ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects well indicate their role. |
|
|||
83 |
|
||||
84 | The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskControler`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines, it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage. |
|
|||
85 |
|
||||
86 | Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy, just implementing your own priority system. |
|
|||
87 |
|
||||
88 | The TaskClient |
|
|||
89 | ============== |
|
|||
90 |
|
||||
91 | The ``TaskClient`` is the object the user use to connect to the ``Controller`` that is managing the user's Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython Client Module:: |
|
|||
92 |
|
||||
93 | In [1]: from IPython.kernel import client |
|
|||
94 |
|
||||
95 | Just as with the ``MultiEngineClient``, the user create the ``TaskClient`` with a tuple, containing the ip-address and port of the ``Controller``. the ``client`` module conveniently has the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object would be done with this:: |
|
|||
96 |
|
||||
97 | In [2]: tc = client.TaskClient(client.default_task_address) |
|
|||
98 |
|
||||
99 | or, if the user want to specify a non default location of the ``Controller``, the user can specify explicitly:: |
|
|||
100 |
|
||||
101 | In [3]: tc = client.TaskClient(("192.168.1.1", 10113)) |
|
|||
102 |
|
||||
103 | As discussed earlier, the ``TaskClient`` only has a few basic methods. |
|
|||
104 |
|
||||
105 | * ``tc.run(task)`` |
|
|||
106 | ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the ``Task``ID by which the ``Task`` and its results can be tracked and retrieved:: |
|
|||
107 |
|
||||
108 | In [4]: ``Task``ID = tc.run(``Task``) |
|
|||
109 |
|
||||
110 | * ``tc.get_task_result(taskid, block=``False``)`` |
|
|||
111 | ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the ``Task``ID`` of the result the user wish to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually want to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specify ``block=``True``, then ``get_task_result`` will wait for the ``Task`` to complete, and always return the ``TaskResult`` for the requested ``Task``. |
|
|||
112 | * ``tc.barrier(taskid(s))`` |
|
|||
113 | ``barrier`` is a synchronization method. It takes exactly one argument, a ``Task``ID or list of taskIDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section:: |
|
|||
114 |
|
||||
115 | In [5]: taskIDs = [ tc.run(``Task``) for ``Task`` in myTasks ] |
|
|||
116 |
|
||||
117 | In [6]: tc.get_task_result(taskIDs[-1]) is None |
|
|||
118 | Out[6]: ``True`` |
|
|||
119 |
|
||||
120 | In [7]: tc.barrier(``Task``ID) |
|
|||
121 |
|
||||
122 | In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ] |
|
|||
123 |
|
||||
124 | * ``tc.queue_status(verbose=``False``)`` |
|
|||
125 | ``queue_status`` is a method for querying the state of the ``TaskControler``. ``queue_status`` returns a dict of the form:: |
|
|||
126 |
|
||||
127 | {'scheduled': Tasks that have been submitted but yet run |
|
|||
128 | 'pending' : Tasks that are currently running |
|
|||
129 | 'succeeded': Tasks that have completed successfully |
|
|||
130 | 'failed' : Tasks that have finished with a failure |
|
|||
131 | } |
|
|||
132 |
|
||||
133 | if @verbose is not specified (or is ``False``), then the values of the dict are integers - the number of Tasks in each state. if @verbose is ``True``, then each element in the dict is a list of the taskIDs in that state:: |
|
|||
134 |
|
||||
135 | In [8]: tc.queue_status() |
|
|||
136 | Out[8]: {'scheduled': 4, |
|
|||
137 | 'pending' : 2, |
|
|||
138 | 'succeeded': 5, |
|
|||
139 | 'failed' : 1 |
|
|||
140 | } |
|
|||
141 |
|
||||
142 | In [9]: tc.queue_status(verbose=True) |
|
|||
143 | Out[9]: {'scheduled': [8,9,10,11], |
|
|||
144 | 'pending' : [6,7], |
|
|||
145 | 'succeeded': [0,1,2,4,5], |
|
|||
146 | 'failed' : [3] |
|
|||
147 | } |
|
|||
148 |
|
||||
149 | * ``tc.abort(taskid)`` |
|
|||
150 | ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has completed, ``abort`` will raise an ``IndexError ``Task`` Already Completed``. An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see ``Task``_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user find the solution, the user would want to abort all remaining Tasks to prevent wasted work. |
|
|||
151 | * ``tc.spin()`` |
|
|||
152 | ``spin`` simply triggers the scheduler in the ``TaskControler``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MutliEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence: |
|
|||
153 | * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies. |
|
|||
154 | * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed. |
|
|||
155 | * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskControler`` to try scheduling ``Task`` again. |
|
|||
156 |
|
||||
157 | ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances. |
|
|||
158 |
|
||||
159 | That covers the ``TaskClient``, a simple interface to the cluster. With this, the user can submit jobs (and abort if necessary), request their results, synchronize on arbitrary subsets of jobs. |
|
|||
160 |
|
||||
161 | .. _task: The Task Object |
|
|||
162 |
|
||||
163 | The Task Object |
|
|||
164 | =============== |
|
|||
165 |
|
||||
166 | The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code):: |
|
|||
167 |
|
||||
168 | In [1]: t = client.Task("a = str(id)") |
|
|||
169 |
|
||||
170 | This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed. |
|
|||
171 | There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment. |
|
|||
172 |
|
||||
173 | Data Attributes |
|
|||
174 | *************** |
|
|||
175 | It is likely that the user may want to move data around before or after executing the ``Task``. We provide methods of sending data to initialize the worker's namespace, and specifying what data to bring back as the ``Task``'s results. |
|
|||
176 |
|
||||
177 | * pull = [] |
|
|||
178 | The obvious case is as above, where ``t`` would execute and store the result of ``myfunc`` in ``a``, it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it:: |
|
|||
179 |
|
||||
180 | In [2]: t = client.Task("a = str(id)", pull = "a") |
|
|||
181 |
|
||||
182 | In [3]: t = client.Task("a = str(id)", pull = ["a", "id"]) |
|
|||
183 |
|
||||
184 | * push = {} |
|
|||
185 | A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's immediately before execution:: |
|
|||
186 |
|
||||
187 | In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a") |
|
|||
188 |
|
||||
189 | push and pull result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their api is the same. |
|
|||
190 |
|
||||
191 | Namespace Cleaning |
|
|||
192 | ****************** |
|
|||
193 | When a user is running a large number of Tasks, it is likely that the namespace of the worker's could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace. |
|
|||
194 |
|
||||
195 | * ``clear_after`` |
|
|||
196 | if clear_after is specified ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful for both Tasks that produce clutter or Tasks whose intermediate data one might wish to be kept private:: |
|
|||
197 |
|
||||
198 | In [5]: t = client.Task("a = range(1e10)", pull = "a",clear_after=True) |
|
|||
199 |
|
||||
200 |
|
39 | |||
201 | * ``clear_before`` |
|
40 | In[2]: mec = client.TaskClient('/path/to/my/ipcontroller-tc.furl') | |
202 | as one might guess, clear_before is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker:: |
|
|||
203 |
|
41 | |||
204 | In [6]: t = client.Task("a = globals()", pull = "a",clear_before=True) |
|
42 | Quick and easy parallelism | |
|
43 | ========================== | |||
205 |
|
44 | |||
206 | Of course, a user can both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``. |
|
45 | In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. Like the multiengine interface, the task interface provides two simple ways of accomplishing this: a parallel version of :func:`map` and ``@parallel`` function decorator. However, the verions in the task interface have one important difference: they are dynamically load balanced. Thus, if the execution time per item varies significantly, you should use the versions in the task interface. | |
207 |
|
46 | |||
208 | Fault Tolerance |
|
47 | Parallel map | |
209 | *************** |
|
48 | ------------ | |
210 | It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker it was running on disconnected, and there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds. |
|
|||
211 |
|
49 | |||
212 | * ``retries`` |
|
50 | The parallel :meth:`map` in the task interface is similar to that in the multiengine interface:: | |
213 | ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task``. The canonical bad example: |
|
|||
214 |
|
51 | |||
215 | In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99) |
|
52 | In [63]: serial_result = map(lambda x:x**10, range(32)) | |
216 |
|
53 | |||
217 | This would actually take down 100 workers. |
|
54 | In [64]: parallel_result = tc.map(lambda x:x**10, range(32)) | |
218 |
|
55 | |||
219 | * ``recovery_task`` |
|
56 | In [65]: serial_result==parallel_result | |
220 | ``recovery_task`` is another ``Task`` object, to be run in the event of the original ``Task`` still failing after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!). |
|
57 | Out[65]: True | |
221 |
|
58 | |||
222 | Dependencies |
|
59 | Parallel function decorator | |
223 | ************ |
|
60 | --------------------------- | |
224 | Dependencies are the most powerful part of the ``Task`` farming system, because it allows the user to do some classification of the workers, and guide the ``Task`` distribution without meddling with the controller directly. It makes use of two objects - the ``Task``'s ``depend`` attribute, and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties api exists for extending IPython, allowing conditional execution and new controllers that make decisions based on properties of its engines. Currently the ``Task`` dependency is the only internal use of the properties api. |
|
|||
225 |
|
61 | |||
226 | .. _MultiEngine: ./parallel_multiengine |
|
62 | Parallel functions are just like normal function, but they can be called on sequences and *in parallel*. The multiengine interface provides a decorator that turns any Python function into a parallel function:: | |
227 |
|
63 | |||
228 | The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and functionally equivalent to always returning ``False``. Tasks`` with invalid ``depend`` functions will never be assigned to a worker:: |
|
64 | In [10]: @tc.parallel() | |
|
65 | ....: def f(x): | |||
|
66 | ....: return 10.0*x**4 | |||
|
67 | ....: | |||
229 |
|
68 | |||
230 | In [8]: def dep(properties): |
|
69 | In [11]: f(range(32)) # this is done in parallel | |
231 | ... return properties["RAM"] > 2**32 # have at least 4GB |
|
70 | Out[11]: | |
232 | In [9]: t = client.Task("a = bigfunc()", depend=dep) |
|
71 | [0.0,10.0,160.0,...] | |
233 |
|
72 | |||
234 | It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods. |
|
73 | More details | |
|
74 | ============ | |||
235 |
|
75 | |||
|
76 | The :class:`TaskClient` has many more powerful features that allow quite a bit of flexibility in how tasks are defined and run. The next places to look are in the following classes: | |||
236 |
|
77 | |||
|
78 | * :class:`IPython.kernel.client.TaskClient` | |||
|
79 | * :class:`IPython.kernel.client.StringTask` | |||
|
80 | * :class:`IPython.kernel.client.MapTask` | |||
237 |
|
81 | |||
|
82 | The following is an overview of how to use these classes together: | |||
238 |
|
83 | |||
|
84 | 1. Create a :class:`TaskClient`. | |||
|
85 | 2. Create one or more instances of :class:`StringTask` or :class:`MapTask` | |||
|
86 | to define your tasks. | |||
|
87 | 3. Submit your tasks to using the :meth:`run` method of your | |||
|
88 | :class:`TaskClient` instance. | |||
|
89 | 4. Use :meth:`TaskClient.get_task_result` to get the results of the | |||
|
90 | tasks. | |||
239 |
|
91 | |||
|
92 | We are in the process of developing more detailed information about the task interface. For now, the docstrings of the :class:`TaskClient`, :class:`StringTask` and :class:`MapTask` classes should be consulted. | |||
240 |
|
93 |
General Comments 0
You need to be logged in to leave comments.
Login now