##// END OF EJS Templates
scheduler progress
MinRK -
Show More
@@ -1,15 +1,24 b''
1 #!/usr/bin/env python
2 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
3
8
4 import time
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
5
12
13 import time
6 from pprint import pprint
14 from pprint import pprint
7
15
16 import zmq
17 from zmq.eventloop import ioloop, zmqstream
18
8 from IPython.external.decorator import decorator
19 from IPython.external.decorator import decorator
9
20
10 import streamsession as ss
21 import streamsession as ss
11 import zmq
12 from zmq.eventloop import ioloop, zmqstream
13 from remotenamespace import RemoteNamespace
22 from remotenamespace import RemoteNamespace
14 from view import DirectView
23 from view import DirectView
15 from dependency import Dependency, depend, require
24 from dependency import Dependency, depend, require
@@ -357,7 +366,6 b' class Client(object):'
357 def clear(self, targets=None, block=None):
366 def clear(self, targets=None, block=None):
358 """clear the namespace in target(s)"""
367 """clear the namespace in target(s)"""
359 targets = self._build_targets(targets)[0]
368 targets = self._build_targets(targets)[0]
360 print targets
361 for t in targets:
369 for t in targets:
362 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
370 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
363 error = False
371 error = False
@@ -377,7 +385,6 b' class Client(object):'
377 def abort(self, msg_ids = None, targets=None, block=None):
385 def abort(self, msg_ids = None, targets=None, block=None):
378 """abort the Queues of target(s)"""
386 """abort the Queues of target(s)"""
379 targets = self._build_targets(targets)[0]
387 targets = self._build_targets(targets)[0]
380 print targets
381 if isinstance(msg_ids, basestring):
388 if isinstance(msg_ids, basestring):
382 msg_ids = [msg_ids]
389 msg_ids = [msg_ids]
383 content = dict(msg_ids=msg_ids)
390 content = dict(msg_ids=msg_ids)
@@ -400,7 +407,6 b' class Client(object):'
400 def kill(self, targets=None, block=None):
407 def kill(self, targets=None, block=None):
401 """Terminates one or more engine processes."""
408 """Terminates one or more engine processes."""
402 targets = self._build_targets(targets)[0]
409 targets = self._build_targets(targets)[0]
403 print targets
404 for t in targets:
410 for t in targets:
405 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
411 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
406 error = False
412 error = False
@@ -456,11 +462,20 b' class Client(object):'
456 """the underlying method for applying functions in a load balanced
462 """the underlying method for applying functions in a load balanced
457 manner."""
463 manner."""
458 block = block if block is not None else self.block
464 block = block if block is not None else self.block
465 if isinstance(after, Dependency):
466 after = after.as_dict()
467 elif after is None:
468 after = []
469 if isinstance(follow, Dependency):
470 follow = follow.as_dict()
471 elif follow is None:
472 follow = []
473 subheader = dict(after=after, follow=follow)
459
474
460 bufs = ss.pack_apply_message(f,args,kwargs)
475 bufs = ss.pack_apply_message(f,args,kwargs)
461 content = dict(bound=bound)
476 content = dict(bound=bound)
462 msg = self.session.send(self.task_socket, "apply_request",
477 msg = self.session.send(self.task_socket, "apply_request",
463 content=content, buffers=bufs)
478 content=content, buffers=bufs, subheader=subheader)
464 msg_id = msg['msg_id']
479 msg_id = msg['msg_id']
465 self.outstanding.add(msg_id)
480 self.outstanding.add(msg_id)
466 self.history.append(msg_id)
481 self.history.append(msg_id)
@@ -477,7 +492,6 b' class Client(object):'
477 block = block if block is not None else self.block
492 block = block if block is not None else self.block
478
493
479 queues,targets = self._build_targets(targets)
494 queues,targets = self._build_targets(targets)
480 print queues
481 bufs = ss.pack_apply_message(f,args,kwargs)
495 bufs = ss.pack_apply_message(f,args,kwargs)
482 if isinstance(after, Dependency):
496 if isinstance(after, Dependency):
483 after = after.as_dict()
497 after = after.as_dict()
@@ -1,11 +1,9 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
5 This is the master object that handles connections from engines, clients, and
3 This is the master object that handles connections from engines, clients, and
6 """
4 """
7 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
9 #
7 #
10 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
@@ -216,10 +214,10 b' class Controller(object):'
216 newid += 1
214 newid += 1
217 return newid
215 return newid
218
216
219
220 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
221 # message validation
218 # message validation
222 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220
223 def _validate_targets(self, targets):
221 def _validate_targets(self, targets):
224 """turn any valid targets argument into a list of integer ids"""
222 """turn any valid targets argument into a list of integer ids"""
225 if targets is None:
223 if targets is None:
@@ -262,7 +260,7 b' class Controller(object):'
262
260
263
261
264 #-----------------------------------------------------------------------------
262 #-----------------------------------------------------------------------------
265 # dispatch methods (1 per socket)
263 # dispatch methods (1 per stream)
266 #-----------------------------------------------------------------------------
264 #-----------------------------------------------------------------------------
267
265
268 def dispatch_register_request(self, msg):
266 def dispatch_register_request(self, msg):
@@ -463,7 +461,7 b' class Controller(object):'
463 if msg_id in self.mia:
461 if msg_id in self.mia:
464 self.mia.remove(msg_id)
462 self.mia.remove(msg_id)
465 else:
463 else:
466 logger.debug("task:: unknown task %s finished"%msg_id)
464 logger.debug("task::unknown task %s finished"%msg_id)
467
465
468 def save_task_destination(self, idents, msg):
466 def save_task_destination(self, idents, msg):
469 try:
467 try:
@@ -479,7 +477,7 b' class Controller(object):'
479 if queue_id == engine_uuid:
477 if queue_id == engine_uuid:
480 break
478 break
481
479
482 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
480 logger.info("task::task %s arrived on %s"%(msg_id, eid))
483 if msg_id in self.mia:
481 if msg_id in self.mia:
484 self.mia.remove(msg_id)
482 self.mia.remove(msg_id)
485 else:
483 else:
@@ -24,6 +24,7 b' from IPython.external.decorator import decorator'
24 def logged(f,self,*args,**kwargs):
24 def logged(f,self,*args,**kwargs):
25 print ("#--------------------")
25 print ("#--------------------")
26 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
26 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
27 print ("#--")
27 return f(self,*args, **kwargs)
28 return f(self,*args, **kwargs)
28
29
29 #----------------------------------------------------------------------
30 #----------------------------------------------------------------------
@@ -91,7 +92,7 b' def leastload(loads):'
91 # Classes
92 # Classes
92 #---------------------------------------------------------------------
93 #---------------------------------------------------------------------
93 class TaskScheduler(object):
94 class TaskScheduler(object):
94 """Simple Python TaskScheduler object.
95 """Python TaskScheduler object.
95
96
96 This is the simplest object that supports msg_id based
97 This is the simplest object that supports msg_id based
97 DAG dependencies. *Only* task msg_ids are checked, not
98 DAG dependencies. *Only* task msg_ids are checked, not
@@ -136,6 +137,7 b' class TaskScheduler(object):'
136 self.completed = {}
137 self.completed = {}
137 self.pending = {}
138 self.pending = {}
138 self.all_done = set()
139 self.all_done = set()
140 self.blacklist = {}
139
141
140 self.targets = []
142 self.targets = []
141 self.loads = []
143 self.loads = []
@@ -221,7 +223,6 b' class TaskScheduler(object):'
221 return
223 return
222
224
223 msg = self.session.unpack_message(msg, content=False, copy=False)
225 msg = self.session.unpack_message(msg, content=False, copy=False)
224 print idents,msg
225 header = msg['header']
226 header = msg['header']
226 msg_id = header['msg_id']
227 msg_id = header['msg_id']
227 after = Dependency(header.get('after', []))
228 after = Dependency(header.get('after', []))
@@ -233,7 +234,6 b' class TaskScheduler(object):'
233 after = Dependency([])
234 after = Dependency([])
234
235
235 follow = Dependency(header.get('follow', []))
236 follow = Dependency(header.get('follow', []))
236 print raw_msg
237 if len(after) == 0:
237 if len(after) == 0:
238 # time deps already met, try to run
238 # time deps already met, try to run
239 if not self.maybe_run(msg_id, raw_msg, follow):
239 if not self.maybe_run(msg_id, raw_msg, follow):
@@ -268,9 +268,11 b' class TaskScheduler(object):'
268 self.depending[msg_id] = (msg_id,msg,after,follow)
268 self.depending[msg_id] = (msg_id,msg,after,follow)
269 # track the ids in both follow/after, but not those already completed
269 # track the ids in both follow/after, but not those already completed
270 for dep_id in after.union(follow).difference(self.all_done):
270 for dep_id in after.union(follow).difference(self.all_done):
271 print dep_id
271 if dep_id not in self.dependencies:
272 if dep_id not in self.dependencies:
272 self.dependencies[dep_id] = set()
273 self.dependencies[dep_id] = set()
273 self.dependencies[dep_id].add(msg_id)
274 self.dependencies[dep_id].add(msg_id)
275
274 @logged
276 @logged
275 def submit_task(self, msg_id, msg, follow=None, indices=None):
277 def submit_task(self, msg_id, msg, follow=None, indices=None):
276 """submit a task to any of a subset of our targets"""
278 """submit a task to any of a subset of our targets"""
@@ -283,8 +285,8 b' class TaskScheduler(object):'
283 idx = indices[idx]
285 idx = indices[idx]
284 target = self.targets[idx]
286 target = self.targets[idx]
285 print target, map(str, msg[:3])
287 print target, map(str, msg[:3])
286 self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False)
288 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
287 self.engine_stream.socket.send_multipart(msg, copy=False)
289 self.engine_stream.send_multipart(msg, copy=False)
288 self.add_job(idx)
290 self.add_job(idx)
289 self.pending[target][msg_id] = (msg, follow)
291 self.pending[target][msg_id] = (msg, follow)
290
292
@@ -305,7 +307,7 b' class TaskScheduler(object):'
305 # send to monitor
307 # send to monitor
306 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
308 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
307 else:
309 else:
308 self.handle_unmet_dependency(self, idents, msg['parent_header'])
310 self.handle_unmet_dependency(idents, msg['parent_header'])
309
311
310 @logged
312 @logged
311 def handle_result_success(self, idents, parent, raw_msg):
313 def handle_result_success(self, idents, parent, raw_msg):
@@ -331,7 +333,7 b' class TaskScheduler(object):'
331 self.blacklist[msg_id] = set()
333 self.blacklist[msg_id] = set()
332 self.blacklist[msg_id].add(engine)
334 self.blacklist[msg_id].add(engine)
333 raw_msg,follow = self.pending[engine].pop(msg_id)
335 raw_msg,follow = self.pending[engine].pop(msg_id)
334 if not self.maybe_run(raw_msg, follow):
336 if not self.maybe_run(msg_id, raw_msg, follow):
335 # resubmit failed, put it back in our dependency tree
337 # resubmit failed, put it back in our dependency tree
336 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
338 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
337 pass
339 pass
@@ -350,7 +352,8 b' class TaskScheduler(object):'
350 if self.maybe_run(msg_id, raw_msg, follow):
352 if self.maybe_run(msg_id, raw_msg, follow):
351 self.depending.pop(job)
353 self.depending.pop(job)
352 for mid in follow:
354 for mid in follow:
353 self.dependencies[mid].remove(msg_id)
355 if mid in self.dependencies:
356 self.dependencies[mid].remove(msg_id)
354
357
355 #----------------------------------------------------------------------
358 #----------------------------------------------------------------------
356 # methods to be overridden by subclasses
359 # methods to be overridden by subclasses
@@ -438,10 +438,11 b' class StreamSession(object):'
438 return idents, msg
438 return idents, msg
439
439
440 def unpack_message(self, msg, content=True, copy=True):
440 def unpack_message(self, msg, content=True, copy=True):
441 """return a message object from the format
441 """Return a message object from the format
442 sent by self.send.
442 sent by self.send.
443
443
444 parameters:
444 Parameters:
445 -----------
445
446
446 content : bool (True)
447 content : bool (True)
447 whether to unpack the content dict (True),
448 whether to unpack the content dict (True),
General Comments 0
You need to be logged in to leave comments. Login now