dependency tweaks + dependency/scheduler docs
MinRK
@@ -0,0 +1,172 b''
1 .. _dag_dependencies:
2
3 ================
4 DAG Dependencies
5 ================
6
7 Often, a parallel workflow is described in terms of a `Directed Acyclic Graph
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
9 for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
10 a NetworkX DAG to task dependencies.
11
12 The full script that runs this demo can be found in
13 :file:`docs/examples/newparallel/dagdeps.py`.
14
15 Why are DAGs good for task dependencies?
16 ----------------------------------------
17
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
19 the nodes. For our purposes, each node would be a task, and each edge would be a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
23 there must not be any closed loops in the graph. This is important for dependencies,
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
26 dependencies to cause a deadlock.
27
28 A Sample DAG
29 ------------
30
31 Here, we have a very simple 5-node DAG:
32
33 .. figure:: simpledag.*
34
35 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 1 and 2; and 4 depends only on 1.
38
39 A possible sequence of events for this workflow:
40
41 0. Task 0 can run right away
42 1. 0 finishes, so 1,2 can start
43 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 3. 2 finishes, and 3 can finally start
45
46
47 Further, taking failures into account, assuming all dependencies are run with the default
48 `success_only=True`, the following cases would occur for each node's failure:
49
50 0. if 0 fails, all other tasks fail as Impossible
51 1. if 1 fails, 2 can still succeed, but 3 and 4 become unreachable
52 2. if 2 fails, 3 becomes unreachable, but 4 is unaffected
53 3. if 3 or 4 fails, no other node is affected, as they are terminal
54
55 The code to generate the simple DAG:
56
57 .. sourcecode:: python
58
59 import networkx as nx
60
61 G = nx.DiGraph()
62
63 # add 5 nodes, labeled 0-4:
64 map(G.add_node, range(5))
65 # 1,2 depend on 0:
66 G.add_edge(0,1)
67 G.add_edge(0,2)
68 # 3 depends on 1,2
69 G.add_edge(1,3)
70 G.add_edge(2,3)
71 # 4 depends on 1
72 G.add_edge(1,4)
73
74 # now draw the graph:
75 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 3 : (0,2), 4 : (2,2)}
77 nx.draw(G, pos, edge_color='r')
78
79
80 For demonstration purposes, we have a function that generates a random DAG with a given
81 number of nodes and edges.
82
83 .. literalinclude:: ../../examples/newparallel/dagdeps.py
84 :language: python
85 :lines: 20-36
86
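Since the included source isn't reproduced on this page, a minimal sketch of such a
generator might look like the following (an assumption about the script's approach:
rejection sampling, discarding any randomly chosen edge that would close a cycle):

.. sourcecode:: python

    import networkx as nx
    from random import randint

    def random_dag(nodes, edges):
        """Generate a random Directed Acyclic Graph with `nodes` nodes and `edges` edges."""
        G = nx.DiGraph()
        for i in range(nodes):
            G.add_node(i)
        while edges > 0:
            # pick two distinct nodes at random
            a = randint(0, nodes - 1)
            b = a
            while b == a:
                b = randint(0, nodes - 1)
            G.add_edge(a, b)
            if nx.is_directed_acyclic_graph(G):
                edges -= 1
            else:
                # this edge closed a cycle; discard it and try again
                G.remove_edge(a, b)
        return G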
87 So first, we start with a graph of 32 nodes, with 128 edges:
88
89 .. sourcecode:: ipython
90
91 In [2]: G = random_dag(32,128)
92
93 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94
95 .. sourcecode:: ipython
96
97 In [3]: jobs = {}
98
99 # in reality, each job would presumably be different
100 # randomwait is just a function that sleeps for a random interval
101 In [4]: for node in G:
102 ...: jobs[node] = randomwait
103
104 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
106 and msg_ids are necessary for building dependencies, it is critical that we never submit a
107 job before the jobs on which it depends. Fortunately, NetworkX provides a
108 :func:`topological_sort` function which ensures exactly this. It returns an iterable that
109 guarantees that when you arrive at a node, you have already visited all the nodes
110 on which it depends:
111
112 .. sourcecode:: ipython
113
114 In [5]: c = client.Client()
115
116 In [6]: results = {}
117
118 In [7]: for node in nx.topological_sort(G):
119 ...: # get list of AsyncResult objects from nodes
120 ...: # leading into this one as dependencies
121 ...: deps = [ results[n] for n in G.predecessors(node) ]
122 ...: # submit and store AsyncResult object
123 ...: results[node] = c.apply(jobs[node], after=deps, block=False)
124
125 Now that we have submitted all the jobs, we can wait for the results:
126
127 .. sourcecode:: ipython
128
129 In [8]: [ r.get() for r in results.values() ]
130
131 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
132 raised an error if a task failed). But we don't know that the ordering was properly
133 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
134
135 These objects store a variety of metadata about each task, including various timestamps.
136 We can validate that the dependencies were respected by checking that each task was
137 started after all of its predecessors were completed:
138
139 .. literalinclude:: ../../examples/newparallel/dagdeps.py
140 :language: python
141 :lines: 64-70
142
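The included lines aren't rendered here; a minimal sketch of such a check (a hypothetical
``validate_tree`` helper, assuming only the ``started``/``completed`` metadata described
above) might be:

.. sourcecode:: python

    import networkx as nx

    def validate_tree(G, results):
        """Assert that each task started only after all its predecessors completed."""
        for node in nx.topological_sort(G):
            started = results[node].metadata.started
            for parent in G.predecessors(node):
                finished = results[parent].metadata.completed
                assert started > finished, "%s should have happened after %s" % (
                    started, finished)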
143 We can also validate the graph visually. By drawing the graph with each node's x-position
144 as its start time, all arrows must be pointing to the right if the order was respected.
145 For vertical spread, the y-position will be the runtime, so long-running tasks will be at
146 the top, and quick tasks will be at the bottom.
147
148 .. sourcecode:: ipython
149
150 In [10]: from matplotlib.dates import date2num
151
152 In [11]: from matplotlib.cm import gist_rainbow
153
154 In [12]: pos = {}; colors = {}
155
156 In [13]: for node in G:
157 ...: md = results[node].metadata
158 ...: start = date2num(md.started)
159 ...: runtime = date2num(md.completed) - start
160 ...: pos[node] = (start, runtime)
161 ...: colors[node] = md.engine_id
162
163 In [14]: nx.draw(G, pos, nodelist=colors.keys(), node_color=colors.values(),
164 ...: cmap=gist_rainbow)
165
166 .. figure:: dagdeps.*
167
168 Time started on x, runtime on y, and color-coded by engine-id (in this case there
169 were four engines).
170
171
172 .. _NetworkX: http://networkx.lanl.gov/
1 NO CONTENT: new file 100644, binary diff hidden
1 NO CONTENT: new file 100644, binary diff hidden
1 NO CONTENT: new file 100644, binary diff hidden
1 NO CONTENT: new file 100644, binary diff hidden
@@ -863,14 +863,9 b' class Client(object):'
863 863 return dep.msg_ids
864 864 elif dep is None:
865 865 return []
866 elif isinstance(dep, set):
867 return list(dep)
868 elif isinstance(dep, (list,dict)):
869 return dep
870 elif isinstance(dep, str):
871 return [dep]
872 866 else:
873 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
867 # pass to Dependency constructor
868 return list(Dependency(dep))
874 869
875 870 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
876 871 after=None, follow=None, timeout=None):
@@ -921,9 +916,11 b' class Client(object):'
921 916 This job will only be run on an engine where this dependency
922 917 is met.
923 918
924 timeout : float or None
919 timeout : float/int or None
925 920 Only for load-balanced execution (targets=None)
926 Specify an amount of time (in seconds)
921 Specify an amount of time (in seconds) for the scheduler to
922 wait for dependencies to be met before failing with a
923 DependencyTimeout.
927 924
928 925 Returns
929 926 -------
@@ -950,9 +947,6 b' class Client(object):'
950 947 if not isinstance(kwargs, dict):
951 948 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
952 949
953 after = self._build_dependency(after)
954 follow = self._build_dependency(follow)
955
956 950 options = dict(bound=bound, block=block)
957 951
958 952 if targets is None:
@@ -984,6 +978,8 b' class Client(object):'
984 978 warnings.warn(msg, RuntimeWarning)
985 979
986 980
981 after = self._build_dependency(after)
982 follow = self._build_dependency(follow)
987 983 subheader = dict(after=after, follow=follow, timeout=timeout)
988 984 bufs = ss.pack_apply_message(f,args,kwargs)
989 985 content = dict(bound=bound)
@@ -2,13 +2,7 b''
2 2
3 3 from IPython.external.decorator import decorator
4 4 from error import UnmetDependency
5
6
7 # flags
8 ALL = 1 << 0
9 ANY = 1 << 1
10 HERE = 1 << 2
11 ANYWHERE = 1 << 3
5 from asyncresult import AsyncResult
12 6
13 7
14 8 class depend(object):
@@ -59,53 +53,58 b' class Dependency(set):'
59 53
60 54 Subclassed from set()."""
61 55
62 mode='all'
56 all=True
63 57 success_only=True
64 58
65 def __init__(self, dependencies=[], mode='all', success_only=True):
59 def __init__(self, dependencies=[], all=True, success_only=True):
66 60 if isinstance(dependencies, dict):
67 61 # load from dict
68 mode = dependencies.get('mode', mode)
62 all = dependencies.get('all', True)
69 63 success_only = dependencies.get('success_only', success_only)
70 64 dependencies = dependencies.get('dependencies', [])
71 set.__init__(self, dependencies)
72 self.mode = mode.lower()
65 ids = []
66 if isinstance(dependencies, AsyncResult):
67 ids.extend(dependencies.msg_ids)
68 else:
69 for d in dependencies:
70 if isinstance(d, basestring):
71 ids.append(d)
72 elif isinstance(d, AsyncResult):
73 ids.extend(d.msg_ids)
74 else:
75 raise TypeError("invalid dependency type: %r"%type(d))
76 set.__init__(self, ids)
77 self.all = all
73 78 self.success_only=success_only
74 if self.mode not in ('any', 'all'):
75 raise NotImplementedError("Only any|all supported, not %r"%mode)
76 79
77 80 def check(self, completed, failed=None):
78 81 if failed is not None and not self.success_only:
79 82 completed = completed.union(failed)
80 83 if len(self) == 0:
81 84 return True
82 if self.mode == 'all':
85 if self.all:
83 86 return self.issubset(completed)
84 elif self.mode == 'any':
85 return not self.isdisjoint(completed)
86 87 else:
87 raise NotImplementedError("Only any|all supported, not %r"%mode)
88 return not self.isdisjoint(completed)
88 89
89 90 def unreachable(self, failed):
90 91 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 92 return False
92 print self, self.success_only, self.mode, failed
93 if self.mode == 'all':
93 # print self, self.success_only, self.all, failed
94 if self.all:
94 95 return not self.isdisjoint(failed)
95 elif self.mode == 'any':
96 return self.issubset(failed)
97 96 else:
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
97 return self.issubset(failed)
99 98
100 99
101 100 def as_dict(self):
102 101 """Represent this dependency as a dict. For json compatibility."""
103 102 return dict(
104 103 dependencies=list(self),
105 mode=self.mode,
104 all=self.all,
106 105 success_only=self.success_only,
107 106 )
108 107
109 108
110 __all__ = ['depend', 'require', 'Dependency']
109 __all__ = ['depend', 'require', 'dependent', 'Dependency']
111 110
@@ -154,7 +154,10 b' class UnmetDependency(KernelError):'
154 154 class ImpossibleDependency(UnmetDependency):
155 155 pass
156 156
157 class DependencyTimeout(UnmetDependency):
157 class DependencyTimeout(ImpossibleDependency):
158 pass
159
160 class InvalidDependency(ImpossibleDependency):
158 161 pass
159 162
160 163 class RemoteError(KernelError):
@@ -100,7 +100,7 b' class HubFactory(RegistrationFactory):'
100 100 """The Configurable for setting up a Hub."""
101 101
102 102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
103 scheme = Str('leastload', config=True)
104 104
105 105 # port-pairs for monitoredqueues:
106 106 hb = Instance(list, config=True)
@@ -20,7 +20,9 b' import logging'
20 20 import os
21 21 import signal
22 22 import logging
23 import errno
23 24
25 import zmq
24 26 from zmq.eventloop import ioloop
25 27
26 28 from IPython.external.argparse import ArgumentParser, SUPPRESS
@@ -385,7 +387,8 b' class IPClusterApp(ApplicationWithClusterDir):'
385 387 # observing of engine stopping is inconsistent. Some launchers
386 388 # might trigger on a single engine stopping, other wait until
387 389 # all stop. TODO: think more about how to handle this.
388
390 else:
391 self.controller_launcher = None
389 392
390 393 el_class = import_item(config.Global.engine_launcher)
391 394 self.engine_launcher = el_class(
@@ -427,7 +430,7 b' class IPClusterApp(ApplicationWithClusterDir):'
427 430
428 431 def stop_controller(self, r=None):
429 432 # self.log.info("In stop_controller")
430 if self.controller_launcher.running:
433 if self.controller_launcher and self.controller_launcher.running:
431 434 return self.controller_launcher.stop()
432 435
433 436 def stop_engines(self, r=None):
@@ -516,8 +519,13 b' class IPClusterApp(ApplicationWithClusterDir):'
516 519 self.write_pid_file()
517 520 try:
518 521 self.loop.start()
519 except:
520 self.log.info("stopping...")
522 except KeyboardInterrupt:
523 pass
524 except zmq.ZMQError as e:
525 if e.errno == errno.EINTR:
526 pass
527 else:
528 raise
521 529 self.remove_pid_file()
522 530
523 531 def start_app_engines(self):
@@ -539,8 +547,13 b' class IPClusterApp(ApplicationWithClusterDir):'
539 547 # self.write_pid_file()
540 548 try:
541 549 self.loop.start()
542 except:
543 self.log.fatal("stopping...")
550 except KeyboardInterrupt:
551 pass
552 except zmq.ZMQError as e:
553 if e.errno == errno.EINTR:
554 pass
555 else:
556 raise
544 557 # self.remove_pid_file()
545 558
546 559 def start_app_stop(self):
@@ -127,7 +127,7 b' class TaskScheduler(SessionFactory):'
127 127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 128
129 129 # internals:
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 132 pending = Dict() # dict by engine_uuid of submitted tasks
133 133 completed = Dict() # dict by engine_uuid of completed tasks
@@ -139,6 +139,7 b' class TaskScheduler(SessionFactory):'
139 139 all_completed = Set() # set of all completed tasks
140 140 all_failed = Set() # set of all failed tasks
141 141 all_done = Set() # set of all finished tasks=union(completed,failed)
142 all_ids = Set() # set of all submitted task IDs
142 143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
143 144 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
144 145
@@ -239,7 +240,7 b' class TaskScheduler(SessionFactory):'
239 240 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 241 parent=parent, ident=idents)
241 242 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.update_dependencies(msg_id)
243 self.update_graph(msg_id)
243 244
244 245
245 246 #-----------------------------------------------------------------------
@@ -252,20 +253,21 b' class TaskScheduler(SessionFactory):'
252 253 self.notifier_stream.flush()
253 254 try:
254 255 idents, msg = self.session.feed_identities(raw_msg, copy=False)
255 except Exception as e:
256 self.log.error("task::Invaid msg: %s"%msg)
256 msg = self.session.unpack_message(msg, content=False, copy=False)
257 except:
258 self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
257 259 return
258 260
259 261 # send to monitor
260 262 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261 263
262 msg = self.session.unpack_message(msg, content=False, copy=False)
263 264 header = msg['header']
264 265 msg_id = header['msg_id']
266 self.all_ids.add(msg_id)
265 267
266 268 # time dependencies
267 269 after = Dependency(header.get('after', []))
268 if after.mode == 'all':
270 if after.all:
269 271 after.difference_update(self.all_completed)
270 272 if not after.success_only:
271 273 after.difference_update(self.all_failed)
@@ -276,10 +278,16 b' class TaskScheduler(SessionFactory):'
276 278
277 279 # location dependencies
278 280 follow = Dependency(header.get('follow', []))
279 # check if unreachable:
280 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
281 self.depending[msg_id] = [raw_msg,MET,MET,None]
282 return self.fail_unreachable(msg_id)
281
282 for dep in after,follow:
283 # check valid:
284 if msg_id in dep or dep.difference(self.all_ids):
285 self.depending[msg_id] = [raw_msg,MET,MET,None]
286 return self.fail_unreachable(msg_id, error.InvalidDependency)
287 # check if unreachable:
288 if dep.unreachable(self.all_failed):
289 self.depending[msg_id] = [raw_msg,MET,MET,None]
290 return self.fail_unreachable(msg_id)
283 291
284 292 # turn timeouts into datetime objects:
285 293 timeout = header.get('timeout', None)
@@ -288,7 +296,7 b' class TaskScheduler(SessionFactory):'
288 296
289 297 if after.check(self.all_completed, self.all_failed):
290 298 # time deps already met, try to run
291 if not self.maybe_run(msg_id, raw_msg, follow):
299 if not self.maybe_run(msg_id, raw_msg, follow, timeout):
292 300 # can't run yet
293 301 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
294 302 else:
@@ -306,25 +314,23 b' class TaskScheduler(SessionFactory):'
306 314 self.fail_unreachable(msg_id, timeout=True)
307 315
308 316 @logged
309 def fail_unreachable(self, msg_id, timeout=False):
317 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
310 318 """a message has become unreachable"""
311 319 if msg_id not in self.depending:
312 320 self.log.error("msg %r already failed!"%msg_id)
313 321 return
314 322 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
315 323 for mid in follow.union(after):
316 if mid in self.dependencies:
317 self.dependencies[mid].remove(msg_id)
324 if mid in self.graph:
325 self.graph[mid].remove(msg_id)
318 326
319 327 # FIXME: unpacking a message I've already unpacked, but didn't save:
320 328 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 329 msg = self.session.unpack_message(msg, copy=False, content=False)
322 330 header = msg['header']
323 331
324 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
325
326 332 try:
327 raise impossible()
333 raise why()
328 334 except:
329 335 content = ss.wrap_exception()
330 336
@@ -335,10 +341,10 b' class TaskScheduler(SessionFactory):'
335 341 parent=header, ident=idents)
336 342 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
337 343
338 self.update_dependencies(msg_id, success=False)
344 self.update_graph(msg_id, success=False)
339 345
340 346 @logged
341 def maybe_run(self, msg_id, raw_msg, follow=None):
347 def maybe_run(self, msg_id, raw_msg, follow=None, timeout=None):
342 348 """check location dependencies, and run if they are met."""
343 349
344 350 if follow:
@@ -349,8 +355,7 b' class TaskScheduler(SessionFactory):'
349 355
350 356 indices = filter(can_run, range(len(self.targets)))
351 357 if not indices:
352 # TODO evaluate unmeetable follow dependencies
353 if follow.mode == 'all':
358 if follow.all:
354 359 dests = set()
355 360 relevant = self.all_completed if follow.success_only else self.all_done
356 361 for m in follow.intersection(relevant):
@@ -363,7 +368,7 b' class TaskScheduler(SessionFactory):'
363 368 else:
364 369 indices = None
365 370
366 self.submit_task(msg_id, raw_msg, indices)
371 self.submit_task(msg_id, raw_msg, follow, timeout, indices)
367 372 return True
368 373
369 374 @logged
@@ -372,12 +377,12 b' class TaskScheduler(SessionFactory):'
372 377 self.depending[msg_id] = [raw_msg,after,follow,timeout]
373 378 # track the ids in follow or after, but not those already finished
374 379 for dep_id in after.union(follow).difference(self.all_done):
375 if dep_id not in self.dependencies:
376 self.dependencies[dep_id] = set()
377 self.dependencies[dep_id].add(msg_id)
380 if dep_id not in self.graph:
381 self.graph[dep_id] = set()
382 self.graph[dep_id].add(msg_id)
378 383
379 384 @logged
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
385 def submit_task(self, msg_id, raw_msg, follow, timeout, indices=None):
381 386 """Submit a task to any of a subset of our targets."""
382 387 if indices:
383 388 loads = [self.loads[i] for i in indices]
@@ -391,7 +396,7 b' class TaskScheduler(SessionFactory):'
391 396 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
392 397 self.engine_stream.send_multipart(raw_msg, copy=False)
393 398 self.add_job(idx)
394 self.pending[target][msg_id] = (raw_msg, follow)
399 self.pending[target][msg_id] = (raw_msg, follow, timeout)
395 400 content = dict(msg_id=msg_id, engine_id=target)
396 401 self.session.send(self.mon_stream, 'task_destination', content=content,
397 402 ident=['tracktask',self.session.session])
@@ -403,10 +408,11 b' class TaskScheduler(SessionFactory):'
403 408 def dispatch_result(self, raw_msg):
404 409 try:
405 410 idents,msg = self.session.feed_identities(raw_msg, copy=False)
406 except Exception as e:
407 self.log.error("task::Invaid result: %s"%msg)
411 msg = self.session.unpack_message(msg, content=False, copy=False)
412 except:
413 self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
408 414 return
409 msg = self.session.unpack_message(msg, content=False, copy=False)
415
410 416 header = msg['header']
411 417 if header.get('dependencies_met', True):
412 418 success = (header['status'] == 'ok')
@@ -438,7 +444,7 b' class TaskScheduler(SessionFactory):'
438 444 self.all_done.add(msg_id)
439 445 self.destinations[msg_id] = engine
440 446
441 self.update_dependencies(msg_id, success)
447 self.update_graph(msg_id, success)
442 448
443 449 @logged
444 450 def handle_unmet_dependency(self, idents, parent):
@@ -448,30 +454,30 b' class TaskScheduler(SessionFactory):'
448 454 self.blacklist[msg_id] = set()
449 455 self.blacklist[msg_id].add(engine)
450 456 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
451 if not self.maybe_run(msg_id, raw_msg, follow):
457 if not self.maybe_run(msg_id, raw_msg, follow, timeout):
452 458 # resubmit failed, put it back in our dependency tree
453 459 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
454 460 pass
455 461
456 462 @logged
457 def update_dependencies(self, dep_id, success=True):
463 def update_graph(self, dep_id, success=True):
458 464 """dep_id just finished. Update our dependency
459 465 table and submit any jobs that just became runable."""
460 466 # print ("\n\n***********")
461 467 # pprint (dep_id)
462 # pprint (self.dependencies)
468 # pprint (self.graph)
463 469 # pprint (self.depending)
464 470 # pprint (self.all_completed)
465 471 # pprint (self.all_failed)
466 472 # print ("\n\n***********\n\n")
467 if dep_id not in self.dependencies:
473 if dep_id not in self.graph:
468 474 return
469 jobs = self.dependencies.pop(dep_id)
475 jobs = self.graph.pop(dep_id)
470 476
471 477 for msg_id in jobs:
472 478 raw_msg, after, follow, timeout = self.depending[msg_id]
473 479 # if dep_id in after:
474 # if after.mode == 'all' and (success or not after.success_only):
480 # if after.all and (success or not after.success_only):
475 481 # after.remove(dep_id)
476 482
477 483 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
@@ -479,12 +485,12 b' class TaskScheduler(SessionFactory):'
479 485
480 486 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
481 487 self.depending[msg_id][1] = MET
482 if self.maybe_run(msg_id, raw_msg, follow):
488 if self.maybe_run(msg_id, raw_msg, follow, timeout):
483 489
484 490 self.depending.pop(msg_id)
485 491 for mid in follow.union(after):
486 if mid in self.dependencies:
487 self.dependencies[mid].remove(msg_id)
492 if mid in self.graph:
493 self.graph[mid].remove(msg_id)
488 494
489 495 #----------------------------------------------------------------------
490 496 # methods to be overridden by subclasses
@@ -506,7 +512,8 b' class TaskScheduler(SessionFactory):'
506 512
507 513
508 514
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
515 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
516 log_addr=None, loglevel=logging.DEBUG, scheme='lru'):
510 517 from zmq.eventloop import ioloop
511 518 from zmq.eventloop.zmqstream import ZMQStream
512 519
@@ -228,7 +228,12 b' class DirectView(View):'
228 228 >>> dv_even = client[::2]
229 229 >>> dv_some = client[1:3]
230 230
231 This object provides dictionary access
231 This object provides dictionary access to engine namespaces:
232
233 # push a=5:
234 >>> dv['a'] = 5
235 # pull 'foo':
236 >>> dv['foo']
232 237
233 238 """
234 239
@@ -57,7 +57,7 b' def submit_jobs(client, G, jobs):'
57 57 """Submit jobs via client where G describes the time dependencies."""
58 58 results = {}
59 59 for node in nx.topological_sort(G):
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
60 deps = [ results[n] for n in G.predecessors(node) ]
61 61 results[node] = client.apply(jobs[node], after=deps)
62 62 return results
63 63
@@ -77,30 +77,34 b' def main(nodes, edges):'
77 77 point at least slightly to the right if the graph is valid.
78 78 """
79 79 from matplotlib.dates import date2num
80 from matplotlib.cm import gist_rainbow
80 81 print "building DAG"
81 82 G = random_dag(nodes, edges)
82 83 jobs = {}
83 84 pos = {}
85 colors = {}
84 86 for node in G:
85 87 jobs[node] = randomwait
86 88
87 89 client = cmod.Client()
88 print "submitting tasks"
90 print "submitting %i tasks with %i dependencies"%(nodes,edges)
89 91 results = submit_jobs(client, G, jobs)
90 92 print "waiting for results"
91 93 client.barrier()
92 94 print "done"
93 95 for node in G:
94 # times[node] = results[node].get()
95 t = date2num(results[node].metadata.started)
96 pos[node] = (t, G.in_degree(node)+random())
97
96 md = results[node].metadata
97 start = date2num(md.started)
98 runtime = date2num(md.completed) - start
99 pos[node] = (start, runtime)
100 colors[node] = md.engine_id
98 101 validate_tree(G, results)
99 nx.draw(G, pos)
102 nx.draw(G, pos, nodelist=colors.keys(), node_color=colors.values(), cmap=gist_rainbow)
100 103 return G,results
101 104
102 105 if __name__ == '__main__':
103 106 import pylab
104 main(32,128)
107 # main(5,10)
108 main(32,96)
105 109 pylab.show()
106 110 No newline at end of file
@@ -15,5 +15,6 b' Using IPython for parallel computing (ZMQ)'
15 15 parallel_security.txt
16 16 parallel_winhpc.txt
17 17 parallel_demos.txt
18 dag_dependencies.txt
18 19
19 20
@@ -13,14 +13,7 b' Matplotlib package. IPython can be started in this mode by typing::'
13 13
14 14 ipython --pylab
15 15
16 at the system command line. If this prints an error message, you will
17 need to install the default profiles from within IPython by doing,
18
19 .. sourcecode:: ipython
20
21 In [1]: %install_profiles
22
23 and then restarting IPython.
16 at the system command line.
24 17
25 18 150 million digits of pi
26 19 ========================
@@ -132,11 +132,11 b' The main method for doing remote execution (in fact, all methods that'
132 132 communicate with the engines are built on top of it), is :meth:`Client.apply`.
133 133 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
134 134 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
135 require some more options, they cannot reasonably provide this interface.
135 require some more options, they cannot easily provide this interface.
136 136 Instead, they provide the signature::
137 137
138 c.apply(f, args=None, kwargs=None, bound=True, block=None,
139 targets=None, after=None, follow=None)
138 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
139 after=None, follow=None, timeout=None)
140 140
141 141 In order to provide the nicer interface, we have :class:`View` classes, which wrap
142 142 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
@@ -184,7 +184,7 b' blocks until the engines are done executing the command:'
184 184 In [5]: dview['b'] = 10
185 185
186 186 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
187 Out[6]: [42,42,42,42]
187 Out[6]: [42, 42, 42, 42]
188 188
189 189 Python commands can be executed on specific engines by calling execute using
190 190 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
@@ -197,7 +197,7 b' by index-access to the client:'
197 197 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
198 198
199 199 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
200 Out[8]: [15,-5,15,-5]
200 Out[8]: [15, -5, 15, -5]
201 201
202 202 .. note::
203 203
@@ -258,7 +258,7 b' time through its :meth:`get` method.'
258 258
259 259 .. Note::
260 260
261 The :class:`AsyncResult` object provides the exact same interface as
261 The :class:`AsyncResult` object provides a superset of the interface in
262 262 :py:class:`multiprocessing.pool.AsyncResult`. See the
263 263 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
264 264 for more.
@@ -270,15 +270,12 b' local Python/IPython session:'
270 270 .. sourcecode:: ipython
271 271
272 272 # define our function
273 In [35]: def wait(t):
274 ....: import time
275 ....: tic = time.time()
276 ....: time.sleep(t)
277 ....: return time.time()-tic
273 In [6]: def wait(t):
274 ...: import time
275 ...: tic = time.time()
276 ...: time.sleep(t)
277 ...: return time.time()-tic
278 278
279 # In blocking mode
280 In [6]: rc.apply('import time')
281
282 279 # In non-blocking mode
283 280 In [7]: pr = rc[:].apply_async(wait, 2)
284 281
@@ -316,8 +313,8 b' local Python/IPython session:'
316 313
317 314 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
318 315 are done. For this, there is the method :meth:`barrier`. This method takes a
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the associated
320 results are ready:
316 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
317 associated results are ready:
321 318
322 319 .. sourcecode:: ipython
323 320
@@ -329,7 +326,7 b' results are ready:'
329 326 # Wait until all of them are done
330 327 In [74]: rc.barrier(pr_list)
331 328
332 # Then, their results are ready using get_result or the r attribute
329 # Then, their results are ready using get() or the `.r` attribute
333 330 In [75]: pr_list[0].get()
334 331 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
335 332
@@ -320,4 +320,5 b' channel is established.'
320 320
321 321 .. [RFC5246] <http://tools.ietf.org/html/rfc5246>
322 322
323
323 .. [OpenSSH] <http://www.openssh.com/>
324 .. [Paramiko] <http://www.lag.net/paramiko/>
@@ -4,13 +4,13 b''
4 4 The IPython task interface
5 5 ==========================
6 6
7 The task interface to the controller presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface, the user have no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is both simpler
11 and more powerful.
9 the task interface, the user has no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 simpler and more powerful.
12 12
13 Best of all the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When the user can break up
15 15 the user's work into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
@@ -97,11 +97,275 b' that turns any Python function into a parallel function:'
97 97 In [10]: @lview.parallel()
98 98 ....: def f(x):
99 99 ....: return 10.0*x**4
100 ....:
100 ....:
101 101
102 102 In [11]: f.map(range(32)) # this is done in parallel
103 103 Out[11]: [0.0,10.0,160.0,...]
104 104
105 Dependencies
106 ============
107
108 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
109 may want to associate some kind of `Dependency` that describes when, where, or whether
110 a task can be run. In IPython, we provide two types of dependencies:
111 `Functional Dependencies`_ and `Graph Dependencies`_.
112
113 .. note::
114
115 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
116 and you will see errors or warnings if you try to use dependencies with the pure
117 scheduler.
118
119 Functional Dependencies
120 -----------------------
121
122 Functional dependencies are used to determine whether a given engine is capable of running
123 a particular task. This is implemented via a special :class:`Exception` class,
124 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
125 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
126 the error up to the client like any other error, catches the error, and submits the task
127 to a different engine. This will repeat indefinitely, and a task will never be submitted
128 to a given engine a second time.
129
130 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
131 some decorators for facilitating this behavior.
132
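For illustration, raising it by hand might look like the following sketch
(``numpy_or_bust`` is a hypothetical task function, not part of IPython):

.. sourcecode:: ipython

    In [7]: from IPython.zmq.parallel.error import UnmetDependency

    In [8]: def numpy_or_bust():
       ...:     try:
       ...:         import numpy
       ...:     except ImportError:
       ...:         # tell the scheduler to retry this task on another engine
       ...:         raise UnmetDependency()
       ...:     return numpy.__version__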
133 There are two decorators and a class used for functional dependencies:
134
135 .. sourcecode:: ipython
136
137 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
138
139 @require
140 ********
141
142 The simplest sort of dependency is requiring that a Python module is available. The
143 ``@require`` decorator lets you define a function that will only run on engines where names
144 you specify are importable:
145
146 .. sourcecode:: ipython
147
148 In [10]: @require('numpy', 'zmq')
149 ...: def myfunc():
150 ...: import numpy,zmq
151 ...: return dostuff()
152
153 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
154 numpy and pyzmq available.
155
156 @depend
157 *******
158
159 The ``@depend`` decorator lets you decorate any function with any *other* function to
160 evaluate the dependency. The dependency function will be called at the start of the task,
161 and if it returns ``False``, then the dependency will be considered unmet, and the task
162 will be assigned to another engine. If the dependency returns anything other than
163 ``False``, the rest of the task will continue.
164
165 .. sourcecode:: ipython
166
167 In [10]: def platform_specific(plat):
168 ...: import sys
169 ...: return sys.platform == plat
170
171 In [11]: @depend(platform_specific, 'darwin')
172 ...: def mactask():
173 ...: do_mac_stuff()
174
175 In [12]: @depend(platform_specific, 'nt')
176 ...: def wintask():
177 ...: do_windows_stuff()
178
179 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
180 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
181 signature.
182
183 dependents
184 **********
185
186 You don't have to use the decorators on your tasks. If, for instance, you want
187 to run tasks with a single function but varying dependencies, you can directly construct
188 the :class:`dependent` object that the decorators use:
189
190 .. sourcecode:: ipython
191
192 In [13]: def mytask(*args):
193 ...: dostuff()
194
195 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
196 # this is the same as decorating the declaration of mytask with @depend
197 # but you can do it again:
198
199 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
200
201 # in general:
202 In [16]: t = dependent(f, g, *dargs, **dkwargs)
203
204 # is equivalent to:
205 In [17]: @depend(g, *dargs, **dkwargs)
206 ...: def t(a,b,c):
207 ...: # contents of f
208
209 Graph Dependencies
210 ------------------
211
212 Sometimes you want to restrict the time and/or location to run a given task as a function
213 of the time and/or location of other tasks. This is implemented via a subclass of
214 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
215 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
216 has been met.
217
218 The switches we provide for interpreting whether a given dependency set has been met (a short construction example follows this list):
219
220 any|all
221 Whether the dependency is considered met if *any* of the dependencies are done, or
222 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
223 boolean attribute, which defaults to ``True``.
224
225 success_only
226 Whether to consider only tasks that did not raise an error as being fulfilled.
227 Sometimes you want to run a task after another, but only if that task succeeded. In
228 this case, ``success_only`` should be ``True``. However, sometimes you may not care
229 whether the task succeeds, and always want the second task to run, in which case
230 you should use `success_only=False`. The default behavior is to only use successes.
231
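As noted above, here is a sketch of constructing a Dependency explicitly and passing it
to :meth:`apply` (``msg_id1`` and ``msg_id2`` stand in for the msg_ids of hypothetical
previously submitted tasks):

.. sourcecode:: ipython

    In [11]: from IPython.zmq.parallel.dependency import Dependency

    # met as soon as either task has succeeded:
    In [12]: dep = Dependency([msg_id1, msg_id2], all=False, success_only=True)

    In [13]: ar = client.apply(f, after=dep)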
232 There are other switches for interpretation that are made at the *task* level. These are
233 specified via keyword arguments to the client's :meth:`apply` method.
234
235 after,follow
236 You may want to run a task *after* a given set of dependencies have been run and/or
237 run it *where* another set of dependencies are met. To support this, every task has an
238 `after` dependency to restrict time, and a `follow` dependency to restrict
239 destination.
240
241 timeout
242 You may also want to set a time-limit for how long the scheduler should wait for a
243 task's dependencies to be met. This is done via a `timeout`, which defaults to 0,
244 indicating that the task should never time out. If the timeout is reached, and the
245 scheduler still hasn't been able to assign the task to an engine, the task will fail
246 with a :class:`DependencyTimeout`.
247
248 .. note::
249
250 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
251 task to run after a job submitted via the MUX interface.
252
253 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
254 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
255 `follow` and `after` keywords to :meth:`client.apply`:
256
257 .. sourcecode:: ipython
258
259 In [14]: client.block=False
260
261 In [15]: ar = client.apply(f, args, kwargs, targets=None)
262
263 In [16]: ar2 = client.apply(f2, targets=None)
264
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2])
266
267 In [18]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
268
269
270 .. seealso::
271
272 Some parallel workloads can be described as a `Directed Acyclic Graph
273 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
274 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
275 onto task dependencies.
276
277
278
279 Impossible Dependencies
280 ***********************
281
282 The schedulers perform some analysis on graph dependencies to determine whether they
283 can ever be met. If the scheduler discovers that a dependency cannot be met, then the
284 task will fail with an :class:`ImpossibleDependency` error. This way, if the scheduler
285 realizes that a task can never be run, the task won't sit in the queue indefinitely,
286 clogging the pipeline.
287
288 The basic cases that are checked (a sketch of the failure case follows the list):
289
290 * depending on nonexistent messages
291 * `follow` dependencies were run on more than one machine and `all=True`
292 * any dependencies failed and `all=True,success_only=True`
293 * all dependencies failed and `all=False,success_only=True`
294
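A sketch of the failure case (``f_fail`` stands for a hypothetical task function that
raises an exception when run):

.. sourcecode:: ipython

    In [19]: ar = client.apply(f_fail, targets=None)

    # with the defaults `all=True, success_only=True`, this dependency
    # becomes impossible as soon as `ar` fails:
    In [20]: ar2 = client.apply(f, after=[ar])

    In [21]: ar2.get()  # raises an ImpossibleDependency error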
295 .. warning::
296
297 This analysis has not been proven to be rigorous, so it is possible for tasks
298 to become impossible to run in obscure situations; a timeout may be a good safeguard.
299
300 Schedulers
301 ==========
302
303 There are a variety of valid ways to determine where jobs should be assigned in a
304 load-balancing situation. In IPython, we support several standard schemes, and
305 even make it easy to define your own. The scheme can be selected via the ``--scheme``
306 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
307 of a controller config object.
308
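For example, selecting the two-bin scheme from the command line might look like this (a
sketch; the exact flag syntax is an assumption based on the ``--scheme`` argument named
above)::

    $ ipcontrollerz --scheme=twobin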
309 The built-in routing schemes:
310
311 lru: Least Recently Used
312
313 Always assign work to the least-recently-used engine. A close relative of
314 round-robin, it will be fair with respect to the number of tasks, agnostic
315 with respect to runtime of each task.
316
317 plainrandom: Plain Random
318 Randomly picks an engine on which to run.
319
320 twobin: Two-Bin Random
321
322 **Depends on numpy**
323
324 Pick two engines at random, and use the LRU of the two. This is known to be better
325 than plain random in many cases, but requires a small amount of computation.
326
327 leastload: Least Load
328
329 **This is the default scheme**
330
331 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks ties).
332
333 weighted: Weighted Two-Bin Random
334
335 **Depends on numpy**
336
337 Pick two engines at random using the number of outstanding tasks as inverse weights,
338 and use the one with the lower load.
339
340
341 Pure ZMQ Scheduler
342 ------------------
343
344 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
345 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
346 load-balancing. This scheduler does not support any of the advanced features of the Python
347 :class:`.Scheduler`.
348
349 Disabled features when using the ZMQ Scheduler:
350
351 * Engine unregistration
352 Task farming will be disabled if an engine unregisters.
353 Further, if an engine is unregistered during computation, the scheduler may not recover.
354 * Dependencies
355 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
356 based on message content.
357 * Early destination notification
358 The Python schedulers know which engine gets which task, and notify the Hub. This
359 allows graceful handling of Engines coming and going. There is no way to know
360 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
361 engine until they *finish*. This makes recovery from engine shutdown very difficult.
362
363
364 .. note::
365
366 TODO: performance comparisons
367
368
105 369 More details
106 370 ============
107 371
@@ -125,8 +389,7 b' The following is an overview of how to use these classes together:'
125 389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
126 390 for and then receive the results.
127 391
128 We are in the process of developing more detailed information about the task
129 interface. For now, the docstrings of the :meth:`Client.apply`,
130 and :func:`depend` methods should be consulted.
131 392
393 .. seealso::
132 394
395 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
@@ -123,7 +123,7 b' opening a Windows Command Prompt and typing ``ipython``. This will'
123 123 start IPython's interactive shell and you should see something like the
124 124 following screenshot:
125 125
126 .. image:: ipython_shell.*
126 .. image:: ../parallel/ipython_shell.*
127 127
128 128 Starting an IPython cluster
129 129 ===========================
@@ -171,7 +171,7 b' You should see a number of messages printed to the screen, ending with'
171 171 "IPython cluster: started". The result should look something like the following
172 172 screenshot:
173 173
174 .. image:: ipclusterz_start.*
174 .. image:: ../parallel/ipcluster_start.*
175 175
176 176 At this point, the controller and two engines are running on your local host.
177 177 This configuration is useful for testing and for situations where you want to
@@ -213,7 +213,7 b' The output of this command is shown in the screenshot below. Notice how'
213 213 :command:`ipclusterz` prints out the location of the newly created cluster
214 214 directory.
215 215
216 .. image:: ipclusterz_create.*
216 .. image:: ../parallel/ipcluster_create.*
217 217
218 218 Configuring a cluster profile
219 219 -----------------------------
@@ -282,7 +282,7 b' must be run again to regenerate the XML job description files. The'
282 282 following screenshot shows what the HPC Job Manager interface looks like
283 283 with a running IPython cluster.
284 284
285 .. image:: hpc_job_manager.*
285 .. image:: ../parallel/hpc_job_manager.*
286 286
287 287 Performing a simple interactive parallel computation
288 288 ====================================================
@@ -333,5 +333,5 b" The :meth:`map` method has the same signature as Python's builtin :func:`map`"
333 333 function, but runs the calculation in parallel. More involved examples of using
334 334 :class:`MultiEngineClient` are provided in the examples that follow.
335 335
336 .. image:: mec_simple.*
336 .. image:: ../parallel/mec_simple.*
337 337