##// END OF EJS Templates
basic LoadBalancedView, RemoteFunction
MinRK -
Show More
@@ -21,8 +21,8 b' from zmq.eventloop import ioloop, zmqstream'
21 21 from IPython.external.decorator import decorator
22 22
23 23 import streamsession as ss
24 from remotenamespace import RemoteNamespace
25 from view import DirectView
24 # from remotenamespace import RemoteNamespace
25 from view import DirectView, LoadBalancedView
26 26 from dependency import Dependency, depend, require
27 27
28 28 def _push(ns):
@@ -31,8 +31,13 b' def _push(ns):'
31 31 def _pull(keys):
32 32 g = globals()
33 33 if isinstance(keys, (list,tuple, set)):
34 for key in keys:
35 if not g.has_key(key):
36 raise NameError("name '%s' is not defined"%key)
34 37 return map(g.get, keys)
35 38 else:
39 if not g.has_key(keys):
40 raise NameError("name '%s' is not defined"%keys)
36 41 return g.get(keys)
37 42
38 43 def _clear():
@@ -62,10 +67,35 b' def defaultblock(f, self, *args, **kwargs):'
62 67 self.block = saveblock
63 68 return ret
64 69
70 def remote(client, block=None, targets=None):
71 """Turn a function into a remote function.
72
73 This method can be used for map:
74
75 >>> @remote(client,block=True)
76 def func(a)
77 """
78 def remote_function(f):
79 return RemoteFunction(client, f, block, targets)
80 return remote_function
81
65 82 #--------------------------------------------------------------------------
66 83 # Classes
67 84 #--------------------------------------------------------------------------
68 85
86 class RemoteFunction(object):
87 """Turn an existing function into a remote function"""
88
89 def __init__(self, client, f, block=None, targets=None):
90 self.client = client
91 self.func = f
92 self.block=block
93 self.targets=targets
94
95 def __call__(self, *args, **kwargs):
96 return self.client.apply(self.func, args=args, kwargs=kwargs,
97 block=self.block, targets=self.targets)
98
69 99
70 100 class AbortedTask(object):
71 101 """A basic wrapper object describing an aborted task."""
@@ -84,7 +114,7 b' class Client(object):'
84 114 Parameters
85 115 ----------
86 116
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
117 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
88 118 The address of the controller's registration socket.
89 119
90 120
@@ -281,7 +311,8 b' class Client(object):'
281 311 elif content['status'] == 'aborted':
282 312 self.results[msg_id] = AbortedTask(msg_id)
283 313 elif content['status'] == 'resubmitted':
284 pass # handle resubmission
314 # TODO: handle resubmission
315 pass
285 316 else:
286 317 self.results[msg_id] = ss.unwrap_exception(content)
287 318
@@ -318,7 +349,9 b' class Client(object):'
318 349
319 350 def _flush_control(self, sock):
320 351 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
352 in the ZMQ queue.
353
354 Currently: ignore them."""
322 355 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
323 356 while msg is not None:
324 357 if self.debug:
@@ -330,7 +363,10 b' class Client(object):'
330 363 #--------------------------------------------------------------------------
331 364
332 365 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
366 """Dict access returns DirectView multiplexer objects or,
367 if key is None, a LoadBalancedView."""
368 if key is None:
369 return LoadBalancedView(self)
334 370 if isinstance(key, int):
335 371 if key not in self.ids:
336 372 raise IndexError("No such engine: %i"%key)
@@ -412,7 +448,7 b' class Client(object):'
412 448 """Clear the namespace in target(s)."""
413 449 targets = self._build_targets(targets)[0]
414 450 for t in targets:
415 self.session.send(self._control_socket, 'clear_request', content={},ident=t)
451 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
416 452 error = False
417 453 if self.block:
418 454 for i in range(len(targets)):
@@ -420,7 +456,7 b' class Client(object):'
420 456 if self.debug:
421 457 pprint(msg)
422 458 if msg['content']['status'] != 'ok':
423 error = msg['content']
459 error = ss.unwrap_exception(msg['content'])
424 460 if error:
425 461 return error
426 462
@@ -443,7 +479,7 b' class Client(object):'
443 479 if self.debug:
444 480 pprint(msg)
445 481 if msg['content']['status'] != 'ok':
446 error = msg['content']
482 error = ss.unwrap_exception(msg['content'])
447 483 if error:
448 484 return error
449 485
@@ -461,7 +497,7 b' class Client(object):'
461 497 if self.debug:
462 498 pprint(msg)
463 499 if msg['content']['status'] != 'ok':
464 error = msg['content']
500 error = ss.unwrap_exception(msg['content'])
465 501 if error:
466 502 return error
467 503
@@ -719,7 +755,7 b' class Client(object):'
719 755 local_results[msg_id] = self.results[msg_id]
720 756 theids.remove(msg_id)
721 757
722 if msg_ids: # some not locally cached
758 if theids: # some not locally cached
723 759 content = dict(msg_ids=theids, status_only=status_only)
724 760 msg = self.session.send(self._query_socket, "result_request", content=content)
725 761 zmq.select([self._query_socket], [], [])
@@ -42,13 +42,15 b' class View(object):'
42 42 _targets = None
43 43 _ntargets = None
44 44 block=None
45 bound=None
45 46 history=None
46 47
47 def __init__(self, client, targets):
48 def __init__(self, client, targets=None):
48 49 self.client = client
49 50 self._targets = targets
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
51 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
51 52 self.block = client.block
53 self.bound=True
52 54 self.history = []
53 55 self.outstanding = set()
54 56 self.results = {}
@@ -84,7 +86,7 b' class View(object):'
84 86 else:
85 87 returns actual result of f(*args, **kwargs)
86 88 """
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
89 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
88 90
89 91 @save_ids
90 92 def apply_async(self, f, *args, **kwargs):
@@ -147,6 +149,29 b' class View(object):'
147 149
148 150 """
149 151 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
152
153 def abort(self, msg_ids=None, block=None):
154 """Abort jobs on my engines.
155
156 Parameters
157 ----------
158
159 msg_ids : None, str, list of strs, optional
160 if None: abort all jobs.
161 else: abort specific msg_id(s).
162 """
163 block = block if block is not None else self.block
164 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
165
166 def queue_status(self, verbose=False):
167 """Fetch the Queue status of my engines"""
168 return self.client.queue_status(targets=self.targets, verbose=verbose)
169
170 def purge_results(self, msg_ids=[],targets=[]):
171 """Instruct the controller to forget specific results."""
172 if targets is None or targets == 'all':
173 targets = self.targets
174 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
150 175
151 176
152 177 class DirectView(View):
@@ -156,45 +181,40 b' class DirectView(View):'
156 181 """update remote namespace with dict `ns`"""
157 182 return self.client.push(ns, targets=self.targets, block=self.block)
158 183
184 push = update
185
159 186 def get(self, key_s):
160 187 """get object(s) by `key_s` from remote namespace
161 188 will return one object if it is a key.
162 189 It also takes a list of keys, and will return a list of objects."""
163 190 # block = block if block is not None else self.block
164 return self.client.pull(key_s, block=self.block, targets=self.targets)
191 return self.client.pull(key_s, block=True, targets=self.targets)
165 192
166 push = update
167 pull = get
193 def pull(self, key_s, block=True):
194 """get object(s) by `key_s` from remote namespace
195 will return one object if it is a key.
196 It also takes a list of keys, and will return a list of objects."""
197 block = block if block is not None else self.block
198 return self.client.pull(key_s, block=block, targets=self.targets)
168 199
169 200 def __getitem__(self, key):
170 201 return self.get(key)
171 202
172 def __setitem__(self,key,value):
203 def __setitem__(self,key, value):
173 204 self.update({key:value})
174 205
175 206 def clear(self, block=False):
176 207 """Clear the remote namespaces on my engines."""
177 208 block = block if block is not None else self.block
178 return self.client.clear(targets=self.targets,block=block)
209 return self.client.clear(targets=self.targets, block=block)
179 210
180 211 def kill(self, block=True):
181 212 """Kill my engines."""
182 213 block = block if block is not None else self.block
183 return self.client.kill(targets=self.targets,block=block)
214 return self.client.kill(targets=self.targets, block=block)
184 215
185 def abort(self, msg_ids=None, block=None):
186 """Abort jobs on my engines.
187
188 Parameters
189 ----------
190
191 msg_ids : None, str, list of strs, optional
192 if None: abort all jobs.
193 else: abort specific msg_id(s).
194 """
195 block = block if block is not None else self.block
196 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
197
198 216 class LoadBalancedView(View):
199 _targets=None
217 def __repr__(self):
218 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
219
200 220 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now