From 7f42766d5cb46d1d28a837add0fddcaaec306c36 2011-04-08 00:38:08 From: MinRK Date: 2011-04-08 00:38:08 Subject: [PATCH] scheduler progress --- diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index ae1cbcb..daf29af 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -1,15 +1,24 @@ -#!/usr/bin/env python """A semi-synchronous Client for the ZMQ controller""" +#----------------------------------------------------------------------------- +# Copyright (C) 2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- -import time +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- +import time from pprint import pprint +import zmq +from zmq.eventloop import ioloop, zmqstream + from IPython.external.decorator import decorator import streamsession as ss -import zmq -from zmq.eventloop import ioloop, zmqstream from remotenamespace import RemoteNamespace from view import DirectView from dependency import Dependency, depend, require @@ -357,7 +366,6 @@ class Client(object): def clear(self, targets=None, block=None): """clear the namespace in target(s)""" targets = self._build_targets(targets)[0] - print targets for t in targets: self.session.send(self.control_socket, 'clear_request', content={},ident=t) error = False @@ -377,7 +385,6 @@ class Client(object): def abort(self, msg_ids = None, targets=None, block=None): """abort the Queues of target(s)""" targets = self._build_targets(targets)[0] - print targets if isinstance(msg_ids, basestring): msg_ids = [msg_ids] content = dict(msg_ids=msg_ids) @@ -400,7 +407,6 @@ class Client(object): def kill(self, targets=None, block=None): """Terminates one or more engine processes.""" targets = self._build_targets(targets)[0] - print targets for t in targets: self.session.send(self.control_socket, 'kill_request', content={},ident=t) error = False @@ -456,11 +462,20 @@ class Client(object): """the underlying method for applying functions in a load balanced manner.""" block = block if block is not None else self.block + if isinstance(after, Dependency): + after = after.as_dict() + elif after is None: + after = [] + if isinstance(follow, Dependency): + follow = follow.as_dict() + elif follow is None: + follow = [] + subheader = dict(after=after, follow=follow) bufs = ss.pack_apply_message(f,args,kwargs) content = dict(bound=bound) msg = self.session.send(self.task_socket, "apply_request", - content=content, buffers=bufs) + content=content, buffers=bufs, subheader=subheader) msg_id = msg['msg_id'] self.outstanding.add(msg_id) self.history.append(msg_id) @@ -477,7 +492,6 @@ class Client(object): block = block if block is not None else self.block queues,targets = self._build_targets(targets) - print queues bufs = ss.pack_apply_message(f,args,kwargs) if isinstance(after, Dependency): after = after.as_dict() diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index 4cab848..8238bc9 100644 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -1,11 +1,9 @@ #!/usr/bin/env python -# encoding: utf-8 - """The IPython Controller with 0MQ This is the master object that handles connections from engines, clients, and """ #----------------------------------------------------------------------------- -# Copyright (C) 2008-2009 The IPython Development Team +# Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. @@ -216,10 +214,10 @@ class Controller(object): newid += 1 return newid - #----------------------------------------------------------------------------- # message validation #----------------------------------------------------------------------------- + def _validate_targets(self, targets): """turn any valid targets argument into a list of integer ids""" if targets is None: @@ -262,7 +260,7 @@ class Controller(object): #----------------------------------------------------------------------------- - # dispatch methods (1 per socket) + # dispatch methods (1 per stream) #----------------------------------------------------------------------------- def dispatch_register_request(self, msg): @@ -463,7 +461,7 @@ class Controller(object): if msg_id in self.mia: self.mia.remove(msg_id) else: - logger.debug("task:: unknown task %s finished"%msg_id) + logger.debug("task::unknown task %s finished"%msg_id) def save_task_destination(self, idents, msg): try: @@ -479,7 +477,7 @@ class Controller(object): if queue_id == engine_uuid: break - logger.info("task:: task %s arrived on %s"%(msg_id, eid)) + logger.info("task::task %s arrived on %s"%(msg_id, eid)) if msg_id in self.mia: self.mia.remove(msg_id) else: diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 8d2e388..f0fc41a 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -24,6 +24,7 @@ from IPython.external.decorator import decorator def logged(f,self,*args,**kwargs): print ("#--------------------") print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) + print ("#--") return f(self,*args, **kwargs) #---------------------------------------------------------------------- @@ -91,7 +92,7 @@ def leastload(loads): # Classes #--------------------------------------------------------------------- class TaskScheduler(object): - """Simple Python TaskScheduler object. + """Python TaskScheduler object. This is the simplest object that supports msg_id based DAG dependencies. *Only* task msg_ids are checked, not @@ -136,6 +137,7 @@ class TaskScheduler(object): self.completed = {} self.pending = {} self.all_done = set() + self.blacklist = {} self.targets = [] self.loads = [] @@ -221,7 +223,6 @@ class TaskScheduler(object): return msg = self.session.unpack_message(msg, content=False, copy=False) - print idents,msg header = msg['header'] msg_id = header['msg_id'] after = Dependency(header.get('after', [])) @@ -233,7 +234,6 @@ class TaskScheduler(object): after = Dependency([]) follow = Dependency(header.get('follow', [])) - print raw_msg if len(after) == 0: # time deps already met, try to run if not self.maybe_run(msg_id, raw_msg, follow): @@ -268,9 +268,11 @@ class TaskScheduler(object): self.depending[msg_id] = (msg_id,msg,after,follow) # track the ids in both follow/after, but not those already completed for dep_id in after.union(follow).difference(self.all_done): + print dep_id if dep_id not in self.dependencies: self.dependencies[dep_id] = set() self.dependencies[dep_id].add(msg_id) + @logged def submit_task(self, msg_id, msg, follow=None, indices=None): """submit a task to any of a subset of our targets""" @@ -283,8 +285,8 @@ class TaskScheduler(object): idx = indices[idx] target = self.targets[idx] print target, map(str, msg[:3]) - self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False) - self.engine_stream.socket.send_multipart(msg, copy=False) + self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) + self.engine_stream.send_multipart(msg, copy=False) self.add_job(idx) self.pending[target][msg_id] = (msg, follow) @@ -305,7 +307,7 @@ class TaskScheduler(object): # send to monitor self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) else: - self.handle_unmet_dependency(self, idents, msg['parent_header']) + self.handle_unmet_dependency(idents, msg['parent_header']) @logged def handle_result_success(self, idents, parent, raw_msg): @@ -331,7 +333,7 @@ class TaskScheduler(object): self.blacklist[msg_id] = set() self.blacklist[msg_id].add(engine) raw_msg,follow = self.pending[engine].pop(msg_id) - if not self.maybe_run(raw_msg, follow): + if not self.maybe_run(msg_id, raw_msg, follow): # resubmit failed, put it back in our dependency tree self.save_unmet(msg_id, raw_msg, Dependency(), follow) pass @@ -350,7 +352,8 @@ class TaskScheduler(object): if self.maybe_run(msg_id, raw_msg, follow): self.depending.pop(job) for mid in follow: - self.dependencies[mid].remove(msg_id) + if mid in self.dependencies: + self.dependencies[mid].remove(msg_id) #---------------------------------------------------------------------- # methods to be overridden by subclasses diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py index 12f9160..5862d53 100644 --- a/IPython/zmq/parallel/streamsession.py +++ b/IPython/zmq/parallel/streamsession.py @@ -438,10 +438,11 @@ class StreamSession(object): return idents, msg def unpack_message(self, msg, content=True, copy=True): - """return a message object from the format + """Return a message object from the format sent by self.send. - parameters: + Parameters: + ----------- content : bool (True) whether to unpack the content dict (True),