Show More
@@ -0,0 +1,401 | |||||
|
1 | #---------------------------------------------------------------------- | |||
|
2 | # Imports | |||
|
3 | #---------------------------------------------------------------------- | |||
|
4 | ||||
|
5 | from random import randint,random | |||
|
6 | ||||
|
7 | try: | |||
|
8 | import numpy | |||
|
9 | except ImportError: | |||
|
10 | numpy = None | |||
|
11 | ||||
|
12 | import zmq | |||
|
13 | from zmq.eventloop import ioloop, zmqstream | |||
|
14 | ||||
|
15 | # local imports | |||
|
16 | from IPython.zmq.log import logger # a Logger object | |||
|
17 | from client import Client | |||
|
18 | from dependency import Dependency | |||
|
19 | import streamsession as ss | |||
|
20 | ||||
|
21 | from IPython.external.decorator import decorator | |||
|
22 | ||||
|
23 | @decorator | |||
|
24 | def logged(f,self,*args,**kwargs): | |||
|
25 | print ("#--------------------") | |||
|
26 | print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |||
|
27 | return f(self,*args, **kwargs) | |||
|
28 | ||||
|
29 | #---------------------------------------------------------------------- | |||
|
30 | # Chooser functions | |||
|
31 | #---------------------------------------------------------------------- | |||
|
32 | ||||
|
33 | def plainrandom(loads): | |||
|
34 | """Plain random pick.""" | |||
|
35 | n = len(loads) | |||
|
36 | return randint(0,n-1) | |||
|
37 | ||||
|
38 | def lru(loads): | |||
|
39 | """Always pick the front of the line. | |||
|
40 | ||||
|
41 | The content of loads is ignored. | |||
|
42 | ||||
|
43 | Assumes LRU ordering of loads, with oldest first. | |||
|
44 | """ | |||
|
45 | return 0 | |||
|
46 | ||||
|
47 | def twobin(loads): | |||
|
48 | """Pick two at random, use the LRU of the two. | |||
|
49 | ||||
|
50 | The content of loads is ignored. | |||
|
51 | ||||
|
52 | Assumes LRU ordering of loads, with oldest first. | |||
|
53 | """ | |||
|
54 | n = len(loads) | |||
|
55 | a = randint(0,n-1) | |||
|
56 | b = randint(0,n-1) | |||
|
57 | return min(a,b) | |||
|
58 | ||||
|
59 | def weighted(loads): | |||
|
60 | """Pick two at random using inverse load as weight. | |||
|
61 | ||||
|
62 | Return the less loaded of the two. | |||
|
63 | """ | |||
|
64 | # weight 0 a million times more than 1: | |||
|
65 | weights = 1./(1e-6+numpy.array(loads)) | |||
|
66 | sums = weights.cumsum() | |||
|
67 | t = sums[-1] | |||
|
68 | x = random()*t | |||
|
69 | y = random()*t | |||
|
70 | idx = 0 | |||
|
71 | idy = 0 | |||
|
72 | while sums[idx] < x: | |||
|
73 | idx += 1 | |||
|
74 | while sums[idy] < y: | |||
|
75 | idy += 1 | |||
|
76 | if weights[idy] > weights[idx]: | |||
|
77 | return idy | |||
|
78 | else: | |||
|
79 | return idx | |||
|
80 | ||||
|
81 | def leastload(loads): | |||
|
82 | """Always choose the lowest load. | |||
|
83 | ||||
|
84 | If the lowest load occurs more than once, the first | |||
|
85 | occurance will be used. If loads has LRU ordering, this means | |||
|
86 | the LRU of those with the lowest load is chosen. | |||
|
87 | """ | |||
|
88 | return loads.index(min(loads)) | |||
|
89 | ||||
|
90 | #--------------------------------------------------------------------- | |||
|
91 | # Classes | |||
|
92 | #--------------------------------------------------------------------- | |||
|
93 | class TaskScheduler(object): | |||
|
94 | """Simple Python TaskScheduler object. | |||
|
95 | ||||
|
96 | This is the simplest object that supports msg_id based | |||
|
97 | DAG dependencies. *Only* task msg_ids are checked, not | |||
|
98 | msg_ids of jobs submitted via the MUX queue. | |||
|
99 | ||||
|
100 | """ | |||
|
101 | ||||
|
102 | scheme = leastload # function for determining the destination | |||
|
103 | client_stream = None # client-facing stream | |||
|
104 | engine_stream = None # engine-facing stream | |||
|
105 | mon_stream = None # controller-facing stream | |||
|
106 | dependencies = None # dict by msg_id of [ msg_ids that depend on key ] | |||
|
107 | depending = None # dict by msg_id of (msg_id, raw_msg, after, follow) | |||
|
108 | pending = None # dict by engine_uuid of submitted tasks | |||
|
109 | completed = None # dict by engine_uuid of completed tasks | |||
|
110 | clients = None # dict by msg_id for who submitted the task | |||
|
111 | targets = None # list of target IDENTs | |||
|
112 | loads = None # list of engine loads | |||
|
113 | all_done = None # set of all completed tasks | |||
|
114 | blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency | |||
|
115 | ||||
|
116 | ||||
|
117 | def __init__(self, client_stream, engine_stream, mon_stream, | |||
|
118 | notifier_stream, scheme=None, io_loop=None): | |||
|
119 | if io_loop is None: | |||
|
120 | io_loop = ioloop.IOLoop.instance() | |||
|
121 | self.io_loop = io_loop | |||
|
122 | self.client_stream = client_stream | |||
|
123 | self.engine_stream = engine_stream | |||
|
124 | self.mon_stream = mon_stream | |||
|
125 | self.notifier_stream = notifier_stream | |||
|
126 | ||||
|
127 | if scheme is not None: | |||
|
128 | self.scheme = scheme | |||
|
129 | else: | |||
|
130 | self.scheme = TaskScheduler.scheme | |||
|
131 | ||||
|
132 | self.session = ss.StreamSession(username="TaskScheduler") | |||
|
133 | ||||
|
134 | self.dependencies = {} | |||
|
135 | self.depending = {} | |||
|
136 | self.completed = {} | |||
|
137 | self.pending = {} | |||
|
138 | self.all_done = set() | |||
|
139 | ||||
|
140 | self.targets = [] | |||
|
141 | self.loads = [] | |||
|
142 | ||||
|
143 | engine_stream.on_recv(self.dispatch_result, copy=False) | |||
|
144 | self._notification_handlers = dict( | |||
|
145 | registration_notification = self._register_engine, | |||
|
146 | unregistration_notification = self._unregister_engine | |||
|
147 | ) | |||
|
148 | self.notifier_stream.on_recv(self.dispatch_notification) | |||
|
149 | ||||
|
150 | def resume_receiving(self): | |||
|
151 | """resume accepting jobs""" | |||
|
152 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |||
|
153 | ||||
|
154 | def stop_receiving(self): | |||
|
155 | self.client_stream.on_recv(None) | |||
|
156 | ||||
|
157 | #----------------------------------------------------------------------- | |||
|
158 | # [Un]Registration Handling | |||
|
159 | #----------------------------------------------------------------------- | |||
|
160 | ||||
|
161 | def dispatch_notification(self, msg): | |||
|
162 | """dispatch register/unregister events.""" | |||
|
163 | idents,msg = self.session.feed_identities(msg) | |||
|
164 | msg = self.session.unpack_message(msg) | |||
|
165 | msg_type = msg['msg_type'] | |||
|
166 | handler = self._notification_handlers.get(msg_type, None) | |||
|
167 | if handler is None: | |||
|
168 | raise Exception("Unhandled message type: %s"%msg_type) | |||
|
169 | else: | |||
|
170 | try: | |||
|
171 | handler(str(msg['content']['queue'])) | |||
|
172 | except KeyError: | |||
|
173 | logger.error("task::Invalid notification msg: %s"%msg) | |||
|
174 | @logged | |||
|
175 | def _register_engine(self, uid): | |||
|
176 | """new engine became available""" | |||
|
177 | # head of the line: | |||
|
178 | self.targets.insert(0,uid) | |||
|
179 | self.loads.insert(0,0) | |||
|
180 | # initialize sets | |||
|
181 | self.completed[uid] = set() | |||
|
182 | self.pending[uid] = {} | |||
|
183 | if len(self.targets) == 1: | |||
|
184 | self.resume_receiving() | |||
|
185 | ||||
|
186 | def _unregister_engine(self, uid): | |||
|
187 | """existing engine became unavailable""" | |||
|
188 | # handle any potentially finished tasks: | |||
|
189 | if len(self.targets) == 1: | |||
|
190 | self.stop_receiving() | |||
|
191 | self.engine_stream.flush() | |||
|
192 | ||||
|
193 | self.completed.pop(uid) | |||
|
194 | lost = self.pending.pop(uid) | |||
|
195 | ||||
|
196 | idx = self.targets.index(uid) | |||
|
197 | self.targets.pop(idx) | |||
|
198 | self.loads.pop(idx) | |||
|
199 | ||||
|
200 | self.handle_stranded_tasks(lost) | |||
|
201 | ||||
|
202 | def handle_stranded_tasks(self, lost): | |||
|
203 | """deal with jobs resident in an engine that died.""" | |||
|
204 | # TODO: resubmit the tasks? | |||
|
205 | for msg_id in lost: | |||
|
206 | pass | |||
|
207 | ||||
|
208 | ||||
|
209 | #----------------------------------------------------------------------- | |||
|
210 | # Job Submission | |||
|
211 | #----------------------------------------------------------------------- | |||
|
212 | @logged | |||
|
213 | def dispatch_submission(self, raw_msg): | |||
|
214 | """dispatch job submission""" | |||
|
215 | # ensure targets up to date: | |||
|
216 | self.notifier_stream.flush() | |||
|
217 | try: | |||
|
218 | idents, msg = self.session.feed_identities(raw_msg, copy=False) | |||
|
219 | except Exception, e: | |||
|
220 | logger.error("task::Invaid msg: %s"%msg) | |||
|
221 | return | |||
|
222 | ||||
|
223 | msg = self.session.unpack_message(msg, content=False, copy=False) | |||
|
224 | print idents,msg | |||
|
225 | header = msg['header'] | |||
|
226 | msg_id = header['msg_id'] | |||
|
227 | after = Dependency(header.get('after', [])) | |||
|
228 | if after.mode == 'all': | |||
|
229 | after.difference_update(self.all_done) | |||
|
230 | if after.check(self.all_done): | |||
|
231 | # recast as empty set, if we are already met, | |||
|
232 | # to prevent | |||
|
233 | after = Dependency([]) | |||
|
234 | ||||
|
235 | follow = Dependency(header.get('follow', [])) | |||
|
236 | print raw_msg | |||
|
237 | if len(after) == 0: | |||
|
238 | # time deps already met, try to run | |||
|
239 | if not self.maybe_run(msg_id, raw_msg, follow): | |||
|
240 | # can't run yet | |||
|
241 | self.save_unmet(msg_id, raw_msg, after, follow) | |||
|
242 | else: | |||
|
243 | self.save_unmet(msg_id, raw_msg, after, follow) | |||
|
244 | # send to monitor | |||
|
245 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |||
|
246 | @logged | |||
|
247 | def maybe_run(self, msg_id, raw_msg, follow=None): | |||
|
248 | """check location dependencies, and run if they are met.""" | |||
|
249 | ||||
|
250 | if follow: | |||
|
251 | def can_run(idx): | |||
|
252 | target = self.targets[idx] | |||
|
253 | return target not in self.blacklist.get(msg_id, []) and\ | |||
|
254 | follow.check(self.completed[target]) | |||
|
255 | ||||
|
256 | indices = filter(can_run, range(len(self.targets))) | |||
|
257 | if not indices: | |||
|
258 | return False | |||
|
259 | else: | |||
|
260 | indices = None | |||
|
261 | ||||
|
262 | self.submit_task(msg_id, raw_msg, indices) | |||
|
263 | return True | |||
|
264 | ||||
|
265 | @logged | |||
|
266 | def save_unmet(self, msg_id, msg, after, follow): | |||
|
267 | """Save a message for later submission when its dependencies are met.""" | |||
|
268 | self.depending[msg_id] = (msg_id,msg,after,follow) | |||
|
269 | # track the ids in both follow/after, but not those already completed | |||
|
270 | for dep_id in after.union(follow).difference(self.all_done): | |||
|
271 | if dep_id not in self.dependencies: | |||
|
272 | self.dependencies[dep_id] = set() | |||
|
273 | self.dependencies[dep_id].add(msg_id) | |||
|
274 | @logged | |||
|
275 | def submit_task(self, msg_id, msg, follow=None, indices=None): | |||
|
276 | """submit a task to any of a subset of our targets""" | |||
|
277 | if indices: | |||
|
278 | loads = [self.loads[i] for i in indices] | |||
|
279 | else: | |||
|
280 | loads = self.loads | |||
|
281 | idx = self.scheme(loads) | |||
|
282 | if indices: | |||
|
283 | idx = indices[idx] | |||
|
284 | target = self.targets[idx] | |||
|
285 | print target, map(str, msg[:3]) | |||
|
286 | self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False) | |||
|
287 | self.engine_stream.socket.send_multipart(msg, copy=False) | |||
|
288 | self.add_job(idx) | |||
|
289 | self.pending[target][msg_id] = (msg, follow) | |||
|
290 | ||||
|
291 | #----------------------------------------------------------------------- | |||
|
292 | # Result Handling | |||
|
293 | #----------------------------------------------------------------------- | |||
|
294 | @logged | |||
|
295 | def dispatch_result(self, raw_msg): | |||
|
296 | try: | |||
|
297 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |||
|
298 | except Exception, e: | |||
|
299 | logger.error("task::Invaid result: %s"%msg) | |||
|
300 | return | |||
|
301 | msg = self.session.unpack_message(msg, content=False, copy=False) | |||
|
302 | header = msg['header'] | |||
|
303 | if header.get('dependencies_met', True): | |||
|
304 | self.handle_result_success(idents, msg['parent_header'], raw_msg) | |||
|
305 | # send to monitor | |||
|
306 | self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) | |||
|
307 | else: | |||
|
308 | self.handle_unmet_dependency(self, idents, msg['parent_header']) | |||
|
309 | ||||
|
310 | @logged | |||
|
311 | def handle_result_success(self, idents, parent, raw_msg): | |||
|
312 | # first, relay result to client | |||
|
313 | engine = idents[0] | |||
|
314 | client = idents[1] | |||
|
315 | # swap_ids for XREP-XREP mirror | |||
|
316 | raw_msg[:2] = [client,engine] | |||
|
317 | print map(str, raw_msg[:4]) | |||
|
318 | self.client_stream.send_multipart(raw_msg, copy=False) | |||
|
319 | # now, update our data structures | |||
|
320 | msg_id = parent['msg_id'] | |||
|
321 | self.pending[engine].pop(msg_id) | |||
|
322 | self.completed[engine].add(msg_id) | |||
|
323 | ||||
|
324 | self.update_dependencies(msg_id) | |||
|
325 | ||||
|
326 | @logged | |||
|
327 | def handle_unmet_dependency(self, idents, parent): | |||
|
328 | engine = idents[0] | |||
|
329 | msg_id = parent['msg_id'] | |||
|
330 | if msg_id not in self.blacklist: | |||
|
331 | self.blacklist[msg_id] = set() | |||
|
332 | self.blacklist[msg_id].add(engine) | |||
|
333 | raw_msg,follow = self.pending[engine].pop(msg_id) | |||
|
334 | if not self.maybe_run(raw_msg, follow): | |||
|
335 | # resubmit failed, put it back in our dependency tree | |||
|
336 | self.save_unmet(msg_id, raw_msg, Dependency(), follow) | |||
|
337 | pass | |||
|
338 | @logged | |||
|
339 | def update_dependencies(self, dep_id): | |||
|
340 | """dep_id just finished. Update our dependency | |||
|
341 | table and submit any jobs that just became runable.""" | |||
|
342 | if dep_id not in self.dependencies: | |||
|
343 | return | |||
|
344 | jobs = self.dependencies.pop(dep_id) | |||
|
345 | for job in jobs: | |||
|
346 | msg_id, raw_msg, after, follow = self.depending[job] | |||
|
347 | if msg_id in after: | |||
|
348 | after.remove(msg_id) | |||
|
349 | if not after: # time deps met | |||
|
350 | if self.maybe_run(msg_id, raw_msg, follow): | |||
|
351 | self.depending.pop(job) | |||
|
352 | for mid in follow: | |||
|
353 | self.dependencies[mid].remove(msg_id) | |||
|
354 | ||||
|
355 | #---------------------------------------------------------------------- | |||
|
356 | # methods to be overridden by subclasses | |||
|
357 | #---------------------------------------------------------------------- | |||
|
358 | ||||
|
359 | def add_job(self, idx): | |||
|
360 | """Called after self.targets[idx] just got the job with header. | |||
|
361 | Override with subclasses. The default ordering is simple LRU. | |||
|
362 | The default loads are the number of outstanding jobs.""" | |||
|
363 | self.loads[idx] += 1 | |||
|
364 | for lis in (self.targets, self.loads): | |||
|
365 | lis.append(lis.pop(idx)) | |||
|
366 | ||||
|
367 | ||||
|
368 | def finish_job(self, idx): | |||
|
369 | """Called after self.targets[idx] just finished a job. | |||
|
370 | Override with subclasses.""" | |||
|
371 | self.loads[idx] -= 1 | |||
|
372 | ||||
|
373 | ||||
|
374 | ||||
|
375 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'): | |||
|
376 | from zmq.eventloop import ioloop | |||
|
377 | from zmq.eventloop.zmqstream import ZMQStream | |||
|
378 | ||||
|
379 | ctx = zmq.Context() | |||
|
380 | loop = ioloop.IOLoop() | |||
|
381 | ||||
|
382 | scheme = globals().get(scheme) | |||
|
383 | ||||
|
384 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) | |||
|
385 | ins.bind(in_addr) | |||
|
386 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) | |||
|
387 | outs.bind(out_addr) | |||
|
388 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) | |||
|
389 | mons.connect(mon_addr) | |||
|
390 | nots = ZMQStream(ctx.socket(zmq.SUB),loop) | |||
|
391 | nots.setsockopt(zmq.SUBSCRIBE, '') | |||
|
392 | nots.connect(not_addr) | |||
|
393 | ||||
|
394 | scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop) | |||
|
395 | ||||
|
396 | loop.start() | |||
|
397 | ||||
|
398 | ||||
|
399 | if __name__ == '__main__': | |||
|
400 | iface = 'tcp://127.0.0.1:%i' | |||
|
401 | launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348) |
@@ -2,19 +2,17 | |||||
2 | """A semi-synchronous Client for the ZMQ controller""" |
|
2 | """A semi-synchronous Client for the ZMQ controller""" | |
3 |
|
3 | |||
4 | import time |
|
4 | import time | |
5 | import threading |
|
|||
6 |
|
5 | |||
7 | from pprint import pprint |
|
6 | from pprint import pprint | |
8 |
|
7 | |||
9 | from functools import wraps |
|
|||
10 |
|
||||
11 | from IPython.external.decorator import decorator |
|
8 | from IPython.external.decorator import decorator | |
12 |
|
9 | |||
13 | import streamsession as ss |
|
10 | import streamsession as ss | |
14 | import zmq |
|
11 | import zmq | |
15 |
|
12 | from zmq.eventloop import ioloop, zmqstream | ||
16 | from remotenamespace import RemoteNamespace |
|
13 | from remotenamespace import RemoteNamespace | |
17 | from view import DirectView |
|
14 | from view import DirectView | |
|
15 | from dependency import Dependency, depend, require | |||
18 |
|
16 | |||
19 | def _push(ns): |
|
17 | def _push(ns): | |
20 | globals().update(ns) |
|
18 | globals().update(ns) | |
@@ -147,13 +145,13 class Client(object): | |||||
147 | self.history = [] |
|
145 | self.history = [] | |
148 | self.debug = debug |
|
146 | self.debug = debug | |
149 | self.session.debug = debug |
|
147 | self.session.debug = debug | |
150 | self._connect() |
|
|||
151 |
|
148 | |||
152 | self._notification_handlers = {'registration_notification' : self._register_engine, |
|
149 | self._notification_handlers = {'registration_notification' : self._register_engine, | |
153 | 'unregistration_notification' : self._unregister_engine, |
|
150 | 'unregistration_notification' : self._unregister_engine, | |
154 | } |
|
151 | } | |
155 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, |
|
152 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | |
156 | 'apply_reply' : self._handle_apply_reply} |
|
153 | 'apply_reply' : self._handle_apply_reply} | |
|
154 | self._connect() | |||
157 |
|
155 | |||
158 |
|
156 | |||
159 | @property |
|
157 | @property | |
@@ -453,7 +451,8 class Client(object): | |||||
453 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) |
|
451 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |
454 | return result |
|
452 | return result | |
455 |
|
453 | |||
456 |
def _apply_balanced(self, f, args, kwargs, bound=True, block=None |
|
454 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, | |
|
455 | after=None, follow=None): | |||
457 | """the underlying method for applying functions in a load balanced |
|
456 | """the underlying method for applying functions in a load balanced | |
458 | manner.""" |
|
457 | manner.""" | |
459 | block = block if block is not None else self.block |
|
458 | block = block if block is not None else self.block | |
@@ -471,17 +470,29 class Client(object): | |||||
471 | else: |
|
470 | else: | |
472 | return msg_id |
|
471 | return msg_id | |
473 |
|
472 | |||
474 |
def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None |
|
473 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None, | |
|
474 | after=None, follow=None): | |||
475 | """Then underlying method for applying functions to specific engines.""" |
|
475 | """Then underlying method for applying functions to specific engines.""" | |
|
476 | ||||
476 | block = block if block is not None else self.block |
|
477 | block = block if block is not None else self.block | |
|
478 | ||||
477 | queues,targets = self._build_targets(targets) |
|
479 | queues,targets = self._build_targets(targets) | |
478 | print queues |
|
480 | print queues | |
479 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
481 | bufs = ss.pack_apply_message(f,args,kwargs) | |
|
482 | if isinstance(after, Dependency): | |||
|
483 | after = after.as_dict() | |||
|
484 | elif after is None: | |||
|
485 | after = [] | |||
|
486 | if isinstance(follow, Dependency): | |||
|
487 | follow = follow.as_dict() | |||
|
488 | elif follow is None: | |||
|
489 | follow = [] | |||
|
490 | subheader = dict(after=after, follow=follow) | |||
480 | content = dict(bound=bound) |
|
491 | content = dict(bound=bound) | |
481 | msg_ids = [] |
|
492 | msg_ids = [] | |
482 | for queue in queues: |
|
493 | for queue in queues: | |
483 | msg = self.session.send(self.queue_socket, "apply_request", |
|
494 | msg = self.session.send(self.queue_socket, "apply_request", | |
484 | content=content, buffers=bufs,ident=queue) |
|
495 | content=content, buffers=bufs,ident=queue, subheader=subheader) | |
485 | msg_id = msg['msg_id'] |
|
496 | msg_id = msg['msg_id'] | |
486 | self.outstanding.add(msg_id) |
|
497 | self.outstanding.add(msg_id) | |
487 | self.history.append(msg_id) |
|
498 | self.history.append(msg_id) | |
@@ -501,7 +512,8 class Client(object): | |||||
501 | result[target] = self.results[mid] |
|
512 | result[target] = self.results[mid] | |
502 | return result |
|
513 | return result | |
503 |
|
514 | |||
504 |
def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None |
|
515 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, | |
|
516 | after=None, follow=None): | |||
505 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. |
|
517 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. | |
506 |
|
518 | |||
507 | if self.block is False: |
|
519 | if self.block is False: | |
@@ -509,17 +521,22 class Client(object): | |||||
509 | else: |
|
521 | else: | |
510 | returns actual result of f(*args, **kwargs) |
|
522 | returns actual result of f(*args, **kwargs) | |
511 | """ |
|
523 | """ | |
|
524 | # enforce types of f,args,kwrags | |||
512 | args = args if args is not None else [] |
|
525 | args = args if args is not None else [] | |
513 | kwargs = kwargs if kwargs is not None else {} |
|
526 | kwargs = kwargs if kwargs is not None else {} | |
|
527 | if not callable(f): | |||
|
528 | raise TypeError("f must be callable, not %s"%type(f)) | |||
514 | if not isinstance(args, (tuple, list)): |
|
529 | if not isinstance(args, (tuple, list)): | |
515 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
530 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |
516 | if not isinstance(kwargs, dict): |
|
531 | if not isinstance(kwargs, dict): | |
517 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
532 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
|
533 | ||||
|
534 | options = dict(bound=bound, block=block, after=after, follow=follow) | |||
|
535 | ||||
518 | if targets is None: |
|
536 | if targets is None: | |
519 |
return self._apply_balanced(f,args,kwargs, |
|
537 | return self._apply_balanced(f, args, kwargs, **options) | |
520 | else: |
|
538 | else: | |
521 | return self._apply_direct(f, args, kwargs, |
|
539 | return self._apply_direct(f, args, kwargs, targets=targets, **options) | |
522 | bound=bound,block=block, targets=targets) |
|
|||
523 |
|
540 | |||
524 | def push(self, ns, targets=None, block=None): |
|
541 | def push(self, ns, targets=None, block=None): | |
525 | """push the contents of `ns` into the namespace on `target`""" |
|
542 | """push the contents of `ns` into the namespace on `target`""" | |
@@ -598,5 +615,26 class Client(object): | |||||
598 | # else: |
|
615 | # else: | |
599 | # break |
|
616 | # break | |
600 | return msg['content'] |
|
617 | return msg['content'] | |
|
618 | ||||
|
619 | class AsynClient(Client): | |||
|
620 | """An Asynchronous client, using the Tornado Event Loop""" | |||
|
621 | io_loop = None | |||
|
622 | queue_stream = None | |||
|
623 | notifier_stream = None | |||
|
624 | ||||
|
625 | def __init__(self, addr, context=None, username=None, debug=False, io_loop=None): | |||
|
626 | Client.__init__(self, addr, context, username, debug) | |||
|
627 | if io_loop is None: | |||
|
628 | io_loop = ioloop.IOLoop.instance() | |||
|
629 | self.io_loop = io_loop | |||
|
630 | ||||
|
631 | self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop) | |||
|
632 | self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop) | |||
|
633 | self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop) | |||
|
634 | self.notification_stream = zmqstream.ZMQStream(self.notification_socket, io_loop) | |||
601 |
|
635 | |||
|
636 | def spin(self): | |||
|
637 | for stream in (self.queue_stream, self.notifier_stream, | |||
|
638 | self.task_stream, self.control_stream): | |||
|
639 | stream.flush() | |||
602 | No newline at end of file |
|
640 |
@@ -50,6 +50,55 class dependent(object): | |||||
50 | raise UnmetDependency() |
|
50 | raise UnmetDependency() | |
51 | return self.f(*args, **kwargs) |
|
51 | return self.f(*args, **kwargs) | |
52 |
|
52 | |||
|
53 | def _require(*names): | |||
|
54 | for name in names: | |||
|
55 | try: | |||
|
56 | __import__(name) | |||
|
57 | except ImportError: | |||
|
58 | return False | |||
|
59 | return True | |||
53 |
|
60 | |||
54 | __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] |
|
61 | def require(*names): | |
|
62 | return depend(_require, *names) | |||
|
63 | ||||
|
64 | class Dependency(set): | |||
|
65 | """An object for representing a set of dependencies. | |||
|
66 | ||||
|
67 | Subclassed from set().""" | |||
|
68 | ||||
|
69 | mode='all' | |||
|
70 | ||||
|
71 | def __init__(self, dependencies=[], mode='all'): | |||
|
72 | if isinstance(dependencies, dict): | |||
|
73 | # load from dict | |||
|
74 | dependencies = dependencies.get('dependencies', []) | |||
|
75 | mode = dependencies.get('mode', mode) | |||
|
76 | set.__init__(self, dependencies) | |||
|
77 | self.mode = mode.lower() | |||
|
78 | if self.mode not in ('any', 'all'): | |||
|
79 | raise NotImplementedError("Only any|all supported, not %r"%mode) | |||
|
80 | ||||
|
81 | def check(self, completed): | |||
|
82 | if len(self) == 0: | |||
|
83 | return True | |||
|
84 | if self.mode == 'all': | |||
|
85 | for dep in self: | |||
|
86 | if dep not in completed: | |||
|
87 | return False | |||
|
88 | return True | |||
|
89 | elif self.mode == 'any': | |||
|
90 | for com in completed: | |||
|
91 | if com in self.dependencies: | |||
|
92 | return True | |||
|
93 | return False | |||
|
94 | ||||
|
95 | def as_dict(self): | |||
|
96 | """Represent this dependency as a dict. For json compatibility.""" | |||
|
97 | return dict( | |||
|
98 | dependencies=list(self), | |||
|
99 | mode=self.mode | |||
|
100 | ) | |||
|
101 | ||||
|
102 | ||||
|
103 | __all__ = ['UnmetDependency', 'depend', 'require', 'Dependency'] | |||
55 |
|
104 |
General Comments 0
You need to be logged in to leave comments.
Login now