##// END OF EJS Templates
scheduler progress
MinRK -
Show More
@@ -1,15 +1,24 b''
1 #!/usr/bin/env python
2 1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
3 8
4 import time
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
5 12
13 import time
6 14 from pprint import pprint
7 15
16 import zmq
17 from zmq.eventloop import ioloop, zmqstream
18
8 19 from IPython.external.decorator import decorator
9 20
10 21 import streamsession as ss
11 import zmq
12 from zmq.eventloop import ioloop, zmqstream
13 22 from remotenamespace import RemoteNamespace
14 23 from view import DirectView
15 24 from dependency import Dependency, depend, require
@@ -357,7 +366,6 b' class Client(object):'
357 366 def clear(self, targets=None, block=None):
358 367 """clear the namespace in target(s)"""
359 368 targets = self._build_targets(targets)[0]
360 print targets
361 369 for t in targets:
362 370 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
363 371 error = False
@@ -377,7 +385,6 b' class Client(object):'
377 385 def abort(self, msg_ids = None, targets=None, block=None):
378 386 """abort the Queues of target(s)"""
379 387 targets = self._build_targets(targets)[0]
380 print targets
381 388 if isinstance(msg_ids, basestring):
382 389 msg_ids = [msg_ids]
383 390 content = dict(msg_ids=msg_ids)
@@ -400,7 +407,6 b' class Client(object):'
400 407 def kill(self, targets=None, block=None):
401 408 """Terminates one or more engine processes."""
402 409 targets = self._build_targets(targets)[0]
403 print targets
404 410 for t in targets:
405 411 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
406 412 error = False
@@ -456,11 +462,20 b' class Client(object):'
456 462 """the underlying method for applying functions in a load balanced
457 463 manner."""
458 464 block = block if block is not None else self.block
465 if isinstance(after, Dependency):
466 after = after.as_dict()
467 elif after is None:
468 after = []
469 if isinstance(follow, Dependency):
470 follow = follow.as_dict()
471 elif follow is None:
472 follow = []
473 subheader = dict(after=after, follow=follow)
459 474
460 475 bufs = ss.pack_apply_message(f,args,kwargs)
461 476 content = dict(bound=bound)
462 477 msg = self.session.send(self.task_socket, "apply_request",
463 content=content, buffers=bufs)
478 content=content, buffers=bufs, subheader=subheader)
464 479 msg_id = msg['msg_id']
465 480 self.outstanding.add(msg_id)
466 481 self.history.append(msg_id)
@@ -477,7 +492,6 b' class Client(object):'
477 492 block = block if block is not None else self.block
478 493
479 494 queues,targets = self._build_targets(targets)
480 print queues
481 495 bufs = ss.pack_apply_message(f,args,kwargs)
482 496 if isinstance(after, Dependency):
483 497 after = after.as_dict()
@@ -1,11 +1,9 b''
1 1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 2 """The IPython Controller with 0MQ
5 3 This is the master object that handles connections from engines, clients, and
6 4 """
7 5 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
9 7 #
10 8 # Distributed under the terms of the BSD License. The full license is in
11 9 # the file COPYING, distributed as part of this software.
@@ -216,10 +214,10 b' class Controller(object):'
216 214 newid += 1
217 215 return newid
218 216
219
220 217 #-----------------------------------------------------------------------------
221 218 # message validation
222 219 #-----------------------------------------------------------------------------
220
223 221 def _validate_targets(self, targets):
224 222 """turn any valid targets argument into a list of integer ids"""
225 223 if targets is None:
@@ -262,7 +260,7 b' class Controller(object):'
262 260
263 261
264 262 #-----------------------------------------------------------------------------
265 # dispatch methods (1 per socket)
263 # dispatch methods (1 per stream)
266 264 #-----------------------------------------------------------------------------
267 265
268 266 def dispatch_register_request(self, msg):
@@ -463,7 +461,7 b' class Controller(object):'
463 461 if msg_id in self.mia:
464 462 self.mia.remove(msg_id)
465 463 else:
466 logger.debug("task:: unknown task %s finished"%msg_id)
464 logger.debug("task::unknown task %s finished"%msg_id)
467 465
468 466 def save_task_destination(self, idents, msg):
469 467 try:
@@ -479,7 +477,7 b' class Controller(object):'
479 477 if queue_id == engine_uuid:
480 478 break
481 479
482 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
480 logger.info("task::task %s arrived on %s"%(msg_id, eid))
483 481 if msg_id in self.mia:
484 482 self.mia.remove(msg_id)
485 483 else:
@@ -24,6 +24,7 b' from IPython.external.decorator import decorator'
24 24 def logged(f,self,*args,**kwargs):
25 25 print ("#--------------------")
26 26 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
27 print ("#--")
27 28 return f(self,*args, **kwargs)
28 29
29 30 #----------------------------------------------------------------------
@@ -91,7 +92,7 b' def leastload(loads):'
91 92 # Classes
92 93 #---------------------------------------------------------------------
93 94 class TaskScheduler(object):
94 """Simple Python TaskScheduler object.
95 """Python TaskScheduler object.
95 96
96 97 This is the simplest object that supports msg_id based
97 98 DAG dependencies. *Only* task msg_ids are checked, not
@@ -136,6 +137,7 b' class TaskScheduler(object):'
136 137 self.completed = {}
137 138 self.pending = {}
138 139 self.all_done = set()
140 self.blacklist = {}
139 141
140 142 self.targets = []
141 143 self.loads = []
@@ -221,7 +223,6 b' class TaskScheduler(object):'
221 223 return
222 224
223 225 msg = self.session.unpack_message(msg, content=False, copy=False)
224 print idents,msg
225 226 header = msg['header']
226 227 msg_id = header['msg_id']
227 228 after = Dependency(header.get('after', []))
@@ -233,7 +234,6 b' class TaskScheduler(object):'
233 234 after = Dependency([])
234 235
235 236 follow = Dependency(header.get('follow', []))
236 print raw_msg
237 237 if len(after) == 0:
238 238 # time deps already met, try to run
239 239 if not self.maybe_run(msg_id, raw_msg, follow):
@@ -268,9 +268,11 b' class TaskScheduler(object):'
268 268 self.depending[msg_id] = (msg_id,msg,after,follow)
269 269 # track the ids in both follow/after, but not those already completed
270 270 for dep_id in after.union(follow).difference(self.all_done):
271 print dep_id
271 272 if dep_id not in self.dependencies:
272 273 self.dependencies[dep_id] = set()
273 274 self.dependencies[dep_id].add(msg_id)
275
274 276 @logged
275 277 def submit_task(self, msg_id, msg, follow=None, indices=None):
276 278 """submit a task to any of a subset of our targets"""
@@ -283,8 +285,8 b' class TaskScheduler(object):'
283 285 idx = indices[idx]
284 286 target = self.targets[idx]
285 287 print target, map(str, msg[:3])
286 self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False)
287 self.engine_stream.socket.send_multipart(msg, copy=False)
288 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
289 self.engine_stream.send_multipart(msg, copy=False)
288 290 self.add_job(idx)
289 291 self.pending[target][msg_id] = (msg, follow)
290 292
@@ -305,7 +307,7 b' class TaskScheduler(object):'
305 307 # send to monitor
306 308 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
307 309 else:
308 self.handle_unmet_dependency(self, idents, msg['parent_header'])
310 self.handle_unmet_dependency(idents, msg['parent_header'])
309 311
310 312 @logged
311 313 def handle_result_success(self, idents, parent, raw_msg):
@@ -331,7 +333,7 b' class TaskScheduler(object):'
331 333 self.blacklist[msg_id] = set()
332 334 self.blacklist[msg_id].add(engine)
333 335 raw_msg,follow = self.pending[engine].pop(msg_id)
334 if not self.maybe_run(raw_msg, follow):
336 if not self.maybe_run(msg_id, raw_msg, follow):
335 337 # resubmit failed, put it back in our dependency tree
336 338 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
337 339 pass
@@ -350,7 +352,8 b' class TaskScheduler(object):'
350 352 if self.maybe_run(msg_id, raw_msg, follow):
351 353 self.depending.pop(job)
352 354 for mid in follow:
353 self.dependencies[mid].remove(msg_id)
355 if mid in self.dependencies:
356 self.dependencies[mid].remove(msg_id)
354 357
355 358 #----------------------------------------------------------------------
356 359 # methods to be overridden by subclasses
@@ -438,10 +438,11 b' class StreamSession(object):'
438 438 return idents, msg
439 439
440 440 def unpack_message(self, msg, content=True, copy=True):
441 """return a message object from the format
441 """Return a message object from the format
442 442 sent by self.send.
443 443
444 parameters:
444 Parameters:
445 -----------
445 446
446 447 content : bool (True)
447 448 whether to unpack the content dict (True),
General Comments 0
You need to be logged in to leave comments. Login now