dependency tweaks + dependency/scheduler docs
MinRK -
@@ -0,0 +1,172 b''
1 .. _dag_dependencies:
2
3 ================
4 DAG Dependencies
5 ================
6
7 Often, a parallel workflow is described in terms of a `Directed Acyclic Graph
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
9 for working with graphs is NetworkX_. Here, we will walk through a demo mapping
10 a NetworkX DAG to task dependencies.
11
12 The full script that runs this demo can be found in
13 :file:`docs/examples/newparallel/dagdeps.py`.
14
15 Why are DAGs good for task dependencies?
16 ----------------------------------------
17
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
19 the nodes. For our purposes, each node would be a task, and each edge would be a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
23 there must not be any closed loops in the graph. This is important for dependencies,
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
26 dependencies to cause a deadlock.
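
The acyclicity requirement is easy to check mechanically. As a pure-Python illustration (``has_cycle`` is a hypothetical helper, not part of the demo script; NetworkX has its own cycle-detection utilities), a depth-first search that meets a node it is still in the middle of visiting has found a cycle, i.e. a potential deadlock:

```python
# Minimal cycle check for a dependency graph, no NetworkX required.
# graph: dict mapping node -> list of nodes it depends on.

def has_cycle(graph):
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited / in progress / done
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for dep in graph.get(node, []):
            if color.get(dep, WHITE) == GRAY:          # back edge: cycle
                return True
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(graph))

dag    = {0: [], 1: [0], 2: [0], 3: [1, 2], 4: [1]}   # the sample DAG below
cyclic = {0: [1], 1: [0]}                             # 0 and 1 wait on each other
print(has_cycle(dag), has_cycle(cyclic))  # False True
```

If a workflow passes this check, no chain of dependencies can loop back on itself.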
27
28 A Sample DAG
29 ------------
30
31 Here, we have a very simple 5-node DAG:
32
33 .. figure:: simpledag.*
34
35 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 1 and 2; and 4 depends only on 1.
38
39 A possible sequence of events for this workflow:
40
41 0. Task 0 can run right away
42 1. 0 finishes, so 1,2 can start
43 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 3. 2 finishes, and 3 can finally start
45
46
47 Further, taking failures into account, assuming all dependencies are run with the default
48 ``success_only=True``, the following cases would occur for each node's failure:
49
50 0. fails: all other tasks fail as Impossible
51 1. 2 can still succeed, but 3,4 are unreachable
52 2. 3 becomes unreachable, but 4 is unaffected
53 3. and 4. are terminal, and can have no effect on other nodes
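
The unreachable cases above follow a simple rule: with ``success_only`` dependencies, every task downstream of a failure is doomed. A small pure-Python sketch (``unreachable_after`` is a hypothetical helper, not part of the demo) computes the doomed set by walking the out-edges of the failed node:

```python
# Which tasks become unreachable if a given task fails?
# edges: list of (a, b) pairs, meaning b depends on a.

def unreachable_after(failed, edges):
    children = {}
    for a, b in edges:
        children.setdefault(a, []).append(b)
    doomed, frontier = set(), [failed]
    while frontier:                        # breadth-first walk downstream
        node = frontier.pop()
        for child in children.get(node, []):
            if child not in doomed:
                doomed.add(child)
                frontier.append(child)
    return doomed

edges = [(0, 1), (0, 2), (1, 3), (2, 3), (1, 4)]   # the sample DAG
print(sorted(unreachable_after(1, edges)))  # [3, 4]
```

For node 1 this yields exactly the tasks 3 and 4 listed above; for node 0 it is every other task.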
54
55 The code to generate the simple DAG:
56
57 .. sourcecode:: python
58
59 import networkx as nx
60
61 G = nx.DiGraph()
62
63 # add 5 nodes, labeled 0-4:
64 G.add_nodes_from(range(5))
65 # 1,2 depend on 0:
66 G.add_edge(0,1)
67 G.add_edge(0,2)
68 # 3 depends on 1,2
69 G.add_edge(1,3)
70 G.add_edge(2,3)
71 # 4 depends on 1
72 G.add_edge(1,4)
73
74 # now draw the graph:
75 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 3 : (0,2), 4 : (2,2)}
77 nx.draw(G, pos, edge_color='r')
78
79
80 For demonstration purposes, we have a function that generates a random DAG with a given
81 number of nodes and edges.
82
83 .. literalinclude:: ../../examples/newparallel/dagdeps.py
84 :language: python
85 :lines: 20-36
86
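
The actual generator lives in :file:`dagdeps.py` and builds a NetworkX graph; since those lines are not reproduced here, the following is only a plausible pure-Python sketch of the same idea: assign every node a hidden random rank and orient each edge from lower to higher rank, which can never create a cycle.

```python
import random

# Sketch of a random-DAG generator (illustrative, not the dagdeps.py code):
# edges oriented along a random node ranking are guaranteed acyclic.

def random_dag(n_nodes, n_edges, seed=None):
    rng = random.Random(seed)
    order = list(range(n_nodes))
    rng.shuffle(order)                        # a random topological order
    rank = {node: i for i, node in enumerate(order)}
    edges = set()
    while len(edges) < n_edges:               # requires n_edges <= n*(n-1)/2
        a, b = rng.sample(range(n_nodes), 2)
        if rank[a] > rank[b]:                 # orient along the ranking
            a, b = b, a
        edges.add((a, b))
    return edges

print(len(random_dag(32, 128, seed=1)))  # 128
```

Because every edge points from lower to higher rank, no path can return to its starting node.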
87 So first, we start with a graph of 32 nodes, with 128 edges:
88
89 .. sourcecode:: ipython
90
91 In [2]: G = random_dag(32,128)
92
93 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94
95 .. sourcecode:: ipython
96
97 In [3]: jobs = {}
98
99 # in reality, each job would presumably be different
100 # randomwait is just a function that sleeps for a random interval
101 In [4]: for node in G:
102 ...: jobs[node] = randomwait
103
104 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 and linking up the dependencies. Since a job's msg_id, which is needed for building
106 dependencies, is not known until the job is submitted, it is critical that we never
107 submit a job before the jobs it depends on. Fortunately, NetworkX provides a
108 :func:`topological_sort` function which ensures exactly this. It returns an iterable
109 that guarantees that when you arrive at a node, you have already visited all the
110 nodes on which it depends:
111
112 .. sourcecode:: ipython
113
114 In [5]: c = client.Client()
115
116 In [6]: results = {}
117
118 In [7]: for node in nx.topological_sort(G):
119 ...: # get list of AsyncResult objects from nodes
120 ...: # leading into this one as dependencies
121 ...: deps = [ results[n] for n in G.predecessors(node) ]
122 ...: # submit and store AsyncResult object
123 ...: results[node] = client.apply(jobs[node], after=deps, block=False)
124
125 Now that we have submitted all the jobs, we can wait for the results:
126
127 .. sourcecode:: ipython
128
129 In [8]: [ r.get() for r in results.values() ]
130
131 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
132 raised an error if a task failed). But we don't know that the ordering was properly
133 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
134
135 These objects store a variety of metadata about each task, including various timestamps.
136 We can validate that the dependencies were respected by checking that each task was
137 started after all of its predecessors were completed:
138
139 .. literalinclude:: ../../examples/newparallel/dagdeps.py
140 :language: python
141 :lines: 64-70
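
The real ``validate_tree`` in :file:`dagdeps.py` works on the NetworkX graph and AsyncResult objects; since those lines are not reproduced here, this is a self-contained sketch of the same check, with plain dicts standing in for the graph and the :attr:`metadata` records:

```python
# Sketch: every task must have started no earlier than the completion
# of each task it depends on.
# graph: node -> list of predecessor nodes
# metadata: node -> dict with 'started' and 'completed' timestamps

def validate_tree(graph, metadata):
    for node, preds in graph.items():
        for parent in preds:
            assert metadata[node]['started'] >= metadata[parent]['completed'], \
                "task %r started before its dependency %r finished" % (node, parent)

graph = {0: [], 1: [0], 2: [0], 3: [1, 2], 4: [1]}
metadata = {
    0: dict(started=0.0, completed=1.0),
    1: dict(started=1.1, completed=2.0),
    2: dict(started=1.2, completed=2.5),
    3: dict(started=2.6, completed=3.0),
    4: dict(started=2.1, completed=2.4),
}
validate_tree(graph, metadata)  # passes silently
```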
142
143 We can also validate the graph visually. By drawing the graph with each node's
144 x-position set to its start time, all arrows must point to the right if the order was
145 respected. For spreading, the y-position will be the runtime of the task, so
146 long-running tasks will be at the top, and quick tasks will be at the bottom.
147
148 .. sourcecode:: ipython
149
150 In [10]: from matplotlib.dates import date2num
151
152 In [11]: from matplotlib.cm import gist_rainbow
153
154 In [12]: pos = {}; colors = {}
155
156 In [13]: for node in G:
157 ...: md = results[node].metadata
158 ...: start = date2num(md.started)
159 ...: runtime = date2num(md.completed) - start
160 ...: pos[node] = (start, runtime)
161 ...: colors[node] = md.engine_id
162
163 In [14]: nx.draw(G, pos, nodelist=colors.keys(), node_color=colors.values(),
164    ...: cmap=gist_rainbow)
165
166 .. figure:: dagdeps.*
167
168 Time started on x, runtime on y, and color-coded by engine-id (in this case there
169 were four engines).
170
171
172 .. _NetworkX: http://networkx.lanl.gov/
NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -863,14 +863,9 b' class Client(object):'
863 return dep.msg_ids
863 return dep.msg_ids
864 elif dep is None:
864 elif dep is None:
865 return []
865 return []
866 elif isinstance(dep, set):
867 return list(dep)
868 elif isinstance(dep, (list,dict)):
869 return dep
870 elif isinstance(dep, str):
871 return [dep]
872 else:
866 else:
873 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
867 # pass to Dependency constructor
868 return list(Dependency(dep))
874
869
875 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
870 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
876 after=None, follow=None, timeout=None):
871 after=None, follow=None, timeout=None):
@@ -921,9 +916,11 b' class Client(object):'
921 This job will only be run on an engine where this dependency
916 This job will only be run on an engine where this dependency
922 is met.
917 is met.
923
918
924 timeout : float or None
919 timeout : float/int or None
925 Only for load-balanced execution (targets=None)
920 Only for load-balanced execution (targets=None)
926 Specify an amount of time (in seconds)
921 Specify an amount of time (in seconds) for the scheduler to
922 wait for dependencies to be met before failing with a
923 DependencyTimeout.
927
924
928 Returns
925 Returns
929 -------
926 -------
@@ -950,9 +947,6 b' class Client(object):'
950 if not isinstance(kwargs, dict):
947 if not isinstance(kwargs, dict):
951 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
948 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
952
949
953 after = self._build_dependency(after)
954 follow = self._build_dependency(follow)
955
956 options = dict(bound=bound, block=block)
950 options = dict(bound=bound, block=block)
957
951
958 if targets is None:
952 if targets is None:
@@ -984,6 +978,8 b' class Client(object):'
984 warnings.warn(msg, RuntimeWarning)
978 warnings.warn(msg, RuntimeWarning)
985
979
986
980
981 after = self._build_dependency(after)
982 follow = self._build_dependency(follow)
987 subheader = dict(after=after, follow=follow, timeout=timeout)
983 subheader = dict(after=after, follow=follow, timeout=timeout)
988 bufs = ss.pack_apply_message(f,args,kwargs)
984 bufs = ss.pack_apply_message(f,args,kwargs)
989 content = dict(bound=bound)
985 content = dict(bound=bound)
@@ -2,13 +2,7 b''
2
2
3 from IPython.external.decorator import decorator
3 from IPython.external.decorator import decorator
4 from error import UnmetDependency
4 from error import UnmetDependency
5
5 from asyncresult import AsyncResult
6
7 # flags
8 ALL = 1 << 0
9 ANY = 1 << 1
10 HERE = 1 << 2
11 ANYWHERE = 1 << 3
12
6
13
7
14 class depend(object):
8 class depend(object):
@@ -59,53 +53,58 b' class Dependency(set):'
59
53
60 Subclassed from set()."""
54 Subclassed from set()."""
61
55
62 mode='all'
56 all=True
63 success_only=True
57 success_only=True
64
58
65 def __init__(self, dependencies=[], mode='all', success_only=True):
59 def __init__(self, dependencies=[], all=True, success_only=True):
66 if isinstance(dependencies, dict):
60 if isinstance(dependencies, dict):
67 # load from dict
61 # load from dict
68 mode = dependencies.get('mode', mode)
62 all = dependencies.get('all', True)
69 success_only = dependencies.get('success_only', success_only)
63 success_only = dependencies.get('success_only', success_only)
70 dependencies = dependencies.get('dependencies', [])
64 dependencies = dependencies.get('dependencies', [])
71 set.__init__(self, dependencies)
65 ids = []
72 self.mode = mode.lower()
66 if isinstance(dependencies, AsyncResult):
67 ids.extend(AsyncResult.msg_ids)
68 else:
69 for d in dependencies:
70 if isinstance(d, basestring):
71 ids.append(d)
72 elif isinstance(d, AsyncResult):
73 ids.extend(d.msg_ids)
74 else:
75 raise TypeError("invalid dependency type: %r"%type(d))
76 set.__init__(self, ids)
77 self.all = all
73 self.success_only=success_only
78 self.success_only=success_only
74 if self.mode not in ('any', 'all'):
75 raise NotImplementedError("Only any|all supported, not %r"%mode)
76
79
77 def check(self, completed, failed=None):
80 def check(self, completed, failed=None):
78 if failed is not None and not self.success_only:
81 if failed is not None and not self.success_only:
79 completed = completed.union(failed)
82 completed = completed.union(failed)
80 if len(self) == 0:
83 if len(self) == 0:
81 return True
84 return True
82 if self.mode == 'all':
85 if self.all:
83 return self.issubset(completed)
86 return self.issubset(completed)
84 elif self.mode == 'any':
85 return not self.isdisjoint(completed)
86 else:
87 else:
87 raise NotImplementedError("Only any|all supported, not %r"%mode)
88 return not self.isdisjoint(completed)
88
89
89 def unreachable(self, failed):
90 def unreachable(self, failed):
90 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 return False
92 return False
92 print self, self.success_only, self.mode, failed
93 # print self, self.success_only, self.all, failed
93 if self.mode == 'all':
94 if self.all:
94 return not self.isdisjoint(failed)
95 return not self.isdisjoint(failed)
95 elif self.mode == 'any':
96 return self.issubset(failed)
97 else:
96 else:
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
97 return self.issubset(failed)
99
98
100
99
101 def as_dict(self):
100 def as_dict(self):
102 """Represent this dependency as a dict. For json compatibility."""
101 """Represent this dependency as a dict. For json compatibility."""
103 return dict(
102 return dict(
104 dependencies=list(self),
103 dependencies=list(self),
105 mode=self.mode,
104 all=self.all,
106 success_only=self.success_only,
105 success_only=self.success_only,
107 )
106 )
108
107
109
108
110 __all__ = ['depend', 'require', 'Dependency']
109 __all__ = ['depend', 'require', 'dependent', 'Dependency']
111
110
@@ -154,7 +154,10 b' class UnmetDependency(KernelError):'
154 class ImpossibleDependency(UnmetDependency):
154 class ImpossibleDependency(UnmetDependency):
155 pass
155 pass
156
156
157 class DependencyTimeout(UnmetDependency):
157 class DependencyTimeout(ImpossibleDependency):
158 pass
159
160 class InvalidDependency(ImpossibleDependency):
158 pass
161 pass
159
162
160 class RemoteError(KernelError):
163 class RemoteError(KernelError):
@@ -100,7 +100,7 b' class HubFactory(RegistrationFactory):'
100 """The Configurable for setting up a Hub."""
100 """The Configurable for setting up a Hub."""
101
101
102 # name of a scheduler scheme
102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
103 scheme = Str('leastload', config=True)
104
104
105 # port-pairs for monitoredqueues:
105 # port-pairs for monitoredqueues:
106 hb = Instance(list, config=True)
106 hb = Instance(list, config=True)
@@ -20,7 +20,9 b' import logging'
20 import os
20 import os
21 import signal
21 import signal
22 import logging
22 import logging
23 import errno
23
24
25 import zmq
24 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
25
27
26 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.external.argparse import ArgumentParser, SUPPRESS
@@ -385,7 +387,8 b' class IPClusterApp(ApplicationWithClusterDir):'
385 # observing of engine stopping is inconsistent. Some launchers
387 # observing of engine stopping is inconsistent. Some launchers
386 # might trigger on a single engine stopping, other wait until
388 # might trigger on a single engine stopping, other wait until
387 # all stop. TODO: think more about how to handle this.
389 # all stop. TODO: think more about how to handle this.
388
390 else:
391 self.controller_launcher = None
389
392
390 el_class = import_item(config.Global.engine_launcher)
393 el_class = import_item(config.Global.engine_launcher)
391 self.engine_launcher = el_class(
394 self.engine_launcher = el_class(
@@ -427,7 +430,7 b' class IPClusterApp(ApplicationWithClusterDir):'
427
430
428 def stop_controller(self, r=None):
431 def stop_controller(self, r=None):
429 # self.log.info("In stop_controller")
432 # self.log.info("In stop_controller")
430 if self.controller_launcher.running:
433 if self.controller_launcher and self.controller_launcher.running:
431 return self.controller_launcher.stop()
434 return self.controller_launcher.stop()
432
435
433 def stop_engines(self, r=None):
436 def stop_engines(self, r=None):
@@ -516,8 +519,13 b' class IPClusterApp(ApplicationWithClusterDir):'
516 self.write_pid_file()
519 self.write_pid_file()
517 try:
520 try:
518 self.loop.start()
521 self.loop.start()
519 except:
522 except KeyboardInterrupt:
520 self.log.info("stopping...")
523 pass
524 except zmq.ZMQError as e:
525 if e.errno == errno.EINTR:
526 pass
527 else:
528 raise
521 self.remove_pid_file()
529 self.remove_pid_file()
522
530
523 def start_app_engines(self):
531 def start_app_engines(self):
@@ -539,8 +547,13 b' class IPClusterApp(ApplicationWithClusterDir):'
539 # self.write_pid_file()
547 # self.write_pid_file()
540 try:
548 try:
541 self.loop.start()
549 self.loop.start()
542 except:
550 except KeyboardInterrupt:
543 self.log.fatal("stopping...")
551 pass
552 except zmq.ZMQError as e:
553 if e.errno == errno.EINTR:
554 pass
555 else:
556 raise
544 # self.remove_pid_file()
557 # self.remove_pid_file()
545
558
546 def start_app_stop(self):
559 def start_app_stop(self):
@@ -127,7 +127,7 b' class TaskScheduler(SessionFactory):'
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128
128
129 # internals:
129 # internals:
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 pending = Dict() # dict by engine_uuid of submitted tasks
132 pending = Dict() # dict by engine_uuid of submitted tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
@@ -139,6 +139,7 b' class TaskScheduler(SessionFactory):'
139 all_completed = Set() # set of all completed tasks
139 all_completed = Set() # set of all completed tasks
140 all_failed = Set() # set of all failed tasks
140 all_failed = Set() # set of all failed tasks
141 all_done = Set() # set of all finished tasks=union(completed,failed)
141 all_done = Set() # set of all finished tasks=union(completed,failed)
142 all_ids = Set() # set of all submitted task IDs
142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
144 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
144
145
@@ -239,7 +240,7 b' class TaskScheduler(SessionFactory):'
239 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 parent=parent, ident=idents)
241 parent=parent, ident=idents)
241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.update_dependencies(msg_id)
243 self.update_graph(msg_id)
243
244
244
245
245 #-----------------------------------------------------------------------
246 #-----------------------------------------------------------------------
@@ -252,20 +253,21 b' class TaskScheduler(SessionFactory):'
252 self.notifier_stream.flush()
253 self.notifier_stream.flush()
253 try:
254 try:
254 idents, msg = self.session.feed_identities(raw_msg, copy=False)
255 idents, msg = self.session.feed_identities(raw_msg, copy=False)
255 except Exception as e:
256 msg = self.session.unpack_message(msg, content=False, copy=False)
256 self.log.error("task::Invaid msg: %s"%msg)
257 except:
258 self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
257 return
259 return
258
260
259 # send to monitor
261 # send to monitor
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
262 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261
263
262 msg = self.session.unpack_message(msg, content=False, copy=False)
263 header = msg['header']
264 header = msg['header']
264 msg_id = header['msg_id']
265 msg_id = header['msg_id']
266 self.all_ids.add(msg_id)
265
267
266 # time dependencies
268 # time dependencies
267 after = Dependency(header.get('after', []))
269 after = Dependency(header.get('after', []))
268 if after.mode == 'all':
270 if after.all:
269 after.difference_update(self.all_completed)
271 after.difference_update(self.all_completed)
270 if not after.success_only:
272 if not after.success_only:
271 after.difference_update(self.all_failed)
273 after.difference_update(self.all_failed)
@@ -276,10 +278,16 b' class TaskScheduler(SessionFactory):'
276
278
277 # location dependencies
279 # location dependencies
278 follow = Dependency(header.get('follow', []))
280 follow = Dependency(header.get('follow', []))
279 # check if unreachable:
281
280 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
282 for dep in after,follow:
281 self.depending[msg_id] = [raw_msg,MET,MET,None]
283 # check valid:
282 return self.fail_unreachable(msg_id)
284 if msg_id in dep or dep.difference(self.all_ids):
285 self.depending[msg_id] = [raw_msg,MET,MET,None]
286 return self.fail_unreachable(msg_id, error.InvalidDependency)
287 # check if unreachable:
288 if dep.unreachable(self.all_failed):
289 self.depending[msg_id] = [raw_msg,MET,MET,None]
290 return self.fail_unreachable(msg_id)
283
291
284 # turn timeouts into datetime objects:
292 # turn timeouts into datetime objects:
285 timeout = header.get('timeout', None)
293 timeout = header.get('timeout', None)
@@ -288,7 +296,7 b' class TaskScheduler(SessionFactory):'
288
296
289 if after.check(self.all_completed, self.all_failed):
297 if after.check(self.all_completed, self.all_failed):
290 # time deps already met, try to run
298 # time deps already met, try to run
291 if not self.maybe_run(msg_id, raw_msg, follow):
299 if not self.maybe_run(msg_id, raw_msg, follow, timeout):
292 # can't run yet
300 # can't run yet
293 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
301 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
294 else:
302 else:
@@ -306,25 +314,23 b' class TaskScheduler(SessionFactory):'
306 self.fail_unreachable(msg_id, timeout=True)
314 self.fail_unreachable(msg_id, timeout=True)
307
315
308 @logged
316 @logged
309 def fail_unreachable(self, msg_id, timeout=False):
317 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
310 """a message has become unreachable"""
318 """a message has become unreachable"""
311 if msg_id not in self.depending:
319 if msg_id not in self.depending:
312 self.log.error("msg %r already failed!"%msg_id)
320 self.log.error("msg %r already failed!"%msg_id)
313 return
321 return
314 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
322 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
315 for mid in follow.union(after):
323 for mid in follow.union(after):
316 if mid in self.dependencies:
324 if mid in self.graph:
317 self.dependencies[mid].remove(msg_id)
325 self.graph[mid].remove(msg_id)
318
326
319 # FIXME: unpacking a message I've already unpacked, but didn't save:
327 # FIXME: unpacking a message I've already unpacked, but didn't save:
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
328 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 msg = self.session.unpack_message(msg, copy=False, content=False)
329 msg = self.session.unpack_message(msg, copy=False, content=False)
322 header = msg['header']
330 header = msg['header']
323
331
324 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
325
326 try:
332 try:
327 raise impossible()
333 raise why()
328 except:
334 except:
329 content = ss.wrap_exception()
335 content = ss.wrap_exception()
330
336
@@ -335,10 +341,10 b' class TaskScheduler(SessionFactory):'
335 parent=header, ident=idents)
341 parent=header, ident=idents)
336 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
342 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
337
343
338 self.update_dependencies(msg_id, success=False)
344 self.update_graph(msg_id, success=False)
339
345
340 @logged
346 @logged
341 def maybe_run(self, msg_id, raw_msg, follow=None):
347 def maybe_run(self, msg_id, raw_msg, follow=None, timeout=None):
342 """check location dependencies, and run if they are met."""
348 """check location dependencies, and run if they are met."""
343
349
344 if follow:
350 if follow:
@@ -349,8 +355,7 b' class TaskScheduler(SessionFactory):'
349
355
350 indices = filter(can_run, range(len(self.targets)))
356 indices = filter(can_run, range(len(self.targets)))
351 if not indices:
357 if not indices:
352 # TODO evaluate unmeetable follow dependencies
358 if follow.all:
353 if follow.mode == 'all':
354 dests = set()
359 dests = set()
355 relevant = self.all_completed if follow.success_only else self.all_done
360 relevant = self.all_completed if follow.success_only else self.all_done
356 for m in follow.intersection(relevant):
361 for m in follow.intersection(relevant):
@@ -363,7 +368,7 b' class TaskScheduler(SessionFactory):'
363 else:
368 else:
364 indices = None
369 indices = None
365
370
366 self.submit_task(msg_id, raw_msg, indices)
371 self.submit_task(msg_id, raw_msg, follow, timeout, indices)
367 return True
372 return True
368
373
369 @logged
374 @logged
@@ -372,12 +377,12 b' class TaskScheduler(SessionFactory):'
372 self.depending[msg_id] = [raw_msg,after,follow,timeout]
377 self.depending[msg_id] = [raw_msg,after,follow,timeout]
373 # track the ids in follow or after, but not those already finished
378 # track the ids in follow or after, but not those already finished
374 for dep_id in after.union(follow).difference(self.all_done):
379 for dep_id in after.union(follow).difference(self.all_done):
375 if dep_id not in self.dependencies:
380 if dep_id not in self.graph:
376 self.dependencies[dep_id] = set()
381 self.graph[dep_id] = set()
377 self.dependencies[dep_id].add(msg_id)
382 self.graph[dep_id].add(msg_id)
378
383
379 @logged
384 @logged
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
385 def submit_task(self, msg_id, raw_msg, follow, timeout, indices=None):
381 """Submit a task to any of a subset of our targets."""
386 """Submit a task to any of a subset of our targets."""
382 if indices:
387 if indices:
383 loads = [self.loads[i] for i in indices]
388 loads = [self.loads[i] for i in indices]
@@ -391,7 +396,7 b' class TaskScheduler(SessionFactory):'
391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
396 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
392 self.engine_stream.send_multipart(raw_msg, copy=False)
397 self.engine_stream.send_multipart(raw_msg, copy=False)
393 self.add_job(idx)
398 self.add_job(idx)
394 self.pending[target][msg_id] = (raw_msg, follow)
399 self.pending[target][msg_id] = (raw_msg, follow, timeout)
395 content = dict(msg_id=msg_id, engine_id=target)
400 content = dict(msg_id=msg_id, engine_id=target)
396 self.session.send(self.mon_stream, 'task_destination', content=content,
401 self.session.send(self.mon_stream, 'task_destination', content=content,
397 ident=['tracktask',self.session.session])
402 ident=['tracktask',self.session.session])
@@ -403,10 +408,11 b' class TaskScheduler(SessionFactory):'
403 def dispatch_result(self, raw_msg):
408 def dispatch_result(self, raw_msg):
404 try:
409 try:
405 idents,msg = self.session.feed_identities(raw_msg, copy=False)
410 idents,msg = self.session.feed_identities(raw_msg, copy=False)
406 except Exception as e:
411 msg = self.session.unpack_message(msg, content=False, copy=False)
407 self.log.error("task::Invaid result: %s"%msg)
412 except:
413 self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
408 return
414 return
409 msg = self.session.unpack_message(msg, content=False, copy=False)
415
410 header = msg['header']
416 header = msg['header']
411 if header.get('dependencies_met', True):
417 if header.get('dependencies_met', True):
412 success = (header['status'] == 'ok')
418 success = (header['status'] == 'ok')
@@ -438,7 +444,7 b' class TaskScheduler(SessionFactory):'
438 self.all_done.add(msg_id)
444 self.all_done.add(msg_id)
439 self.destinations[msg_id] = engine
445 self.destinations[msg_id] = engine
440
446
441 self.update_dependencies(msg_id, success)
447 self.update_graph(msg_id, success)
442
448
443 @logged
449 @logged
444 def handle_unmet_dependency(self, idents, parent):
450 def handle_unmet_dependency(self, idents, parent):
@@ -448,30 +454,30 b' class TaskScheduler(SessionFactory):'
448 self.blacklist[msg_id] = set()
454 self.blacklist[msg_id] = set()
449 self.blacklist[msg_id].add(engine)
455 self.blacklist[msg_id].add(engine)
450 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
456 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
451 if not self.maybe_run(msg_id, raw_msg, follow):
457 if not self.maybe_run(msg_id, raw_msg, follow, timeout):
452 # resubmit failed, put it back in our dependency tree
458 # resubmit failed, put it back in our dependency tree
453 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
459 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
454 pass
460 pass
455
461
456 @logged
462 @logged
457 def update_dependencies(self, dep_id, success=True):
463 def update_graph(self, dep_id, success=True):
458 """dep_id just finished. Update our dependency
464 """dep_id just finished. Update our dependency
459 table and submit any jobs that just became runable."""
465 table and submit any jobs that just became runable."""
460 # print ("\n\n***********")
466 # print ("\n\n***********")
461 # pprint (dep_id)
467 # pprint (dep_id)
462 # pprint (self.dependencies)
468 # pprint (self.graph)
463 # pprint (self.depending)
469 # pprint (self.depending)
464 # pprint (self.all_completed)
470 # pprint (self.all_completed)
465 # pprint (self.all_failed)
471 # pprint (self.all_failed)
466 # print ("\n\n***********\n\n")
472 # print ("\n\n***********\n\n")
467 if dep_id not in self.dependencies:
473 if dep_id not in self.graph:
468 return
474 return
469 jobs = self.dependencies.pop(dep_id)
475 jobs = self.graph.pop(dep_id)
470
476
471 for msg_id in jobs:
477 for msg_id in jobs:
472 raw_msg, after, follow, timeout = self.depending[msg_id]
478 raw_msg, after, follow, timeout = self.depending[msg_id]
473 # if dep_id in after:
479 # if dep_id in after:
474 # if after.mode == 'all' and (success or not after.success_only):
480 # if after.all and (success or not after.success_only):
475 # after.remove(dep_id)
481 # after.remove(dep_id)
476
482
477 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
483 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
@@ -479,12 +485,12 b' class TaskScheduler(SessionFactory):'
479
485
480 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
486 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
481 self.depending[msg_id][1] = MET
487 self.depending[msg_id][1] = MET
482 if self.maybe_run(msg_id, raw_msg, follow):
488 if self.maybe_run(msg_id, raw_msg, follow, timeout):
483
489
484 self.depending.pop(msg_id)
490 self.depending.pop(msg_id)
485 for mid in follow.union(after):
491 for mid in follow.union(after):
486 if mid in self.dependencies:
492 if mid in self.graph:
487 self.dependencies[mid].remove(msg_id)
493 self.graph[mid].remove(msg_id)
488
494
489 #----------------------------------------------------------------------
495 #----------------------------------------------------------------------
490 # methods to be overridden by subclasses
496 # methods to be overridden by subclasses
@@ -506,7 +512,8 b' class TaskScheduler(SessionFactory):'
506
512
507
513
508
514
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
515 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
516 log_addr=None, loglevel=logging.DEBUG, scheme='lru'):
510 from zmq.eventloop import ioloop
517 from zmq.eventloop import ioloop
511 from zmq.eventloop.zmqstream import ZMQStream
518 from zmq.eventloop.zmqstream import ZMQStream
512
519
@@ -228,7 +228,12 b' class DirectView(View):'
228 >>> dv_even = client[::2]
228 >>> dv_even = client[::2]
229 >>> dv_some = client[1:3]
229 >>> dv_some = client[1:3]
230
230
231 This object provides dictionary access
231 This object provides dictionary access to engine namespaces:
232
233 # push a=5:
234 >>> dv['a'] = 5
235 # pull 'foo':
236 >>> dv['foo']
232
237
233 """
238 """
234
239
@@ -57,7 +57,7 b' def submit_jobs(client, G, jobs):'
57 """Submit jobs via client where G describes the time dependencies."""
57 """Submit jobs via client where G describes the time dependencies."""
58 results = {}
58 results = {}
59 for node in nx.topological_sort(G):
59 for node in nx.topological_sort(G):
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
60 deps = [ results[n] for n in G.predecessors(node) ]
61 results[node] = client.apply(jobs[node], after=deps)
61 results[node] = client.apply(jobs[node], after=deps)
62 return results
62 return results
63
63
@@ -77,30 +77,34 b' def main(nodes, edges):'
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 from matplotlib.dates import date2num
79 from matplotlib.dates import date2num
80 from matplotlib.cm import gist_rainbow
80 print "building DAG"
81 print "building DAG"
81 G = random_dag(nodes, edges)
82 G = random_dag(nodes, edges)
82 jobs = {}
83 jobs = {}
83 pos = {}
84 pos = {}
85 colors = {}
84 for node in G:
86 for node in G:
85 jobs[node] = randomwait
87 jobs[node] = randomwait
86
88
87 client = cmod.Client()
89 client = cmod.Client()
88 print "submitting tasks"
90 print "submitting %i tasks with %i dependencies"%(nodes,edges)
89 results = submit_jobs(client, G, jobs)
91 results = submit_jobs(client, G, jobs)
90 print "waiting for results"
92 print "waiting for results"
91 client.barrier()
93 client.barrier()
92 print "done"
94 print "done"
93 for node in G:
95 for node in G:
94 # times[node] = results[node].get()
96 md = results[node].metadata
95 t = date2num(results[node].metadata.started)
97 start = date2num(md.started)
96 pos[node] = (t, G.in_degree(node)+random())
98 runtime = date2num(md.completed) - start
97
99 pos[node] = (start, runtime)
100 colors[node] = md.engine_id
98 validate_tree(G, results)
101 validate_tree(G, results)
99 nx.draw(G, pos)
102 nx.draw(G, pos, nodelist=colors.keys(), node_color=colors.values(), cmap=gist_rainbow)
100 return G,results
103 return G,results
101
104
102 if __name__ == '__main__':
105 if __name__ == '__main__':
103 import pylab
106 import pylab
104 main(32,128)
107 # main(5,10)
108 main(32,96)
105 pylab.show()
109 pylab.show()
106 No newline at end of file
110
@@ -15,5 +15,6 b' Using IPython for parallel computing (ZMQ)'
15 parallel_security.txt
15 parallel_security.txt
16 parallel_winhpc.txt
16 parallel_winhpc.txt
17 parallel_demos.txt
17 parallel_demos.txt
18 dag_dependencies.txt
18
19
19
20
@@ -13,14 +13,7 b' Matplotlib package. IPython can be started in this mode by typing::'
13
13
14 ipython --pylab
14 ipython --pylab
15
15
16 at the system command line. If this prints an error message, you will
16 at the system command line.
17 need to install the default profiles from within IPython by doing,
18
19 .. sourcecode:: ipython
20
21 In [1]: %install_profiles
22
23 and then restarting IPython.
24
17
25 150 million digits of pi
18 150 million digits of pi
26 ========================
19 ========================
@@ -132,11 +132,11 b' The main method for doing remote execution (in fact, all methods that'
132 communicate with the engines are built on top of it), is :meth:`Client.apply`.
132 communicate with the engines are built on top of it), is :meth:`Client.apply`.
133 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
133 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
134 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
134 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
135 require some more options, they cannot reasonably provide this interface.
135 require some more options, they cannot easily provide this interface.
136 Instead, they provide the signature::
136 Instead, they provide the signature::
137
137
138 c.apply(f, args=None, kwargs=None, bound=True, block=None,
138 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
139 targets=None, after=None, follow=None)
139 after=None, follow=None, timeout=None)
140
140
141 In order to provide the nicer interface, we have :class:`View` classes, which wrap
141 In order to provide the nicer interface, we have :class:`View` classes, which wrap
142 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
142 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
@@ -184,7 +184,7 b' blocks until the engines are done executing the command:'
184 In [5]: dview['b'] = 10
184 In [5]: dview['b'] = 10
185
185
186 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
186 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
187 Out[6]: [42,42,42,42]
187 Out[6]: [42, 42, 42, 42]
188
188
189 Python commands can be executed on specific engines by calling execute using
189 Python commands can be executed on specific engines by calling execute using
190 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
190 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
@@ -197,7 +197,7 b' by index-access to the client:'
197 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
197 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
198
198
199 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
199 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
200 Out[8]: [15,-5,15,-5]
200 Out[8]: [15, -5, 15, -5]
201
201
202 .. note::
202 .. note::
203
203
@@ -258,7 +258,7 b' time through its :meth:`get` method.'
258
258
259 .. Note::
259 .. Note::
260
260
261 The :class:`AsyncResult` object provides the exact same interface as
261 The :class:`AsyncResult` object provides a superset of the interface in
262 :py:class:`multiprocessing.pool.AsyncResult`. See the
262 :py:class:`multiprocessing.pool.AsyncResult`. See the
263 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
263 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
264 for more.
264 for more.
@@ -270,15 +270,12 b' local Python/IPython session:'
270 .. sourcecode:: ipython
270 .. sourcecode:: ipython
271
271
272 # define our function
272 # define our function
273 In [35]: def wait(t):
273 In [6]: def wait(t):
274 ....: import time
274 ...: import time
275 ....: tic = time.time()
275 ...: tic = time.time()
276 ....: time.sleep(t)
276 ...: time.sleep(t)
277 ....: return time.time()-tic
277 ...: return time.time()-tic
278
278
279 # In blocking mode
280 In [6]: rc.apply('import time')
281
282 # In non-blocking mode
279 # In non-blocking mode
283 In [7]: pr = rc[:].apply_async(wait, 2)
280 In [7]: pr = rc[:].apply_async(wait, 2)
284
281
@@ -316,8 +313,8 b' local Python/IPython session:'
316
313
317 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
314 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
318 are done. For this, there is the method :meth:`barrier`. This method takes a
315 are done. For this, there is the method :meth:`barrier`. This method takes a
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the associated
316 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
320 results are ready:
317 associated results are ready:
321
318
322 .. sourcecode:: ipython
319 .. sourcecode:: ipython
323
320
@@ -329,7 +326,7 b' results are ready:'
329 # Wait until all of them are done
326 # Wait until all of them are done
330 In [74]: rc.barrier(pr_list)
327 In [74]: rc.barrier(pr_list)
331
328
332 # Then, their results are ready using get_result or the r attribute
329 # Then, their results are ready using get() or the `.r` attribute
333 In [75]: pr_list[0].get()
330 In [75]: pr_list[0].get()
334 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
331 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
335
332
@@ -320,4 +320,5 b' channel is established.'
320
320
321 .. [RFC5246] <http://tools.ietf.org/html/rfc5246>
321 .. [RFC5246] <http://tools.ietf.org/html/rfc5246>
322
322
323
323 .. [OpenSSH] <http://www.openssh.com/>
324 .. [Paramiko] <http://www.lag.net/paramiko/>
@@ -4,13 +4,13 b''
4 The IPython task interface
4 The IPython task interface
5 ==========================
5 ==========================
6
6
7 The task interface to the controller presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface, the user has no direct access to individual engines. By
9 the task interface the user has no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is both simpler
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 and more powerful.
11 simpler and more powerful.
12
12
13 Best of all the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 to take advantage of their respective strengths. When the user can break up
14 to take advantage of their respective strengths. When the user can break up
15 their work into segments that do not depend on previous execution, the
15 their work into segments that do not depend on previous execution, the
16 task interface is ideal. But it also has more power and flexibility, allowing
16 task interface is ideal. But it also has more power and flexibility, allowing
@@ -97,11 +97,275 b' that turns any Python function into a parallel function:'
97 In [10]: @lview.parallel()
97 In [10]: @lview.parallel()
98 ....: def f(x):
98 ....: def f(x):
99 ....: return 10.0*x**4
99 ....: return 10.0*x**4
100 ....:
100 ....:
101
101
102 In [11]: f.map(range(32)) # this is done in parallel
102 In [11]: f.map(range(32)) # this is done in parallel
103 Out[11]: [0.0,10.0,160.0,...]
103 Out[11]: [0.0,10.0,160.0,...]
104
104
105 Dependencies
106 ============
107
108 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
109 may want to associate some kind of `Dependency` that describes when, where, or whether
110 a task can be run. In IPython, we provide two types of dependencies:
111 `Functional Dependencies`_ and `Graph Dependencies`_.
112
113 .. note::
114
115 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
116 and you will see errors or warnings if you try to use dependencies with the pure
117 scheduler.
118
119 Functional Dependencies
120 -----------------------
121
122 Functional dependencies are used to determine whether a given engine is capable of running
123 a particular task. This is implemented via a special :class:`Exception` class,
124 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
125 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
126 the error up to the client as it would for any other error, catches it and resubmits the
127 task to a different engine. This can repeat until the engines are exhausted, but a task
128 will never be submitted to a given engine a second time.
129
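The retry behavior just described can be modeled in plain Python. This is a toy sketch, not the real scheduler; the `UnmetDependency` class below is a local stand-in for the one in `IPython.zmq.parallel.error`, and `run_with_resubmit` is an invented name for illustration:

```python
class UnmetDependency(Exception):
    """Local stand-in for IPython.zmq.parallel.error.UnmetDependency."""

def run_with_resubmit(task, engines):
    """Toy model of the scheduler's retry loop: a task that raises
    UnmetDependency is resubmitted to another engine, and is never
    sent to the same engine twice."""
    tried = set()
    for engine in engines:
        if engine in tried:  # never submit to a given engine twice
            continue
        tried.add(engine)
        try:
            return engine, task(engine)
        except UnmetDependency:
            continue  # try the next engine instead of relaying the error
    raise RuntimeError("no engine could satisfy the dependency")
```

The key property is that the retry set only shrinks, so the loop is bounded by the number of engines.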
130 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
131 some decorators for facilitating this behavior.
132
133 There are two decorators and a class used for functional dependencies:
134
135 .. sourcecode:: ipython
136
137 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
138
139 @require
140 ********
141
142 The simplest sort of dependency is requiring that a Python module is available. The
143 ``@require`` decorator lets you define a function that will only run on engines where names
144 you specify are importable:
145
146 .. sourcecode:: ipython
147
148 In [10]: @require('numpy', 'zmq')
149 ...: def myfunc():
150 ...: import numpy,zmq
151 ...: return dostuff()
152
153 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
154 numpy and pyzmq available.
155
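One plausible way ``@require`` could work is sketched below. This is an assumption based on the behavior described above, not the actual implementation; the `UnmetDependency` class is again a local stand-in:

```python
import importlib

class UnmetDependency(Exception):
    """Local stand-in for IPython.zmq.parallel.error.UnmetDependency."""

def require(*names):
    """Approximation of @require: raise UnmetDependency on any engine
    where one of the named modules cannot be imported, so the scheduler
    moves the task elsewhere."""
    def decorator(f):
        def wrapped(*args, **kwargs):
            for name in names:
                try:
                    importlib.import_module(name)
                except ImportError:
                    raise UnmetDependency(name)
            return f(*args, **kwargs)
        wrapped.__name__ = f.__name__
        return wrapped
    return decorator
```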
156 @depend
157 *******
158
159 The ``@depend`` decorator lets you decorate any function with any *other* function to
160 evaluate the dependency. The dependency function will be called at the start of the task,
161 and if it returns ``False``, then the dependency will be considered unmet, and the task
162 will be assigned to another engine. If the dependency returns *anything other than
163 ``False``*, the task will be run.
164
165 .. sourcecode:: ipython
166
167 In [10]: def platform_specific(plat):
168 ...: import sys
169 ...: return sys.platform == plat
170
171 In [11]: @depend(platform_specific, 'darwin')
172 ...: def mactask():
173 ...: do_mac_stuff()
174
175 In [12]: @depend(platform_specific, 'nt')
176 ...: def wintask():
177 ...: do_windows_stuff()
178
179 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
180 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
181 signature.
182
183 dependents
184 **********
185
186 You don't have to use the decorators on your tasks. If, for instance, you want to
187 run tasks with a single function but varying dependencies, you can directly construct
188 the :class:`dependent` object that the decorators use:
189
190 .. sourcecode:: ipython
191
192 In [13]: def mytask(*args):
193 ...: dostuff()
194
195 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
196 # this is the same as decorating the declaration of mytask with @depend
197 # but you can do it again:
198
199 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
200
201 # in general:
202 In [16]: t = dependent(f, g, *dargs, **dkwargs)
203
204 # is equivalent to:
205 In [17]: @depend(g, *dargs, **dkwargs)
206 ...: def t(a,b,c):
207 ...: # contents of f
208
209 Graph Dependencies
210 ------------------
211
212 Sometimes you want to restrict the time and/or location to run a given task as a function
213 of the time and/or location of other tasks. This is implemented via a subclass of
214 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
215 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
216 has been met.
217
218 The switches we provide for interpreting whether a given dependency set has been met:
219
220 any|all
221 Whether the dependency is considered met if *any* of the dependencies are done, or
222 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
223 boolean attribute, which defaults to ``True``.
224
225 success_only
226 Whether to consider only tasks that did not raise an error as being fulfilled.
227 Sometimes you want to run a task after another, but only if that task succeeded. In
228 this case, ``success_only`` should be ``True``. However sometimes you may not care
229 whether the task succeeds, and always want the second task to run, in which case
230 you should use `success_only=False`. The default behavior is to only use successes.
231
232 There are other switches for interpretation that are made at the *task* level. These are
233 specified via keyword arguments to the client's :meth:`apply` method.
234
235 after,follow
236 You may want to run a task *after* a given set of dependencies have been run and/or
237 run it *where* another set of dependencies are met. To support this, every task has an
238 `after` dependency to restrict time, and a `follow` dependency to restrict
239 destination.
240
241 timeout
242 You may also want to set a time-limit for how long the scheduler should wait before a
243 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
244 indicates that the task should never timeout. If the timeout is reached, and the
245 scheduler still hasn't been able to assign the task to an engine, the task will fail
246 with a :class:`DependencyTimeout`.
247
248 .. note::
249
250 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
251 task to run after a job submitted via the MUX interface.
252
253 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
254 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
255 `follow` and `after` keywords to :meth:`client.apply`:
256
257 .. sourcecode:: ipython
258
259 In [14]: client.block=False
260
261 In [15]: ar = client.apply(f, args, kwargs, targets=None)
262
263 In [16]: ar2 = client.apply(f2, targets=None)
264
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2])
266
267 In [18]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
268
269
270 .. seealso::
271
272 Some parallel workloads can be described as a `Directed Acyclic Graph
273 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
274 Dependencies <dag_dependencies>`_ for an example demonstrating how to map a NetworkX DAG
275 onto task dependencies.
276
277
278
279 Impossible Dependencies
280 ***********************
281
282 The schedulers do perform some analysis on graph dependencies to determine whether
283 they can ever be met. If the scheduler discovers that a dependency cannot be
284 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
285 scheduler realizes that a task can never be run, the task won't sit indefinitely in the
286 scheduler clogging the pipeline.
287
288 The basic cases that are checked:
289
290 * depending on nonexistent messages
291 * `follow` dependencies were run on more than one machine and `all=True`
292 * any dependencies failed and `all=True,success_only=True`
293 * all dependencies failed and `all=False,success_only=True`
294
295 .. warning::
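The failure-based cases in this list might be checked roughly as follows. This is a sketch of the idea only, with an invented function name; it omits the nonexistent-message and follow-location checks:

```python
def unreachable(deps, failed, all=True, success_only=True):
    """Can this dependency never be met, given the set of failed msg_ids?
    Covers only the failure-based cases listed above."""
    if not success_only:
        return False  # failures still count as finished, so never unreachable
    deps = set(deps)
    failed = set(failed)
    if all:
        return bool(deps & failed)  # any required dependency failed
    return bool(deps) and deps <= failed  # every alternative failed
```
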
296
297 This analysis has not been proven to be rigorous, so it is possible for tasks
298 to become impossible to run in obscure situations; a timeout may be a good safeguard.
299
300 Schedulers
301 ==========
302
303 There are a variety of valid ways to determine where jobs should be assigned in a
304 load-balancing situation. In IPython, we support several standard schemes, and
305 even make it easy to define your own. The scheme can be selected via the ``--scheme``
306 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
307 of a controller config object.
308
309 The built-in routing schemes:
310
311 lru: Least Recently Used
312
313 Always assign work to the least-recently-used engine. A close relative of
314 round-robin, it will be fair with respect to the number of tasks, but agnostic
315 with respect to the runtime of each task.
316
317 plainrandom: Plain Random
318 Randomly picks an engine on which to run.
319
320 twobin: Two-Bin Random
321
322 **Depends on numpy**
323
324 Pick two engines at random, and use the LRU of the two. This is known to be better
325 than plain random in many cases, but requires a small amount of computation.
326
327 leastload: Least Load
328
329 **This is the default scheme**
330
331 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
332
333 weighted: Weighted Two-Bin Random
334
335 **Depends on numpy**
336
337 Pick two engines at random using the number of outstanding tasks as inverse weights,
338 and use the one with the lower load.
339
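As a rough illustration, the weighted scheme could be modeled like this. This is a pure-Python stand-in with an invented name and shape (the real implementation depends on numpy); it draws two engines with probability inversely proportional to load and keeps the less-loaded one:

```python
import random

def weighted_pick(loads, rng=random):
    """Model of weighted two-bin random: draw two engines, weighting each
    by 1/(1 + outstanding tasks), then keep the one with the lower load."""
    engines = list(loads)
    weights = [1.0 / (1 + loads[e]) for e in engines]
    a, b = rng.choices(engines, weights=weights, k=2)  # sampled with replacement
    return a if loads[a] <= loads[b] else b
```

Heavily loaded engines are thus doubly penalized: they are unlikely to be drawn, and even when drawn they lose the two-way comparison.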
340
341 Pure ZMQ Scheduler
342 ------------------
343
344 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
345 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
346 load-balancing. This scheduler does not support any of the advanced features of the Python
347 :class:`.Scheduler`.
348
349 Disabled features when using the ZMQ Scheduler:
350
351 * Engine unregistration
352 Task farming will be disabled if an engine unregisters.
353 Further, if an engine is unregistered during computation, the scheduler may not recover.
354 * Dependencies
355 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
356 based on message content.
357 * Early destination notification
358 The Python schedulers know which engine gets which task, and notify the Hub. This
359 allows graceful handling of Engines coming and going. There is no way to know
360 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
361 engine until they *finish*. This makes recovery from engine shutdown very difficult.
362
363
364 .. note::
365
366 TODO: performance comparisons
367
368
105 More details
369 More details
106 ============
370 ============
107
371
@@ -125,8 +389,7 b' The following is an overview of how to use these classes together:'
125 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
126 for and then receive the results.
390 for and then receive the results.
127
391
128 We are in the process of developing more detailed information about the task
129 interface. For now, the docstrings of the :meth:`Client.apply`,
130 and :func:`depend` methods should be consulted.
131
392
393 .. seealso::
132
394
395 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
@@ -123,7 +123,7 b' opening a Windows Command Prompt and typing ``ipython``. This will'
123 start IPython's interactive shell and you should see something like the
123 start IPython's interactive shell and you should see something like the
124 following screenshot:
124 following screenshot:
125
125
126 .. image:: ipython_shell.*
126 .. image:: ../parallel/ipython_shell.*
127
127
128 Starting an IPython cluster
128 Starting an IPython cluster
129 ===========================
129 ===========================
@@ -171,7 +171,7 b' You should see a number of messages printed to the screen, ending with'
171 "IPython cluster: started". The result should look something like the following
171 "IPython cluster: started". The result should look something like the following
172 screenshot:
172 screenshot:
173
173
174 .. image:: ipclusterz_start.*
174 .. image:: ../parallel/ipcluster_start.*
175
175
176 At this point, the controller and two engines are running on your local host.
176 At this point, the controller and two engines are running on your local host.
177 This configuration is useful for testing and for situations where you want to
177 This configuration is useful for testing and for situations where you want to
@@ -213,7 +213,7 b' The output of this command is shown in the screenshot below. Notice how'
213 :command:`ipclusterz` prints out the location of the newly created cluster
213 :command:`ipclusterz` prints out the location of the newly created cluster
214 directory.
214 directory.
215
215
216 .. image:: ipclusterz_create.*
216 .. image:: ../parallel/ipcluster_create.*
217
217
218 Configuring a cluster profile
218 Configuring a cluster profile
219 -----------------------------
219 -----------------------------
@@ -282,7 +282,7 b' must be run again to regenerate the XML job description files. The'
282 following screenshot shows what the HPC Job Manager interface looks like
282 following screenshot shows what the HPC Job Manager interface looks like
283 with a running IPython cluster.
283 with a running IPython cluster.
284
284
285 .. image:: hpc_job_manager.*
285 .. image:: ../parallel/hpc_job_manager.*
286
286
287 Performing a simple interactive parallel computation
287 Performing a simple interactive parallel computation
288 ====================================================
288 ====================================================
@@ -333,5 +333,5 b" The :meth:`map` method has the same signature as Python's builtin :func:`map`"
333 function, but runs the calculation in parallel. More involved examples of using
333 function, but runs the calculation in parallel. More involved examples of using
334 :class:`MultiEngineClient` are provided in the examples that follow.
334 :class:`MultiEngineClient` are provided in the examples that follow.
335
335
336 .. image:: mec_simple.*
336 .. image:: ../parallel/mec_simple.*
337
337