##// END OF EJS Templates
update with new client registration reply
MinRK -
Show More
@@ -30,7 +30,7 b' import streamsession as ss'
30 from clusterdir import ClusterDir, ClusterDirError
30 from clusterdir import ClusterDir, ClusterDirError
31 # from remotenamespace import RemoteNamespace
31 # from remotenamespace import RemoteNamespace
32 from view import DirectView, LoadBalancedView
32 from view import DirectView, LoadBalancedView
33 from dependency import Dependency, depend, require
33 from dependency import Dependency, depend, require, dependent
34 import error
34 import error
35 import map as Map
35 import map as Map
36 from asyncresult import AsyncResult, AsyncMapResult
36 from asyncresult import AsyncResult, AsyncMapResult
@@ -893,13 +893,11 b' class Client(object):'
893 block : bool (default: self.block)
893 block : bool (default: self.block)
894 Whether to wait for the result, or return immediately.
894 Whether to wait for the result, or return immediately.
895 False:
895 False:
896 returns msg_id(s)
896 returns AsyncResult
897 if multiple targets:
898 list of ids
899 True:
897 True:
900 returns actual result(s) of f(*args, **kwargs)
898 returns actual result(s) of f(*args, **kwargs)
901 if multiple targets:
899 if multiple targets:
902 dict of results, by engine ID
900 list of results, matching `targets`
903 targets : int,list of ints, 'all', None
901 targets : int,list of ints, 'all', None
904 Specify the destination of the job.
902 Specify the destination of the job.
905 if None:
903 if None:
@@ -930,16 +928,13 b' class Client(object):'
930 Returns
928 Returns
931 -------
929 -------
932 if block is False:
930 if block is False:
933 if single target:
931 return AsyncResult wrapping msg_ids
934 return msg_id
932 output of AsyncResult.get() is identical to that of `apply(...block=True)`
935 else:
936 return list of msg_ids
937 ? (should this be dict like block=True) ?
938 else:
933 else:
939 if single target:
934 if single target:
940 return result of f(*args, **kwargs)
935 return result of `f(*args, **kwargs)`
941 else:
936 else:
942 return dict of results, keyed by engine
937 return list of results, matching `targets`
943 """
938 """
944
939
945 # defaults:
940 # defaults:
@@ -977,6 +972,18 b' class Client(object):'
977 after=None, follow=None, timeout=None):
972 after=None, follow=None, timeout=None):
978 """The underlying method for applying functions in a load balanced
973 """The underlying method for applying functions in a load balanced
979 manner, via the task queue."""
974 manner, via the task queue."""
975
976 if self._task_scheme == 'pure':
977 # pure zmq scheme doesn't support dependencies
978 msg = "Pure ZMQ scheduler doesn't support dependencies"
979 if (follow or after):
980 # hard fail on DAG dependencies
981 raise RuntimeError(msg)
982 if isinstance(f, dependent):
983 # soft warn on functional dependencies
984 warnings.warn(msg, RuntimeWarning)
985
986
980 subheader = dict(after=after, follow=follow, timeout=timeout)
987 subheader = dict(after=after, follow=follow, timeout=timeout)
981 bufs = ss.pack_apply_message(f,args,kwargs)
988 bufs = ss.pack_apply_message(f,args,kwargs)
982 content = dict(bound=bound)
989 content = dict(bound=bound)
@@ -100,7 +100,7 b' class ControllerFactory(HubFactory):'
100
100
101 else:
101 else:
102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
103 sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
103 sargs = (self.client_info['task'][1], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
106 q.daemon=True
106 q.daemon=True
@@ -86,7 +86,7 b' Message type: ``connection_reply``::'
86 'status' : 'ok', # or 'error'
86 'status' : 'ok', # or 'error'
87 # if ok:
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : 'tcp...', # addr for task queue, or None if no task queue running
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 }
92 }
General Comments 0
You need to be logged in to leave comments. Login now