##// END OF EJS Templates
basic LoadBalancedView, RemoteFunction
MinRK -
Show More
@@ -21,8 +21,8 b' from zmq.eventloop import ioloop, zmqstream'
21 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
22
22
23 import streamsession as ss
23 import streamsession as ss
24 from remotenamespace import RemoteNamespace
24 # from remotenamespace import RemoteNamespace
25 from view import DirectView
25 from view import DirectView, LoadBalancedView
26 from dependency import Dependency, depend, require
26 from dependency import Dependency, depend, require
27
27
28 def _push(ns):
28 def _push(ns):
@@ -31,8 +31,13 b' def _push(ns):'
31 def _pull(keys):
31 def _pull(keys):
32 g = globals()
32 g = globals()
33 if isinstance(keys, (list,tuple, set)):
33 if isinstance(keys, (list,tuple, set)):
34 for key in keys:
35 if not g.has_key(key):
36 raise NameError("name '%s' is not defined"%key)
34 return map(g.get, keys)
37 return map(g.get, keys)
35 else:
38 else:
39 if not g.has_key(keys):
40 raise NameError("name '%s' is not defined"%keys)
36 return g.get(keys)
41 return g.get(keys)
37
42
38 def _clear():
43 def _clear():
@@ -62,10 +67,35 b' def defaultblock(f, self, *args, **kwargs):'
62 self.block = saveblock
67 self.block = saveblock
63 return ret
68 return ret
64
69
70 def remote(client, block=None, targets=None):
71 """Turn a function into a remote function.
72
73 This method can be used for map:
74
75 >>> @remote(client,block=True)
76 def func(a)
77 """
78 def remote_function(f):
79 return RemoteFunction(client, f, block, targets)
80 return remote_function
81
65 #--------------------------------------------------------------------------
82 #--------------------------------------------------------------------------
66 # Classes
83 # Classes
67 #--------------------------------------------------------------------------
84 #--------------------------------------------------------------------------
68
85
86 class RemoteFunction(object):
87 """Turn an existing function into a remote function"""
88
89 def __init__(self, client, f, block=None, targets=None):
90 self.client = client
91 self.func = f
92 self.block=block
93 self.targets=targets
94
95 def __call__(self, *args, **kwargs):
96 return self.client.apply(self.func, args=args, kwargs=kwargs,
97 block=self.block, targets=self.targets)
98
69
99
70 class AbortedTask(object):
100 class AbortedTask(object):
71 """A basic wrapper object describing an aborted task."""
101 """A basic wrapper object describing an aborted task."""
@@ -84,7 +114,7 b' class Client(object):'
84 Parameters
114 Parameters
85 ----------
115 ----------
86
116
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
117 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
88 The address of the controller's registration socket.
118 The address of the controller's registration socket.
89
119
90
120
@@ -281,7 +311,8 b' class Client(object):'
281 elif content['status'] == 'aborted':
311 elif content['status'] == 'aborted':
282 self.results[msg_id] = AbortedTask(msg_id)
312 self.results[msg_id] = AbortedTask(msg_id)
283 elif content['status'] == 'resubmitted':
313 elif content['status'] == 'resubmitted':
284 pass # handle resubmission
314 # TODO: handle resubmission
315 pass
285 else:
316 else:
286 self.results[msg_id] = ss.unwrap_exception(content)
317 self.results[msg_id] = ss.unwrap_exception(content)
287
318
@@ -318,7 +349,9 b' class Client(object):'
318
349
319 def _flush_control(self, sock):
350 def _flush_control(self, sock):
320 """Flush replies from the control channel waiting
351 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
352 in the ZMQ queue.
353
354 Currently: ignore them."""
322 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
355 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
323 while msg is not None:
356 while msg is not None:
324 if self.debug:
357 if self.debug:
@@ -330,7 +363,10 b' class Client(object):'
330 #--------------------------------------------------------------------------
363 #--------------------------------------------------------------------------
331
364
332 def __getitem__(self, key):
365 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
366 """Dict access returns DirectView multiplexer objects or,
367 if key is None, a LoadBalancedView."""
368 if key is None:
369 return LoadBalancedView(self)
334 if isinstance(key, int):
370 if isinstance(key, int):
335 if key not in self.ids:
371 if key not in self.ids:
336 raise IndexError("No such engine: %i"%key)
372 raise IndexError("No such engine: %i"%key)
@@ -420,7 +456,7 b' class Client(object):'
420 if self.debug:
456 if self.debug:
421 pprint(msg)
457 pprint(msg)
422 if msg['content']['status'] != 'ok':
458 if msg['content']['status'] != 'ok':
423 error = msg['content']
459 error = ss.unwrap_exception(msg['content'])
424 if error:
460 if error:
425 return error
461 return error
426
462
@@ -443,7 +479,7 b' class Client(object):'
443 if self.debug:
479 if self.debug:
444 pprint(msg)
480 pprint(msg)
445 if msg['content']['status'] != 'ok':
481 if msg['content']['status'] != 'ok':
446 error = msg['content']
482 error = ss.unwrap_exception(msg['content'])
447 if error:
483 if error:
448 return error
484 return error
449
485
@@ -461,7 +497,7 b' class Client(object):'
461 if self.debug:
497 if self.debug:
462 pprint(msg)
498 pprint(msg)
463 if msg['content']['status'] != 'ok':
499 if msg['content']['status'] != 'ok':
464 error = msg['content']
500 error = ss.unwrap_exception(msg['content'])
465 if error:
501 if error:
466 return error
502 return error
467
503
@@ -719,7 +755,7 b' class Client(object):'
719 local_results[msg_id] = self.results[msg_id]
755 local_results[msg_id] = self.results[msg_id]
720 theids.remove(msg_id)
756 theids.remove(msg_id)
721
757
722 if msg_ids: # some not locally cached
758 if theids: # some not locally cached
723 content = dict(msg_ids=theids, status_only=status_only)
759 content = dict(msg_ids=theids, status_only=status_only)
724 msg = self.session.send(self._query_socket, "result_request", content=content)
760 msg = self.session.send(self._query_socket, "result_request", content=content)
725 zmq.select([self._query_socket], [], [])
761 zmq.select([self._query_socket], [], [])
@@ -42,13 +42,15 b' class View(object):'
42 _targets = None
42 _targets = None
43 _ntargets = None
43 _ntargets = None
44 block=None
44 block=None
45 bound=None
45 history=None
46 history=None
46
47
47 def __init__(self, client, targets):
48 def __init__(self, client, targets=None):
48 self.client = client
49 self.client = client
49 self._targets = targets
50 self._targets = targets
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
51 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
51 self.block = client.block
52 self.block = client.block
53 self.bound=True
52 self.history = []
54 self.history = []
53 self.outstanding = set()
55 self.outstanding = set()
54 self.results = {}
56 self.results = {}
@@ -84,7 +86,7 b' class View(object):'
84 else:
86 else:
85 returns actual result of f(*args, **kwargs)
87 returns actual result of f(*args, **kwargs)
86 """
88 """
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
89 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
88
90
89 @save_ids
91 @save_ids
90 def apply_async(self, f, *args, **kwargs):
92 def apply_async(self, f, *args, **kwargs):
@@ -148,6 +150,29 b' class View(object):'
148 """
150 """
149 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
151 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
150
152
153 def abort(self, msg_ids=None, block=None):
154 """Abort jobs on my engines.
155
156 Parameters
157 ----------
158
159 msg_ids : None, str, list of strs, optional
160 if None: abort all jobs.
161 else: abort specific msg_id(s).
162 """
163 block = block if block is not None else self.block
164 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
165
166 def queue_status(self, verbose=False):
167 """Fetch the Queue status of my engines"""
168 return self.client.queue_status(targets=self.targets, verbose=verbose)
169
170 def purge_results(self, msg_ids=[],targets=[]):
171 """Instruct the controller to forget specific results."""
172 if targets is None or targets == 'all':
173 targets = self.targets
174 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
175
151
176
152 class DirectView(View):
177 class DirectView(View):
153 """Direct Multiplexer View"""
178 """Direct Multiplexer View"""
@@ -156,15 +181,21 b' class DirectView(View):'
156 """update remote namespace with dict `ns`"""
181 """update remote namespace with dict `ns`"""
157 return self.client.push(ns, targets=self.targets, block=self.block)
182 return self.client.push(ns, targets=self.targets, block=self.block)
158
183
184 push = update
185
159 def get(self, key_s):
186 def get(self, key_s):
160 """get object(s) by `key_s` from remote namespace
187 """get object(s) by `key_s` from remote namespace
161 will return one object if it is a key.
188 will return one object if it is a key.
162 It also takes a list of keys, and will return a list of objects."""
189 It also takes a list of keys, and will return a list of objects."""
163 # block = block if block is not None else self.block
190 # block = block if block is not None else self.block
164 return self.client.pull(key_s, block=self.block, targets=self.targets)
191 return self.client.pull(key_s, block=True, targets=self.targets)
165
192
166 push = update
193 def pull(self, key_s, block=True):
167 pull = get
194 """get object(s) by `key_s` from remote namespace
195 will return one object if it is a key.
196 It also takes a list of keys, and will return a list of objects."""
197 block = block if block is not None else self.block
198 return self.client.pull(key_s, block=block, targets=self.targets)
168
199
169 def __getitem__(self, key):
200 def __getitem__(self, key):
170 return self.get(key)
201 return self.get(key)
@@ -182,19 +213,8 b' class DirectView(View):'
182 block = block if block is not None else self.block
213 block = block if block is not None else self.block
183 return self.client.kill(targets=self.targets,block=block)
214 return self.client.kill(targets=self.targets, block=block)
184
215
185 def abort(self, msg_ids=None, block=None):
186 """Abort jobs on my engines.
187
188 Parameters
189 ----------
190
191 msg_ids : None, str, list of strs, optional
192 if None: abort all jobs.
193 else: abort specific msg_id(s).
194 """
195 block = block if block is not None else self.block
196 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
197
198 class LoadBalancedView(View):
216 class LoadBalancedView(View):
199 _targets=None
217 def __repr__(self):
218 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
219
200 No newline at end of file
220
General Comments 0
You need to be logged in to leave comments. Login now