##// END OF EJS Templates
added dependencies & Python scheduler
MinRK -
Show More
@@ -0,0 +1,401
1 #----------------------------------------------------------------------
2 # Imports
3 #----------------------------------------------------------------------
4
5 from random import randint,random
6
7 try:
8 import numpy
9 except ImportError:
10 numpy = None
11
12 import zmq
13 from zmq.eventloop import ioloop, zmqstream
14
15 # local imports
16 from IPython.zmq.log import logger # a Logger object
17 from client import Client
18 from dependency import Dependency
19 import streamsession as ss
20
21 from IPython.external.decorator import decorator
22
@decorator
def logged(f, self, *args, **kwargs):
    """Trace a method call to stdout (separator + call signature), then invoke it."""
    header = "#--------------------"
    signature = "%s(*%s,**%s)" % (f.func_name, args, kwargs)
    print (header)
    print (signature)
    return f(self, *args, **kwargs)
28
29 #----------------------------------------------------------------------
30 # Chooser functions
31 #----------------------------------------------------------------------
32
def plainrandom(loads):
    """Plain random pick: a uniformly random index into loads."""
    return randint(0, len(loads) - 1)
37
def lru(loads):
    """Pick the least-recently-used engine: always index 0.

    `loads` is consulted only for its ordering (assumed LRU, oldest
    first), never for its contents, so the head of the line wins.
    """
    return 0
46
def twobin(loads):
    """Pick two indices at random; use the LRU (smaller) of the two.

    The content of loads is ignored; only LRU ordering (oldest first)
    is assumed, so the lower index is the older engine.
    """
    last = len(loads) - 1
    first_pick = randint(0, last)
    second_pick = randint(0, last)
    return min(first_pick, second_pick)
58
def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the less loaded of the two.
    """
    # an engine with load 0 is weighted ~a million times higher than load 1
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]
    # two independent draws on [0, total); same call order as before
    x = random()*total
    y = random()*total
    # first index whose cumulative weight reaches the draw
    # (left bisection == the original linear `while sums[i] < x` scan)
    idx = int(numpy.searchsorted(sums, x, side='left'))
    idy = int(numpy.searchsorted(sums, y, side='left'))
    if weights[idy] > weights[idx]:
        return idy
    return idx
80
def leastload(loads):
    """Always choose the lowest load.

    If the lowest load occurs more than once, the first
    occurrence is used; with LRU-ordered loads that means
    the LRU of the least-loaded engines is chosen.
    """
    smallest = min(loads)
    return loads.index(smallest)
89
90 #---------------------------------------------------------------------
91 # Classes
92 #---------------------------------------------------------------------
class TaskScheduler(object):
    """Simple Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    """

    scheme = leastload # function for determining the destination
    client_stream = None # client-facing stream
    engine_stream = None # engine-facing stream
    mon_stream = None # controller-facing stream
    dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
    depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
    pending = None # dict by engine_uuid of submitted tasks
    completed = None # dict by engine_uuid of completed tasks
    clients = None # dict by msg_id for who submitted the task
    targets = None # list of target IDENTs
    loads = None # list of engine loads
    all_done = None # set of all completed tasks
    blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency


    def __init__(self, client_stream, engine_stream, mon_stream,
                notifier_stream, scheme=None, io_loop=None):
        """Wire the scheduler onto its streams.

        Parameters
        ----------
        client_stream : ZMQStream (XREP)
            where clients submit tasks
        engine_stream : ZMQStream (XREP)
            connected to the engines
        mon_stream : ZMQStream (PUB)
            relays traffic to the monitor/controller
        notifier_stream : ZMQStream (SUB)
            carries engine (un)registration notifications
        scheme : callable loads -> index, optional
            destination chooser; defaults to TaskScheduler.scheme (leastload)
        io_loop : ioloop.IOLoop, optional
            defaults to the global instance
        """
        if io_loop is None:
            io_loop = ioloop.IOLoop.instance()
        self.io_loop = io_loop
        self.client_stream = client_stream
        self.engine_stream = engine_stream
        self.mon_stream = mon_stream
        self.notifier_stream = notifier_stream

        if scheme is not None:
            self.scheme = scheme
        else:
            self.scheme = TaskScheduler.scheme

        self.session = ss.StreamSession(username="TaskScheduler")

        self.dependencies = {}
        self.depending = {}
        self.completed = {}
        self.pending = {}
        self.all_done = set()
        # FIX: blacklist was never initialized, so maybe_run and
        # handle_unmet_dependency hit the class-level None and crashed
        self.blacklist = {}

        self.targets = []
        self.loads = []

        engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)

    def resume_receiving(self):
        """resume accepting jobs"""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

    def stop_receiving(self):
        """stop accepting jobs (e.g. while no engines are registered)"""
        self.client_stream.on_recv(None)

    #-----------------------------------------------------------------------
    # [Un]Registration Handling
    #-----------------------------------------------------------------------

    def dispatch_notification(self, msg):
        """dispatch register/unregister events."""
        idents,msg = self.session.feed_identities(msg)
        msg = self.session.unpack_message(msg)
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            try:
                # notifications identify the engine by its queue identity
                handler(str(msg['content']['queue']))
            except KeyError:
                logger.error("task::Invalid notification msg: %s"%msg)

    @logged
    def _register_engine(self, uid):
        """new engine became available"""
        # head of the line:
        self.targets.insert(0,uid)
        self.loads.insert(0,0)
        # initialize sets
        self.completed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            # first engine: we can start accepting jobs again
            self.resume_receiving()

    def _unregister_engine(self, uid):
        """existing engine became unavailable"""
        if len(self.targets) == 1:
            # this was our only engine: stop accepting work
            self.stop_receiving()
        # handle any potentially finished tasks:
        self.engine_stream.flush()

        self.completed.pop(uid)
        lost = self.pending.pop(uid)

        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        self.handle_stranded_tasks(lost)

    def handle_stranded_tasks(self, lost):
        """deal with jobs resident in an engine that died."""
        # TODO: resubmit the tasks?
        for msg_id in lost:
            pass


    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------
    @logged
    def dispatch_submission(self, raw_msg):
        """dispatch job submission: run it now if its dependencies are
        already met, otherwise park it until they are."""
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
        except Exception:
            # FIX: log raw_msg -- `msg` is unbound when feed_identities raises
            logger.error("task::Invalid msg: %s"%raw_msg)
            return

        msg = self.session.unpack_message(msg, content=False, copy=False)
        header = msg['header']
        msg_id = header['msg_id']
        after = Dependency(header.get('after', []))
        if after.mode == 'all':
            # drop deps that are already done
            after.difference_update(self.all_done)
        if after.check(self.all_done):
            # recast as empty set, if we are already met,
            # to prevent needless tracking of satisfied dependencies
            after = Dependency([])

        follow = Dependency(header.get('follow', []))
        if len(after) == 0:
            # time deps already met, try to run
            if not self.maybe_run(msg_id, raw_msg, follow):
                # can't run yet
                self.save_unmet(msg_id, raw_msg, after, follow)
        else:
            self.save_unmet(msg_id, raw_msg, after, follow)
        # send to monitor
        self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)

    @logged
    def maybe_run(self, msg_id, raw_msg, follow=None):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted, False if no engine
        currently satisfies the `follow` (location) dependency."""

        if follow:
            def can_run(idx):
                # engine must not have already bounced this task,
                # and must satisfy the location dependency
                target = self.targets[idx]
                return target not in self.blacklist.get(msg_id, []) and\
                        follow.check(self.completed[target])

            indices = [i for i in range(len(self.targets)) if can_run(i)]
            if not indices:
                return False
        else:
            indices = None

        # FIX: `follow` was omitted here, so `indices` landed in the
        # `follow` parameter of submit_task and the location dep was lost
        self.submit_task(msg_id, raw_msg, follow, indices)
        return True

    @logged
    def save_unmet(self, msg_id, msg, after, follow):
        """Save a message for later submission when its dependencies are met."""
        self.depending[msg_id] = (msg_id,msg,after,follow)
        # track the ids in both follow/after, but not those already completed
        for dep_id in after.union(follow).difference(self.all_done):
            if dep_id not in self.dependencies:
                self.dependencies[dep_id] = set()
            self.dependencies[dep_id].add(msg_id)

    @logged
    def submit_task(self, msg_id, msg, follow=None, indices=None):
        """submit a task to any of a subset of our targets

        `indices`, if given, restricts the choice to that subset of
        self.targets; the scheme picks within the subset."""
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            # map the subset index back into the full target list
            idx = indices[idx]
        target = self.targets[idx]
        self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.socket.send_multipart(msg, copy=False)
        self.add_job(idx)
        self.pending[target][msg_id] = (msg, follow)

    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------
    @logged
    def dispatch_result(self, raw_msg):
        """dispatch a result from an engine: relay successes to the client,
        retry tasks that bounced with an unmet dependency."""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
        except Exception:
            # FIX: log raw_msg -- `msg` is unbound when feed_identities raises
            logger.error("task::Invalid result: %s"%raw_msg)
            return
        msg = self.session.unpack_message(msg, content=False, copy=False)
        header = msg['header']
        if header.get('dependencies_met', True):
            self.handle_result_success(idents, msg['parent_header'], raw_msg)
            # send to monitor
            self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
        else:
            # FIX: was called with an extra leading `self`, shifting every arg
            self.handle_unmet_dependency(idents, msg['parent_header'])

    @logged
    def handle_result_success(self, idents, parent, raw_msg):
        """a task completed successfully: relay it and update our tables."""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.pending[engine].pop(msg_id)
        self.completed[engine].add(msg_id)
        # FIX: all_done was never populated, so `after` dependencies
        # could never be recognized as met at submission time
        self.all_done.add(msg_id)

        self.update_dependencies(msg_id)

    @logged
    def handle_unmet_dependency(self, idents, parent):
        """an engine bounced a task for unmet location dependencies:
        blacklist that engine for this task and try to run it elsewhere."""
        engine = idents[0]
        msg_id = parent['msg_id']
        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)
        raw_msg,follow = self.pending[engine].pop(msg_id)
        # FIX: maybe_run requires msg_id as its first argument
        if not self.maybe_run(msg_id, raw_msg, follow):
            # resubmit failed, put it back in our dependency tree
            self.save_unmet(msg_id, raw_msg, Dependency(), follow)

    @logged
    def update_dependencies(self, dep_id):
        """dep_id just finished. Update our dependency
        table and submit any jobs that just became runnable."""
        if dep_id not in self.dependencies:
            return
        jobs = self.dependencies.pop(dep_id)
        for job in jobs:
            msg_id, raw_msg, after, follow = self.depending[job]
            # FIX: remove the *finished* dependency; was removing msg_id
            # (the job's own id), which is never in its own `after` set
            if dep_id in after:
                after.remove(dep_id)
            if not after: # time deps met
                if self.maybe_run(msg_id, raw_msg, follow):
                    self.depending.pop(job)
                    for mid in follow:
                        # already-completed follow deps were never registered
                        if mid in self.dependencies:
                            self.dependencies[mid].remove(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.
        Override with subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        # rotate the chosen engine to the back of the LRU line
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override with subclasses."""
        self.loads[idx] -= 1
372
373
374
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
    """Build the four ZMQ streams for a TaskScheduler and run its event loop.

    `scheme` is the *name* of one of the chooser functions in this module.
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    ctx = zmq.Context()
    loop = ioloop.IOLoop()

    # resolve the scheme name to one of the module-level chooser functions
    chooser = globals().get(scheme)

    client_sock = ZMQStream(ctx.socket(zmq.XREP), loop)
    client_sock.bind(in_addr)
    engine_sock = ZMQStream(ctx.socket(zmq.XREP), loop)
    engine_sock.bind(out_addr)
    mon_sock = ZMQStream(ctx.socket(zmq.PUB), loop)
    mon_sock.connect(mon_addr)
    not_sock = ZMQStream(ctx.socket(zmq.SUB), loop)
    not_sock.setsockopt(zmq.SUBSCRIBE, '')
    not_sock.connect(not_addr)

    scheduler = TaskScheduler(client_sock, engine_sock, mon_sock, not_sock,
                              chooser, loop)

    loop.start()
397
398
if __name__ == '__main__':
    iface = 'tcp://127.0.0.1:%i'
    # FIX: out_addr was iface%1236, breaking the 12345-12348 port
    # sequence -- almost certainly a typo for 12346
    launch_scheduler(iface%12345, iface%12346, iface%12347, iface%12348)
@@ -2,19 +2,17
2 """A semi-synchronous Client for the ZMQ controller"""
2 """A semi-synchronous Client for the ZMQ controller"""
3
3
4 import time
4 import time
5 import threading
6
5
7 from pprint import pprint
6 from pprint import pprint
8
7
9 from functools import wraps
10
11 from IPython.external.decorator import decorator
8 from IPython.external.decorator import decorator
12
9
13 import streamsession as ss
10 import streamsession as ss
14 import zmq
11 import zmq
15
12 from zmq.eventloop import ioloop, zmqstream
16 from remotenamespace import RemoteNamespace
13 from remotenamespace import RemoteNamespace
17 from view import DirectView
14 from view import DirectView
15 from dependency import Dependency, depend, require
18
16
19 def _push(ns):
17 def _push(ns):
20 globals().update(ns)
18 globals().update(ns)
@@ -147,13 +145,13 class Client(object):
147 self.history = []
145 self.history = []
148 self.debug = debug
146 self.debug = debug
149 self.session.debug = debug
147 self.session.debug = debug
150 self._connect()
151
148
152 self._notification_handlers = {'registration_notification' : self._register_engine,
149 self._notification_handlers = {'registration_notification' : self._register_engine,
153 'unregistration_notification' : self._unregister_engine,
150 'unregistration_notification' : self._unregister_engine,
154 }
151 }
155 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
152 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
156 'apply_reply' : self._handle_apply_reply}
153 'apply_reply' : self._handle_apply_reply}
154 self._connect()
157
155
158
156
159 @property
157 @property
@@ -453,7 +451,8 class Client(object):
453 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
451 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
454 return result
452 return result
455
453
456 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
454 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
455 after=None, follow=None):
457 """the underlying method for applying functions in a load balanced
456 """the underlying method for applying functions in a load balanced
458 manner."""
457 manner."""
459 block = block if block is not None else self.block
458 block = block if block is not None else self.block
@@ -471,17 +470,29 class Client(object):
471 else:
470 else:
472 return msg_id
471 return msg_id
473
472
474 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
473 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
474 after=None, follow=None):
475 """Then underlying method for applying functions to specific engines."""
475 """Then underlying method for applying functions to specific engines."""
476
476 block = block if block is not None else self.block
477 block = block if block is not None else self.block
478
477 queues,targets = self._build_targets(targets)
479 queues,targets = self._build_targets(targets)
478 print queues
480 print queues
479 bufs = ss.pack_apply_message(f,args,kwargs)
481 bufs = ss.pack_apply_message(f,args,kwargs)
482 if isinstance(after, Dependency):
483 after = after.as_dict()
484 elif after is None:
485 after = []
486 if isinstance(follow, Dependency):
487 follow = follow.as_dict()
488 elif follow is None:
489 follow = []
490 subheader = dict(after=after, follow=follow)
480 content = dict(bound=bound)
491 content = dict(bound=bound)
481 msg_ids = []
492 msg_ids = []
482 for queue in queues:
493 for queue in queues:
483 msg = self.session.send(self.queue_socket, "apply_request",
494 msg = self.session.send(self.queue_socket, "apply_request",
484 content=content, buffers=bufs,ident=queue)
495 content=content, buffers=bufs,ident=queue, subheader=subheader)
485 msg_id = msg['msg_id']
496 msg_id = msg['msg_id']
486 self.outstanding.add(msg_id)
497 self.outstanding.add(msg_id)
487 self.history.append(msg_id)
498 self.history.append(msg_id)
@@ -501,7 +512,8 class Client(object):
501 result[target] = self.results[mid]
512 result[target] = self.results[mid]
502 return result
513 return result
503
514
504 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
515 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
516 after=None, follow=None):
505 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
517 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
506
518
507 if self.block is False:
519 if self.block is False:
@@ -509,17 +521,22 class Client(object):
509 else:
521 else:
510 returns actual result of f(*args, **kwargs)
522 returns actual result of f(*args, **kwargs)
511 """
523 """
524 # enforce types of f,args,kwrags
512 args = args if args is not None else []
525 args = args if args is not None else []
513 kwargs = kwargs if kwargs is not None else {}
526 kwargs = kwargs if kwargs is not None else {}
527 if not callable(f):
528 raise TypeError("f must be callable, not %s"%type(f))
514 if not isinstance(args, (tuple, list)):
529 if not isinstance(args, (tuple, list)):
515 raise TypeError("args must be tuple or list, not %s"%type(args))
530 raise TypeError("args must be tuple or list, not %s"%type(args))
516 if not isinstance(kwargs, dict):
531 if not isinstance(kwargs, dict):
517 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
532 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
533
534 options = dict(bound=bound, block=block, after=after, follow=follow)
535
518 if targets is None:
536 if targets is None:
519 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
537 return self._apply_balanced(f, args, kwargs, **options)
520 else:
538 else:
521 return self._apply_direct(f, args, kwargs,
539 return self._apply_direct(f, args, kwargs, targets=targets, **options)
522 bound=bound,block=block, targets=targets)
523
540
524 def push(self, ns, targets=None, block=None):
541 def push(self, ns, targets=None, block=None):
525 """push the contents of `ns` into the namespace on `target`"""
542 """push the contents of `ns` into the namespace on `target`"""
@@ -598,5 +615,26 class Client(object):
598 # else:
615 # else:
599 # break
616 # break
600 return msg['content']
617 return msg['content']
618
class AsynClient(Client):
    """An Asynchronous client, using the Tornado Event Loop.

    Wraps each of the base Client's sockets in a ZMQStream bound to
    `io_loop`, so replies can be handled asynchronously.
    """
    io_loop = None # the tornado IOLoop driving the streams
    queue_stream = None # stream over the MUX queue socket
    notifier_stream = None # stream over the registration-notification socket

    def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
        Client.__init__(self, addr, context, username, debug)
        if io_loop is None:
            io_loop = ioloop.IOLoop.instance()
        self.io_loop = io_loop

        self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop)
        self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop)
        self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop)
        # FIX: was assigned to self.notification_stream, leaving
        # self.notifier_stream as the class-level None, so spin() crashed
        self.notifier_stream = zmqstream.ZMQStream(self.notification_socket, io_loop)

    def spin(self):
        """Flush all streams once, processing any pending messages."""
        for stream in (self.queue_stream, self.notifier_stream,
                self.task_stream, self.control_stream):
            stream.flush()
602 No newline at end of file
640
@@ -50,6 +50,55 class dependent(object):
50 raise UnmetDependency()
50 raise UnmetDependency()
51 return self.f(*args, **kwargs)
51 return self.f(*args, **kwargs)
52
52
53 def _require(*names):
54 for name in names:
55 try:
56 __import__(name)
57 except ImportError:
58 return False
59 return True
53
60
54 __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies']
def require(*names):
    """Build a dependency requiring that `names` be importable on the engine."""
    return depend(_require, *names)
63
class Dependency(set):
    """An object for representing a set of dependencies.

    Subclassed from set().

    Parameters
    ----------
    dependencies : list | set | dict
        The msg_ids to depend on.  A dict is interpreted as the output
        of `as_dict` (keys 'dependencies' and 'mode'), allowing
        round-tripping over json.
    mode : str, 'all' or 'any'
        Whether all dependencies, or any one of them, must be met for
        `check` to succeed.  Raises NotImplementedError otherwise.
    """

    mode='all'

    def __init__(self, dependencies=[], mode='all'):
        if isinstance(dependencies, dict):
            # load from dict (as produced by as_dict)
            # FIX: read 'mode' *before* rebinding `dependencies` to the
            # inner list; the old order called .get('mode') on a list
            mode = dependencies.get('mode', mode)
            dependencies = dependencies.get('dependencies', [])
        set.__init__(self, dependencies)
        self.mode = mode.lower()
        if self.mode not in ('any', 'all'):
            raise NotImplementedError("Only any|all supported, not %r"%mode)

    def check(self, completed):
        """Return True if this dependency is satisfied by `completed`.

        An empty dependency is always met."""
        if len(self) == 0:
            return True
        if self.mode == 'all':
            for dep in self:
                if dep not in completed:
                    return False
            return True
        elif self.mode == 'any':
            # FIX: was `com in self.dependencies` -- no such attribute;
            # the dependency ids live in the set itself
            for com in completed:
                if com in self:
                    return True
            return False

    def as_dict(self):
        """Represent this dependency as a dict. For json compatibility."""
        return dict(
            dependencies=list(self),
            mode=self.mode
        )
101
102
103 __all__ = ['UnmetDependency', 'depend', 'require', 'Dependency']
55
104
General Comments 0
You need to be logged in to leave comments. Login now