##// END OF EJS Templates
quiet down scheduler printing, fix dep_id check in update_dependencies
MinRK -
Show More
@@ -1,422 +1,423 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7
7
8 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
9 # Imports
9 # Imports
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 from random import randint,random
13 from random import randint,random
14
14
15 try:
15 try:
16 import numpy
16 import numpy
17 except ImportError:
17 except ImportError:
18 numpy = None
18 numpy = None
19
19
20 import zmq
20 import zmq
21 from zmq.eventloop import ioloop, zmqstream
21 from zmq.eventloop import ioloop, zmqstream
22
22
23 # local imports
23 # local imports
24 from IPython.zmq.log import logger # a Logger object
24 from IPython.zmq.log import logger # a Logger object
25 from client import Client
25 from client import Client
26 from dependency import Dependency
26 from dependency import Dependency
27 import streamsession as ss
27 import streamsession as ss
28
28
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30
30
@decorator
def logged(f, self, *args, **kwargs):
    """Method decorator that wraps a call unchanged.

    Debug tracing of calls has been silenced; this is now a plain
    pass-through so schedulers stay quiet in normal operation.
    """
    return f(self, *args, **kwargs)
37
37
38 #----------------------------------------------------------------------
38 #----------------------------------------------------------------------
39 # Chooser functions
39 # Chooser functions
40 #----------------------------------------------------------------------
40 #----------------------------------------------------------------------
41
41
def plainrandom(loads):
    """Pick an engine index uniformly at random.

    The load values themselves are ignored; only the number of
    engines (``len(loads)``) matters.
    """
    return randint(0, len(loads) - 1)
46
46
def lru(loads):
    """Least-recently-used: always pick the front of the line.

    The content of `loads` is ignored.  The list is assumed to be
    maintained in LRU order, with the oldest engine first, so index 0
    is always the least recently used engine.
    """
    return 0
55
55
def twobin(loads):
    """Power-of-two-choices: sample two indices, keep the earlier one.

    The content of `loads` is ignored.  Assumes LRU ordering of
    `loads` (oldest first), so the smaller of the two sampled indices
    is the LRU of the pair.
    """
    hi = len(loads) - 1
    first = randint(0, hi)
    second = randint(0, hi)
    return first if first < second else second
67
67
def weighted(loads):
    """Pick two indices at random, weighted inversely by load.

    Each engine is drawn with probability proportional to the inverse
    of its load (a zero-load engine is weighted ~1e6 times more than a
    load-1 engine); the less loaded of the two draws is returned.
    """
    inv = 1./(1e-6+numpy.array(loads))
    cdf = inv.cumsum()
    total = cdf[-1]

    def draw():
        # inverse-CDF sampling: scan for the first bin covering the target
        target = random()*total
        i = 0
        while cdf[i] < target:
            i += 1
        return i

    first = draw()
    second = draw()
    # ties go to the first draw, matching the original comparison
    return second if inv[second] > inv[first] else first
89
89
def leastload(loads):
    """Return the index of the minimum load.

    If the lowest load occurs more than once, the earliest occurrence
    wins; under LRU ordering of `loads` that is the least recently
    used of the tied engines.  Raises ValueError on an empty list,
    like the underlying min().
    """
    return min(range(len(loads)), key=lambda i: loads[i])
98
98
99 #---------------------------------------------------------------------
99 #---------------------------------------------------------------------
100 # Classes
100 # Classes
101 #---------------------------------------------------------------------
101 #---------------------------------------------------------------------
class TaskScheduler(object):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.
    """

    scheme = leastload # function for determining the destination
    client_stream = None # client-facing stream
    engine_stream = None # engine-facing stream
    mon_stream = None # controller-facing stream
    dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
    depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
    pending = None # dict by engine_uuid of submitted tasks
    completed = None # dict by engine_uuid of completed tasks
    clients = None # dict by msg_id for who submitted the task
    targets = None # list of target IDENTs
    loads = None # list of engine loads
    all_done = None # set of all completed tasks
    blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency


    def __init__(self, client_stream, engine_stream, mon_stream,
                        notifier_stream, scheme=None, io_loop=None):
        """Wire up the four streams and initialize empty bookkeeping.

        Parameters
        ----------
        client_stream : ZMQStream
            XREP stream facing clients that submit tasks.
        engine_stream : ZMQStream
            XREP stream facing the engines.
        mon_stream : ZMQStream
            PUB stream reporting traffic to the monitor/controller.
        notifier_stream : ZMQStream
            SUB stream carrying engine (un)registration notifications.
        scheme : callable, optional
            Maps a list of loads to a chosen index; defaults to the
            class-level scheme (leastload).
        io_loop : IOLoop, optional
            Defaults to the global IOLoop instance.
        """
        if io_loop is None:
            io_loop = ioloop.IOLoop.instance()
        self.io_loop = io_loop
        self.client_stream = client_stream
        self.engine_stream = engine_stream
        self.mon_stream = mon_stream
        self.notifier_stream = notifier_stream

        if scheme is not None:
            self.scheme = scheme
        else:
            self.scheme = TaskScheduler.scheme

        self.session = ss.StreamSession(username="TaskScheduler")

        self.dependencies = {}
        self.depending = {}
        self.completed = {}
        self.pending = {}
        self.all_done = set()
        self.blacklist = {}

        self.targets = []
        self.loads = []

        engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)

    def resume_receiving(self):
        """Resume accepting jobs."""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        self.client_stream.on_recv(None)

    #-----------------------------------------------------------------------
    # [Un]Registration Handling
    #-----------------------------------------------------------------------

    def dispatch_notification(self, msg):
        """dispatch register/unregister events."""
        idents,msg = self.session.feed_identities(msg)
        msg = self.session.unpack_message(msg)
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            try:
                handler(str(msg['content']['queue']))
            except KeyError:
                logger.error("task::Invalid notification msg: %s"%msg)

    @logged
    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line:
        self.targets.insert(0,uid)
        self.loads.insert(0,0)
        # initialize sets
        self.completed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            # this was our first engine: start accepting jobs
            self.resume_receiving()

    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        self.completed.pop(uid)
        lost = self.pending.pop(uid)

        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        self.handle_stranded_tasks(lost)

    def handle_stranded_tasks(self, lost):
        """Deal with jobs resident in an engine that died.

        Currently a no-op placeholder: stranded tasks are dropped.
        """
        # TODO: resubmit the tasks?
        for msg_id in lost:
            pass

    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------

    @logged
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers."""
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
        except Exception as e:
            # log raw_msg: `msg` is unbound here if feed_identities raised,
            # which would turn this handler into a NameError
            logger.error("task::Invalid msg: %s"%raw_msg)
            return

        msg = self.session.unpack_message(msg, content=False, copy=False)
        header = msg['header']
        msg_id = header['msg_id']

        # time dependencies
        after = Dependency(header.get('after', []))
        if after.mode == 'all':
            after.difference_update(self.all_done)
        if after.check(self.all_done):
            # recast as empty set, if `after` already met,
            # to prevent unnecessary set comparisons
            after = Dependency([])

        # location dependencies
        follow = Dependency(header.get('follow', []))
        if len(after) == 0:
            # time deps already met, try to run
            if not self.maybe_run(msg_id, raw_msg, follow):
                # can't run yet
                self.save_unmet(msg_id, raw_msg, after, follow)
        else:
            self.save_unmet(msg_id, raw_msg, after, follow)
        # send to monitor
        self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)

    @logged
    def maybe_run(self, msg_id, raw_msg, follow=None):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted, False if no engine
        currently satisfies the location dependencies.
        """
        if follow:
            def can_run(idx):
                # an engine is eligible if it hasn't already failed this
                # task's deps and has completed everything in `follow`
                target = self.targets[idx]
                return target not in self.blacklist.get(msg_id, []) and\
                        follow.check(self.completed[target])

            indices = filter(can_run, range(len(self.targets)))
            if not indices:
                return False
        else:
            indices = None

        # pass `follow` through explicitly: previously `indices` was passed
        # positionally into submit_task's `follow` slot, so the eligible-index
        # restriction was silently ignored and the wrong value was stored in
        # `pending` for later resubmission
        self.submit_task(msg_id, raw_msg, follow, indices)
        return True

    @logged
    def save_unmet(self, msg_id, msg, after, follow):
        """Save a message for later submission when its dependencies are met."""
        self.depending[msg_id] = (msg_id,msg,after,follow)
        # track the ids in both follow/after, but not those already completed
        for dep_id in after.union(follow).difference(self.all_done):
            if dep_id not in self.dependencies:
                self.dependencies[dep_id] = set()
            self.dependencies[dep_id].add(msg_id)

    @logged
    def submit_task(self, msg_id, msg, follow=None, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices`, when given, restricts the choice to those positions
        in self.targets; the scheme then picks among the corresponding
        loads only.
        """
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            # map the position within the restricted load list back to
            # the real target index
            idx = indices[idx]
        target = self.targets[idx]
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(msg, copy=False)
        self.add_job(idx)
        self.pending[target][msg_id] = (msg, follow)
        content = dict(msg_id=msg_id, engine_id=target)
        self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')

    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------

    @logged
    def dispatch_result(self, raw_msg):
        """Route an engine reply to success or unmet-dependency handling."""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
        except Exception as e:
            # log raw_msg: `msg` is unbound here if feed_identities raised
            logger.error("task::Invalid result: %s"%raw_msg)
            return
        msg = self.session.unpack_message(msg, content=False, copy=False)
        header = msg['header']
        if header.get('dependencies_met', True):
            self.handle_result_success(idents, msg['parent_header'], raw_msg)
            # send to monitor
            self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, msg['parent_header'])

    @logged
    def handle_result_success(self, idents, parent, raw_msg):
        """Relay a successful result to the client and update bookkeeping."""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.pending[engine].pop(msg_id)
        self.completed[engine].add(msg_id)
        # decrement the engine's load; without this, loads only ever grow
        # (finish_job was previously never called) and load-based schemes
        # degrade over time
        self.finish_job(self.targets.index(engine))

        self.update_dependencies(msg_id)

    @logged
    def handle_unmet_dependency(self, idents, parent):
        """An engine rejected a task whose deps it hasn't met; resubmit."""
        engine = idents[0]
        msg_id = parent['msg_id']
        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)
        raw_msg,follow = self.pending[engine].pop(msg_id)
        # the engine did not actually run the job, so drop its load count
        self.finish_job(self.targets.index(engine))
        if not self.maybe_run(msg_id, raw_msg, follow):
            # resubmit failed, put it back in our dependency tree
            self.save_unmet(msg_id, raw_msg, Dependency(), follow)

    @logged
    def update_dependencies(self, dep_id):
        """dep_id just finished. Update our dependency
        table and submit any jobs that just became runable."""
        if dep_id not in self.dependencies:
            return
        jobs = self.dependencies.pop(dep_id)
        for job in jobs:
            msg_id, raw_msg, after, follow = self.depending[job]
            if dep_id in after:
                after.remove(dep_id)
            if not after: # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, follow):
                    self.depending.pop(job)
                    for mid in follow:
                        if mid in self.dependencies:
                            # discard, not remove: msg_id may never have been
                            # registered under mid if mid was already done
                            # when the job was saved
                            self.dependencies[mid].discard(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.
        Override with subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        # rotate the engine to the back of the LRU line
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override with subclasses."""
        self.loads[idx] -= 1
393
394
394
395
395
396
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
    """Build the four streams, construct a TaskScheduler, and run its loop.

    `scheme` is the *name* of a chooser function defined in this module;
    an unknown name resolves to None, in which case TaskScheduler falls
    back to its class default.
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    ctx = zmq.Context()
    loop = ioloop.IOLoop()

    # look the chooser function up by name in this module
    scheme = globals().get(scheme)

    client_stream = ZMQStream(ctx.socket(zmq.XREP), loop)
    client_stream.bind(in_addr)

    engine_stream = ZMQStream(ctx.socket(zmq.XREP), loop)
    engine_stream.bind(out_addr)

    monitor_stream = ZMQStream(ctx.socket(zmq.PUB), loop)
    monitor_stream.connect(mon_addr)

    notifier_stream = ZMQStream(ctx.socket(zmq.SUB), loop)
    notifier_stream.setsockopt(zmq.SUBSCRIBE, '')
    notifier_stream.connect(not_addr)

    scheduler = TaskScheduler(client_stream, engine_stream, monitor_stream,
                              notifier_stream, scheme, loop)

    loop.start()
418
419
419
420
if __name__ == '__main__':
    iface = 'tcp://127.0.0.1:%i'
    # NOTE(review): the second port (1236) breaks the 1234x pattern of the
    # others and looks like a typo for 12346 -- confirm before relying on it
    ports = (12345, 1236, 12347, 12348)
    launch_scheduler(*[iface % p for p in ports])
General Comments 0
You need to be logged in to leave comments. Login now