##// END OF EJS Templates
basic LoadBalancedView, RemoteFunction
MinRK -
Show More
@@ -1,818 +1,854
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
13 from __future__ import print_function
14
14
15 import time
15 import time
16 from pprint import pprint
16 from pprint import pprint
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
22
22
23 import streamsession as ss
23 import streamsession as ss
24 from remotenamespace import RemoteNamespace
24 # from remotenamespace import RemoteNamespace
25 from view import DirectView
25 from view import DirectView, LoadBalancedView
26 from dependency import Dependency, depend, require
26 from dependency import Dependency, depend, require
27
27
28 def _push(ns):
28 def _push(ns):
29 globals().update(ns)
29 globals().update(ns)
30
30
31 def _pull(keys):
31 def _pull(keys):
32 g = globals()
32 g = globals()
33 if isinstance(keys, (list,tuple, set)):
33 if isinstance(keys, (list,tuple, set)):
34 for key in keys:
35 if not g.has_key(key):
36 raise NameError("name '%s' is not defined"%key)
34 return map(g.get, keys)
37 return map(g.get, keys)
35 else:
38 else:
39 if not g.has_key(keys):
40 raise NameError("name '%s' is not defined"%keys)
36 return g.get(keys)
41 return g.get(keys)
37
42
38 def _clear():
43 def _clear():
39 globals().clear()
44 globals().clear()
40
45
41 def execute(code):
46 def execute(code):
42 exec code in globals()
47 exec code in globals()
43
48
44 #--------------------------------------------------------------------------
49 #--------------------------------------------------------------------------
45 # Decorators for Client methods
50 # Decorators for Client methods
46 #--------------------------------------------------------------------------
51 #--------------------------------------------------------------------------
47
52
48 @decorator
53 @decorator
49 def spinfirst(f, self, *args, **kwargs):
54 def spinfirst(f, self, *args, **kwargs):
50 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
51 self.spin()
56 self.spin()
52 return f(self, *args, **kwargs)
57 return f(self, *args, **kwargs)
53
58
54 @decorator
59 @decorator
55 def defaultblock(f, self, *args, **kwargs):
60 def defaultblock(f, self, *args, **kwargs):
56 """Default to self.block; preserve self.block."""
61 """Default to self.block; preserve self.block."""
57 block = kwargs.get('block',None)
62 block = kwargs.get('block',None)
58 block = self.block if block is None else block
63 block = self.block if block is None else block
59 saveblock = self.block
64 saveblock = self.block
60 self.block = block
65 self.block = block
61 ret = f(self, *args, **kwargs)
66 ret = f(self, *args, **kwargs)
62 self.block = saveblock
67 self.block = saveblock
63 return ret
68 return ret
64
69
70 def remote(client, block=None, targets=None):
71 """Turn a function into a remote function.
72
73 This method can be used for map:
74
75 >>> @remote(client,block=True)
76 def func(a)
77 """
78 def remote_function(f):
79 return RemoteFunction(client, f, block, targets)
80 return remote_function
81
65 #--------------------------------------------------------------------------
82 #--------------------------------------------------------------------------
66 # Classes
83 # Classes
67 #--------------------------------------------------------------------------
84 #--------------------------------------------------------------------------
68
85
86 class RemoteFunction(object):
87 """Turn an existing function into a remote function"""
88
89 def __init__(self, client, f, block=None, targets=None):
90 self.client = client
91 self.func = f
92 self.block=block
93 self.targets=targets
94
95 def __call__(self, *args, **kwargs):
96 return self.client.apply(self.func, args=args, kwargs=kwargs,
97 block=self.block, targets=self.targets)
98
69
99
70 class AbortedTask(object):
100 class AbortedTask(object):
71 """A basic wrapper object describing an aborted task."""
101 """A basic wrapper object describing an aborted task."""
72 def __init__(self, msg_id):
102 def __init__(self, msg_id):
73 self.msg_id = msg_id
103 self.msg_id = msg_id
74
104
75 class ControllerError(Exception):
105 class ControllerError(Exception):
76 def __init__(self, etype, evalue, tb):
106 def __init__(self, etype, evalue, tb):
77 self.etype = etype
107 self.etype = etype
78 self.evalue = evalue
108 self.evalue = evalue
79 self.traceback=tb
109 self.traceback=tb
80
110
81 class Client(object):
111 class Client(object):
82 """A semi-synchronous client to the IPython ZMQ controller
112 """A semi-synchronous client to the IPython ZMQ controller
83
113
84 Parameters
114 Parameters
85 ----------
115 ----------
86
116
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
117 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
88 The address of the controller's registration socket.
118 The address of the controller's registration socket.
89
119
90
120
91 Attributes
121 Attributes
92 ----------
122 ----------
93 ids : set of int engine IDs
123 ids : set of int engine IDs
94 requesting the ids attribute always synchronizes
124 requesting the ids attribute always synchronizes
95 the registration state. To request ids without synchronization,
125 the registration state. To request ids without synchronization,
96 use semi-private _ids.
126 use semi-private _ids.
97
127
98 history : list of msg_ids
128 history : list of msg_ids
99 a list of msg_ids, keeping track of all the execution
129 a list of msg_ids, keeping track of all the execution
100 messages you have submitted in order.
130 messages you have submitted in order.
101
131
102 outstanding : set of msg_ids
132 outstanding : set of msg_ids
103 a set of msg_ids that have been submitted, but whose
133 a set of msg_ids that have been submitted, but whose
104 results have not yet been received.
134 results have not yet been received.
105
135
106 results : dict
136 results : dict
107 a dict of all our results, keyed by msg_id
137 a dict of all our results, keyed by msg_id
108
138
109 block : bool
139 block : bool
110 determines default behavior when block not specified
140 determines default behavior when block not specified
111 in execution methods
141 in execution methods
112
142
113 Methods
143 Methods
114 -------
144 -------
115 spin : flushes incoming results and registration state changes
145 spin : flushes incoming results and registration state changes
116 control methods spin, and requesting `ids` also ensures up to date
146 control methods spin, and requesting `ids` also ensures up to date
117
147
118 barrier : wait on one or more msg_ids
148 barrier : wait on one or more msg_ids
119
149
120 execution methods: apply/apply_bound/apply_to/applu_bount
150 execution methods: apply/apply_bound/apply_to/applu_bount
121 legacy: execute, run
151 legacy: execute, run
122
152
123 query methods: queue_status, get_result, purge
153 query methods: queue_status, get_result, purge
124
154
125 control methods: abort, kill
155 control methods: abort, kill
126
156
127 """
157 """
128
158
129
159
130 _connected=False
160 _connected=False
131 _engines=None
161 _engines=None
132 _addr='tcp://127.0.0.1:10101'
162 _addr='tcp://127.0.0.1:10101'
133 _registration_socket=None
163 _registration_socket=None
134 _query_socket=None
164 _query_socket=None
135 _control_socket=None
165 _control_socket=None
136 _notification_socket=None
166 _notification_socket=None
137 _mux_socket=None
167 _mux_socket=None
138 _task_socket=None
168 _task_socket=None
139 block = False
169 block = False
140 outstanding=None
170 outstanding=None
141 results = None
171 results = None
142 history = None
172 history = None
143 debug = False
173 debug = False
144
174
145 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
175 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
146 if context is None:
176 if context is None:
147 context = zmq.Context()
177 context = zmq.Context()
148 self.context = context
178 self.context = context
149 self._addr = addr
179 self._addr = addr
150 if username is None:
180 if username is None:
151 self.session = ss.StreamSession()
181 self.session = ss.StreamSession()
152 else:
182 else:
153 self.session = ss.StreamSession(username)
183 self.session = ss.StreamSession(username)
154 self._registration_socket = self.context.socket(zmq.PAIR)
184 self._registration_socket = self.context.socket(zmq.PAIR)
155 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
185 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
156 self._registration_socket.connect(addr)
186 self._registration_socket.connect(addr)
157 self._engines = {}
187 self._engines = {}
158 self._ids = set()
188 self._ids = set()
159 self.outstanding=set()
189 self.outstanding=set()
160 self.results = {}
190 self.results = {}
161 self.history = []
191 self.history = []
162 self.debug = debug
192 self.debug = debug
163 self.session.debug = debug
193 self.session.debug = debug
164
194
165 self._notification_handlers = {'registration_notification' : self._register_engine,
195 self._notification_handlers = {'registration_notification' : self._register_engine,
166 'unregistration_notification' : self._unregister_engine,
196 'unregistration_notification' : self._unregister_engine,
167 }
197 }
168 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
198 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
169 'apply_reply' : self._handle_apply_reply}
199 'apply_reply' : self._handle_apply_reply}
170 self._connect()
200 self._connect()
171
201
172
202
173 @property
203 @property
174 def ids(self):
204 def ids(self):
175 """Always up to date ids property."""
205 """Always up to date ids property."""
176 self._flush_notifications()
206 self._flush_notifications()
177 return self._ids
207 return self._ids
178
208
179 def _update_engines(self, engines):
209 def _update_engines(self, engines):
180 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
210 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
181 for k,v in engines.iteritems():
211 for k,v in engines.iteritems():
182 eid = int(k)
212 eid = int(k)
183 self._engines[eid] = bytes(v) # force not unicode
213 self._engines[eid] = bytes(v) # force not unicode
184 self._ids.add(eid)
214 self._ids.add(eid)
185
215
186 def _build_targets(self, targets):
216 def _build_targets(self, targets):
187 """Turn valid target IDs or 'all' into two lists:
217 """Turn valid target IDs or 'all' into two lists:
188 (int_ids, uuids).
218 (int_ids, uuids).
189 """
219 """
190 if targets is None:
220 if targets is None:
191 targets = self._ids
221 targets = self._ids
192 elif isinstance(targets, str):
222 elif isinstance(targets, str):
193 if targets.lower() == 'all':
223 if targets.lower() == 'all':
194 targets = self._ids
224 targets = self._ids
195 else:
225 else:
196 raise TypeError("%r not valid str target, must be 'all'"%(targets))
226 raise TypeError("%r not valid str target, must be 'all'"%(targets))
197 elif isinstance(targets, int):
227 elif isinstance(targets, int):
198 targets = [targets]
228 targets = [targets]
199 return [self._engines[t] for t in targets], list(targets)
229 return [self._engines[t] for t in targets], list(targets)
200
230
201 def _connect(self):
231 def _connect(self):
202 """setup all our socket connections to the controller. This is called from
232 """setup all our socket connections to the controller. This is called from
203 __init__."""
233 __init__."""
204 if self._connected:
234 if self._connected:
205 return
235 return
206 self._connected=True
236 self._connected=True
207 self.session.send(self._registration_socket, 'connection_request')
237 self.session.send(self._registration_socket, 'connection_request')
208 idents,msg = self.session.recv(self._registration_socket,mode=0)
238 idents,msg = self.session.recv(self._registration_socket,mode=0)
209 if self.debug:
239 if self.debug:
210 pprint(msg)
240 pprint(msg)
211 msg = ss.Message(msg)
241 msg = ss.Message(msg)
212 content = msg.content
242 content = msg.content
213 if content.status == 'ok':
243 if content.status == 'ok':
214 if content.queue:
244 if content.queue:
215 self._mux_socket = self.context.socket(zmq.PAIR)
245 self._mux_socket = self.context.socket(zmq.PAIR)
216 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
246 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
217 self._mux_socket.connect(content.queue)
247 self._mux_socket.connect(content.queue)
218 if content.task:
248 if content.task:
219 self._task_socket = self.context.socket(zmq.PAIR)
249 self._task_socket = self.context.socket(zmq.PAIR)
220 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
250 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
221 self._task_socket.connect(content.task)
251 self._task_socket.connect(content.task)
222 if content.notification:
252 if content.notification:
223 self._notification_socket = self.context.socket(zmq.SUB)
253 self._notification_socket = self.context.socket(zmq.SUB)
224 self._notification_socket.connect(content.notification)
254 self._notification_socket.connect(content.notification)
225 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
255 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
226 if content.query:
256 if content.query:
227 self._query_socket = self.context.socket(zmq.PAIR)
257 self._query_socket = self.context.socket(zmq.PAIR)
228 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
258 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
229 self._query_socket.connect(content.query)
259 self._query_socket.connect(content.query)
230 if content.control:
260 if content.control:
231 self._control_socket = self.context.socket(zmq.PAIR)
261 self._control_socket = self.context.socket(zmq.PAIR)
232 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
262 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
233 self._control_socket.connect(content.control)
263 self._control_socket.connect(content.control)
234 self._update_engines(dict(content.engines))
264 self._update_engines(dict(content.engines))
235
265
236 else:
266 else:
237 self._connected = False
267 self._connected = False
238 raise Exception("Failed to connect!")
268 raise Exception("Failed to connect!")
239
269
240 #--------------------------------------------------------------------------
270 #--------------------------------------------------------------------------
241 # handlers and callbacks for incoming messages
271 # handlers and callbacks for incoming messages
242 #--------------------------------------------------------------------------
272 #--------------------------------------------------------------------------
243
273
244 def _register_engine(self, msg):
274 def _register_engine(self, msg):
245 """Register a new engine, and update our connection info."""
275 """Register a new engine, and update our connection info."""
246 content = msg['content']
276 content = msg['content']
247 eid = content['id']
277 eid = content['id']
248 d = {eid : content['queue']}
278 d = {eid : content['queue']}
249 self._update_engines(d)
279 self._update_engines(d)
250 self._ids.add(int(eid))
280 self._ids.add(int(eid))
251
281
252 def _unregister_engine(self, msg):
282 def _unregister_engine(self, msg):
253 """Unregister an engine that has died."""
283 """Unregister an engine that has died."""
254 content = msg['content']
284 content = msg['content']
255 eid = int(content['id'])
285 eid = int(content['id'])
256 if eid in self._ids:
286 if eid in self._ids:
257 self._ids.remove(eid)
287 self._ids.remove(eid)
258 self._engines.pop(eid)
288 self._engines.pop(eid)
259
289
260 def _handle_execute_reply(self, msg):
290 def _handle_execute_reply(self, msg):
261 """Save the reply to an execute_request into our results."""
291 """Save the reply to an execute_request into our results."""
262 parent = msg['parent_header']
292 parent = msg['parent_header']
263 msg_id = parent['msg_id']
293 msg_id = parent['msg_id']
264 if msg_id not in self.outstanding:
294 if msg_id not in self.outstanding:
265 print("got unknown result: %s"%msg_id)
295 print("got unknown result: %s"%msg_id)
266 else:
296 else:
267 self.outstanding.remove(msg_id)
297 self.outstanding.remove(msg_id)
268 self.results[msg_id] = ss.unwrap_exception(msg['content'])
298 self.results[msg_id] = ss.unwrap_exception(msg['content'])
269
299
270 def _handle_apply_reply(self, msg):
300 def _handle_apply_reply(self, msg):
271 """Save the reply to an apply_request into our results."""
301 """Save the reply to an apply_request into our results."""
272 parent = msg['parent_header']
302 parent = msg['parent_header']
273 msg_id = parent['msg_id']
303 msg_id = parent['msg_id']
274 if msg_id not in self.outstanding:
304 if msg_id not in self.outstanding:
275 print ("got unknown result: %s"%msg_id)
305 print ("got unknown result: %s"%msg_id)
276 else:
306 else:
277 self.outstanding.remove(msg_id)
307 self.outstanding.remove(msg_id)
278 content = msg['content']
308 content = msg['content']
279 if content['status'] == 'ok':
309 if content['status'] == 'ok':
280 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
310 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
281 elif content['status'] == 'aborted':
311 elif content['status'] == 'aborted':
282 self.results[msg_id] = AbortedTask(msg_id)
312 self.results[msg_id] = AbortedTask(msg_id)
283 elif content['status'] == 'resubmitted':
313 elif content['status'] == 'resubmitted':
284 pass # handle resubmission
314 # TODO: handle resubmission
315 pass
285 else:
316 else:
286 self.results[msg_id] = ss.unwrap_exception(content)
317 self.results[msg_id] = ss.unwrap_exception(content)
287
318
288 def _flush_notifications(self):
319 def _flush_notifications(self):
289 """Flush notifications of engine registrations waiting
320 """Flush notifications of engine registrations waiting
290 in ZMQ queue."""
321 in ZMQ queue."""
291 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
322 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
292 while msg is not None:
323 while msg is not None:
293 if self.debug:
324 if self.debug:
294 pprint(msg)
325 pprint(msg)
295 msg = msg[-1]
326 msg = msg[-1]
296 msg_type = msg['msg_type']
327 msg_type = msg['msg_type']
297 handler = self._notification_handlers.get(msg_type, None)
328 handler = self._notification_handlers.get(msg_type, None)
298 if handler is None:
329 if handler is None:
299 raise Exception("Unhandled message type: %s"%msg.msg_type)
330 raise Exception("Unhandled message type: %s"%msg.msg_type)
300 else:
331 else:
301 handler(msg)
332 handler(msg)
302 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
333 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
303
334
304 def _flush_results(self, sock):
335 def _flush_results(self, sock):
305 """Flush task or queue results waiting in ZMQ queue."""
336 """Flush task or queue results waiting in ZMQ queue."""
306 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
337 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
307 while msg is not None:
338 while msg is not None:
308 if self.debug:
339 if self.debug:
309 pprint(msg)
340 pprint(msg)
310 msg = msg[-1]
341 msg = msg[-1]
311 msg_type = msg['msg_type']
342 msg_type = msg['msg_type']
312 handler = self._queue_handlers.get(msg_type, None)
343 handler = self._queue_handlers.get(msg_type, None)
313 if handler is None:
344 if handler is None:
314 raise Exception("Unhandled message type: %s"%msg.msg_type)
345 raise Exception("Unhandled message type: %s"%msg.msg_type)
315 else:
346 else:
316 handler(msg)
347 handler(msg)
317 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
348 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
318
349
319 def _flush_control(self, sock):
350 def _flush_control(self, sock):
320 """Flush replies from the control channel waiting
351 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
352 in the ZMQ queue.
353
354 Currently: ignore them."""
322 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
355 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
323 while msg is not None:
356 while msg is not None:
324 if self.debug:
357 if self.debug:
325 pprint(msg)
358 pprint(msg)
326 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
359 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
327
360
328 #--------------------------------------------------------------------------
361 #--------------------------------------------------------------------------
329 # getitem
362 # getitem
330 #--------------------------------------------------------------------------
363 #--------------------------------------------------------------------------
331
364
332 def __getitem__(self, key):
365 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
366 """Dict access returns DirectView multiplexer objects or,
367 if key is None, a LoadBalancedView."""
368 if key is None:
369 return LoadBalancedView(self)
334 if isinstance(key, int):
370 if isinstance(key, int):
335 if key not in self.ids:
371 if key not in self.ids:
336 raise IndexError("No such engine: %i"%key)
372 raise IndexError("No such engine: %i"%key)
337 return DirectView(self, key)
373 return DirectView(self, key)
338
374
339 if isinstance(key, slice):
375 if isinstance(key, slice):
340 indices = range(len(self.ids))[key]
376 indices = range(len(self.ids))[key]
341 ids = sorted(self._ids)
377 ids = sorted(self._ids)
342 key = [ ids[i] for i in indices ]
378 key = [ ids[i] for i in indices ]
343 # newkeys = sorted(self._ids)[thekeys[k]]
379 # newkeys = sorted(self._ids)[thekeys[k]]
344
380
345 if isinstance(key, (tuple, list, xrange)):
381 if isinstance(key, (tuple, list, xrange)):
346 _,targets = self._build_targets(list(key))
382 _,targets = self._build_targets(list(key))
347 return DirectView(self, targets)
383 return DirectView(self, targets)
348 else:
384 else:
349 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
385 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
350
386
351 #--------------------------------------------------------------------------
387 #--------------------------------------------------------------------------
352 # Begin public methods
388 # Begin public methods
353 #--------------------------------------------------------------------------
389 #--------------------------------------------------------------------------
354
390
355 def spin(self):
391 def spin(self):
356 """Flush any registration notifications and execution results
392 """Flush any registration notifications and execution results
357 waiting in the ZMQ queue.
393 waiting in the ZMQ queue.
358 """
394 """
359 if self._notification_socket:
395 if self._notification_socket:
360 self._flush_notifications()
396 self._flush_notifications()
361 if self._mux_socket:
397 if self._mux_socket:
362 self._flush_results(self._mux_socket)
398 self._flush_results(self._mux_socket)
363 if self._task_socket:
399 if self._task_socket:
364 self._flush_results(self._task_socket)
400 self._flush_results(self._task_socket)
365 if self._control_socket:
401 if self._control_socket:
366 self._flush_control(self._control_socket)
402 self._flush_control(self._control_socket)
367
403
368 def barrier(self, msg_ids=None, timeout=-1):
404 def barrier(self, msg_ids=None, timeout=-1):
369 """waits on one or more `msg_ids`, for up to `timeout` seconds.
405 """waits on one or more `msg_ids`, for up to `timeout` seconds.
370
406
371 Parameters
407 Parameters
372 ----------
408 ----------
373 msg_ids : int, str, or list of ints and/or strs
409 msg_ids : int, str, or list of ints and/or strs
374 ints are indices to self.history
410 ints are indices to self.history
375 strs are msg_ids
411 strs are msg_ids
376 default: wait on all outstanding messages
412 default: wait on all outstanding messages
377 timeout : float
413 timeout : float
378 a time in seconds, after which to give up.
414 a time in seconds, after which to give up.
379 default is -1, which means no timeout
415 default is -1, which means no timeout
380
416
381 Returns
417 Returns
382 -------
418 -------
383 True : when all msg_ids are done
419 True : when all msg_ids are done
384 False : timeout reached, some msg_ids still outstanding
420 False : timeout reached, some msg_ids still outstanding
385 """
421 """
386 tic = time.time()
422 tic = time.time()
387 if msg_ids is None:
423 if msg_ids is None:
388 theids = self.outstanding
424 theids = self.outstanding
389 else:
425 else:
390 if isinstance(msg_ids, (int, str)):
426 if isinstance(msg_ids, (int, str)):
391 msg_ids = [msg_ids]
427 msg_ids = [msg_ids]
392 theids = set()
428 theids = set()
393 for msg_id in msg_ids:
429 for msg_id in msg_ids:
394 if isinstance(msg_id, int):
430 if isinstance(msg_id, int):
395 msg_id = self.history[msg_id]
431 msg_id = self.history[msg_id]
396 theids.add(msg_id)
432 theids.add(msg_id)
397 self.spin()
433 self.spin()
398 while theids.intersection(self.outstanding):
434 while theids.intersection(self.outstanding):
399 if timeout >= 0 and ( time.time()-tic ) > timeout:
435 if timeout >= 0 and ( time.time()-tic ) > timeout:
400 break
436 break
401 time.sleep(1e-3)
437 time.sleep(1e-3)
402 self.spin()
438 self.spin()
403 return len(theids.intersection(self.outstanding)) == 0
439 return len(theids.intersection(self.outstanding)) == 0
404
440
405 #--------------------------------------------------------------------------
441 #--------------------------------------------------------------------------
406 # Control methods
442 # Control methods
407 #--------------------------------------------------------------------------
443 #--------------------------------------------------------------------------
408
444
409 @spinfirst
445 @spinfirst
410 @defaultblock
446 @defaultblock
411 def clear(self, targets=None, block=None):
447 def clear(self, targets=None, block=None):
412 """Clear the namespace in target(s)."""
448 """Clear the namespace in target(s)."""
413 targets = self._build_targets(targets)[0]
449 targets = self._build_targets(targets)[0]
414 for t in targets:
450 for t in targets:
415 self.session.send(self._control_socket, 'clear_request', content={},ident=t)
451 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
416 error = False
452 error = False
417 if self.block:
453 if self.block:
418 for i in range(len(targets)):
454 for i in range(len(targets)):
419 idents,msg = self.session.recv(self._control_socket,0)
455 idents,msg = self.session.recv(self._control_socket,0)
420 if self.debug:
456 if self.debug:
421 pprint(msg)
457 pprint(msg)
422 if msg['content']['status'] != 'ok':
458 if msg['content']['status'] != 'ok':
423 error = msg['content']
459 error = ss.unwrap_exception(msg['content'])
424 if error:
460 if error:
425 return error
461 return error
426
462
427
463
428 @spinfirst
464 @spinfirst
429 @defaultblock
465 @defaultblock
430 def abort(self, msg_ids = None, targets=None, block=None):
466 def abort(self, msg_ids = None, targets=None, block=None):
431 """Abort the execution queues of target(s)."""
467 """Abort the execution queues of target(s)."""
432 targets = self._build_targets(targets)[0]
468 targets = self._build_targets(targets)[0]
433 if isinstance(msg_ids, basestring):
469 if isinstance(msg_ids, basestring):
434 msg_ids = [msg_ids]
470 msg_ids = [msg_ids]
435 content = dict(msg_ids=msg_ids)
471 content = dict(msg_ids=msg_ids)
436 for t in targets:
472 for t in targets:
437 self.session.send(self._control_socket, 'abort_request',
473 self.session.send(self._control_socket, 'abort_request',
438 content=content, ident=t)
474 content=content, ident=t)
439 error = False
475 error = False
440 if self.block:
476 if self.block:
441 for i in range(len(targets)):
477 for i in range(len(targets)):
442 idents,msg = self.session.recv(self._control_socket,0)
478 idents,msg = self.session.recv(self._control_socket,0)
443 if self.debug:
479 if self.debug:
444 pprint(msg)
480 pprint(msg)
445 if msg['content']['status'] != 'ok':
481 if msg['content']['status'] != 'ok':
446 error = msg['content']
482 error = ss.unwrap_exception(msg['content'])
447 if error:
483 if error:
448 return error
484 return error
449
485
450 @spinfirst
486 @spinfirst
451 @defaultblock
487 @defaultblock
452 def kill(self, targets=None, block=None):
488 def kill(self, targets=None, block=None):
453 """Terminates one or more engine processes."""
489 """Terminates one or more engine processes."""
454 targets = self._build_targets(targets)[0]
490 targets = self._build_targets(targets)[0]
455 for t in targets:
491 for t in targets:
456 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
492 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
457 error = False
493 error = False
458 if self.block:
494 if self.block:
459 for i in range(len(targets)):
495 for i in range(len(targets)):
460 idents,msg = self.session.recv(self._control_socket,0)
496 idents,msg = self.session.recv(self._control_socket,0)
461 if self.debug:
497 if self.debug:
462 pprint(msg)
498 pprint(msg)
463 if msg['content']['status'] != 'ok':
499 if msg['content']['status'] != 'ok':
464 error = msg['content']
500 error = ss.unwrap_exception(msg['content'])
465 if error:
501 if error:
466 return error
502 return error
467
503
468 #--------------------------------------------------------------------------
504 #--------------------------------------------------------------------------
469 # Execution methods
505 # Execution methods
470 #--------------------------------------------------------------------------
506 #--------------------------------------------------------------------------
471
507
472 @defaultblock
508 @defaultblock
473 def execute(self, code, targets='all', block=None):
509 def execute(self, code, targets='all', block=None):
474 """Executes `code` on `targets` in blocking or nonblocking manner.
510 """Executes `code` on `targets` in blocking or nonblocking manner.
475
511
476 Parameters
512 Parameters
477 ----------
513 ----------
478 code : str
514 code : str
479 the code string to be executed
515 the code string to be executed
480 targets : int/str/list of ints/strs
516 targets : int/str/list of ints/strs
481 the engines on which to execute
517 the engines on which to execute
482 default : all
518 default : all
483 block : bool
519 block : bool
484 whether or not to wait until done to return
520 whether or not to wait until done to return
485 default: self.block
521 default: self.block
486 """
522 """
487 # block = self.block if block is None else block
523 # block = self.block if block is None else block
488 # saveblock = self.block
524 # saveblock = self.block
489 # self.block = block
525 # self.block = block
490 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
526 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
491 # self.block = saveblock
527 # self.block = saveblock
492 return result
528 return result
493
529
494 def run(self, code, block=None):
530 def run(self, code, block=None):
495 """Runs `code` on an engine.
531 """Runs `code` on an engine.
496
532
497 Calls to this are load-balanced.
533 Calls to this are load-balanced.
498
534
499 Parameters
535 Parameters
500 ----------
536 ----------
501 code : str
537 code : str
502 the code string to be executed
538 the code string to be executed
503 block : bool
539 block : bool
504 whether or not to wait until done
540 whether or not to wait until done
505
541
506 """
542 """
507 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
543 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
508 return result
544 return result
509
545
510 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
546 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
511 after=None, follow=None):
547 after=None, follow=None):
512 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
548 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
513
549
514 This is the central execution command for the client.
550 This is the central execution command for the client.
515
551
516 Parameters
552 Parameters
517 ----------
553 ----------
518
554
519 f : function
555 f : function
520 The fuction to be called remotely
556 The fuction to be called remotely
521 args : tuple/list
557 args : tuple/list
522 The positional arguments passed to `f`
558 The positional arguments passed to `f`
523 kwargs : dict
559 kwargs : dict
524 The keyword arguments passed to `f`
560 The keyword arguments passed to `f`
525 bound : bool (default: True)
561 bound : bool (default: True)
526 Whether to execute in the Engine(s) namespace, or in a clean
562 Whether to execute in the Engine(s) namespace, or in a clean
527 namespace not affecting the engine.
563 namespace not affecting the engine.
528 block : bool (default: self.block)
564 block : bool (default: self.block)
529 Whether to wait for the result, or return immediately.
565 Whether to wait for the result, or return immediately.
530 False:
566 False:
531 returns msg_id(s)
567 returns msg_id(s)
532 if multiple targets:
568 if multiple targets:
533 list of ids
569 list of ids
534 True:
570 True:
535 returns actual result(s) of f(*args, **kwargs)
571 returns actual result(s) of f(*args, **kwargs)
536 if multiple targets:
572 if multiple targets:
537 dict of results, by engine ID
573 dict of results, by engine ID
538 targets : int,list of ints, 'all', None
574 targets : int,list of ints, 'all', None
539 Specify the destination of the job.
575 Specify the destination of the job.
540 if None:
576 if None:
541 Submit via Task queue for load-balancing.
577 Submit via Task queue for load-balancing.
542 if 'all':
578 if 'all':
543 Run on all active engines
579 Run on all active engines
544 if list:
580 if list:
545 Run on each specified engine
581 Run on each specified engine
546 if int:
582 if int:
547 Run on single engine
583 Run on single engine
548
584
549 after : Dependency or collection of msg_ids
585 after : Dependency or collection of msg_ids
550 Only for load-balanced execution (targets=None)
586 Only for load-balanced execution (targets=None)
551 Specify a list of msg_ids as a time-based dependency.
587 Specify a list of msg_ids as a time-based dependency.
552 This job will only be run *after* the dependencies
588 This job will only be run *after* the dependencies
553 have been met.
589 have been met.
554
590
555 follow : Dependency or collection of msg_ids
591 follow : Dependency or collection of msg_ids
556 Only for load-balanced execution (targets=None)
592 Only for load-balanced execution (targets=None)
557 Specify a list of msg_ids as a location-based dependency.
593 Specify a list of msg_ids as a location-based dependency.
558 This job will only be run on an engine where this dependency
594 This job will only be run on an engine where this dependency
559 is met.
595 is met.
560
596
561 Returns
597 Returns
562 -------
598 -------
563 if block is False:
599 if block is False:
564 if single target:
600 if single target:
565 return msg_id
601 return msg_id
566 else:
602 else:
567 return list of msg_ids
603 return list of msg_ids
568 ? (should this be dict like block=True) ?
604 ? (should this be dict like block=True) ?
569 else:
605 else:
570 if single target:
606 if single target:
571 return result of f(*args, **kwargs)
607 return result of f(*args, **kwargs)
572 else:
608 else:
573 return dict of results, keyed by engine
609 return dict of results, keyed by engine
574 """
610 """
575
611
576 # defaults:
612 # defaults:
577 block = block if block is not None else self.block
613 block = block if block is not None else self.block
578 args = args if args is not None else []
614 args = args if args is not None else []
579 kwargs = kwargs if kwargs is not None else {}
615 kwargs = kwargs if kwargs is not None else {}
580
616
581 # enforce types of f,args,kwrags
617 # enforce types of f,args,kwrags
582 if not callable(f):
618 if not callable(f):
583 raise TypeError("f must be callable, not %s"%type(f))
619 raise TypeError("f must be callable, not %s"%type(f))
584 if not isinstance(args, (tuple, list)):
620 if not isinstance(args, (tuple, list)):
585 raise TypeError("args must be tuple or list, not %s"%type(args))
621 raise TypeError("args must be tuple or list, not %s"%type(args))
586 if not isinstance(kwargs, dict):
622 if not isinstance(kwargs, dict):
587 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
623 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
588
624
589 options = dict(bound=bound, block=block, after=after, follow=follow)
625 options = dict(bound=bound, block=block, after=after, follow=follow)
590
626
591 if targets is None:
627 if targets is None:
592 return self._apply_balanced(f, args, kwargs, **options)
628 return self._apply_balanced(f, args, kwargs, **options)
593 else:
629 else:
594 return self._apply_direct(f, args, kwargs, targets=targets, **options)
630 return self._apply_direct(f, args, kwargs, targets=targets, **options)
595
631
596 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
632 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
597 after=None, follow=None):
633 after=None, follow=None):
598 """The underlying method for applying functions in a load balanced
634 """The underlying method for applying functions in a load balanced
599 manner, via the task queue."""
635 manner, via the task queue."""
600 if isinstance(after, Dependency):
636 if isinstance(after, Dependency):
601 after = after.as_dict()
637 after = after.as_dict()
602 elif after is None:
638 elif after is None:
603 after = []
639 after = []
604 if isinstance(follow, Dependency):
640 if isinstance(follow, Dependency):
605 follow = follow.as_dict()
641 follow = follow.as_dict()
606 elif follow is None:
642 elif follow is None:
607 follow = []
643 follow = []
608 subheader = dict(after=after, follow=follow)
644 subheader = dict(after=after, follow=follow)
609
645
610 bufs = ss.pack_apply_message(f,args,kwargs)
646 bufs = ss.pack_apply_message(f,args,kwargs)
611 content = dict(bound=bound)
647 content = dict(bound=bound)
612 msg = self.session.send(self._task_socket, "apply_request",
648 msg = self.session.send(self._task_socket, "apply_request",
613 content=content, buffers=bufs, subheader=subheader)
649 content=content, buffers=bufs, subheader=subheader)
614 msg_id = msg['msg_id']
650 msg_id = msg['msg_id']
615 self.outstanding.add(msg_id)
651 self.outstanding.add(msg_id)
616 self.history.append(msg_id)
652 self.history.append(msg_id)
617 if block:
653 if block:
618 self.barrier(msg_id)
654 self.barrier(msg_id)
619 return self.results[msg_id]
655 return self.results[msg_id]
620 else:
656 else:
621 return msg_id
657 return msg_id
622
658
623 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
659 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
624 after=None, follow=None):
660 after=None, follow=None):
625 """Then underlying method for applying functions to specific engines
661 """Then underlying method for applying functions to specific engines
626 via the MUX queue."""
662 via the MUX queue."""
627
663
628 queues,targets = self._build_targets(targets)
664 queues,targets = self._build_targets(targets)
629 bufs = ss.pack_apply_message(f,args,kwargs)
665 bufs = ss.pack_apply_message(f,args,kwargs)
630 if isinstance(after, Dependency):
666 if isinstance(after, Dependency):
631 after = after.as_dict()
667 after = after.as_dict()
632 elif after is None:
668 elif after is None:
633 after = []
669 after = []
634 if isinstance(follow, Dependency):
670 if isinstance(follow, Dependency):
635 follow = follow.as_dict()
671 follow = follow.as_dict()
636 elif follow is None:
672 elif follow is None:
637 follow = []
673 follow = []
638 subheader = dict(after=after, follow=follow)
674 subheader = dict(after=after, follow=follow)
639 content = dict(bound=bound)
675 content = dict(bound=bound)
640 msg_ids = []
676 msg_ids = []
641 for queue in queues:
677 for queue in queues:
642 msg = self.session.send(self._mux_socket, "apply_request",
678 msg = self.session.send(self._mux_socket, "apply_request",
643 content=content, buffers=bufs,ident=queue, subheader=subheader)
679 content=content, buffers=bufs,ident=queue, subheader=subheader)
644 msg_id = msg['msg_id']
680 msg_id = msg['msg_id']
645 self.outstanding.add(msg_id)
681 self.outstanding.add(msg_id)
646 self.history.append(msg_id)
682 self.history.append(msg_id)
647 msg_ids.append(msg_id)
683 msg_ids.append(msg_id)
648 if block:
684 if block:
649 self.barrier(msg_ids)
685 self.barrier(msg_ids)
650 else:
686 else:
651 if len(msg_ids) == 1:
687 if len(msg_ids) == 1:
652 return msg_ids[0]
688 return msg_ids[0]
653 else:
689 else:
654 return msg_ids
690 return msg_ids
655 if len(msg_ids) == 1:
691 if len(msg_ids) == 1:
656 return self.results[msg_ids[0]]
692 return self.results[msg_ids[0]]
657 else:
693 else:
658 result = {}
694 result = {}
659 for target,mid in zip(targets, msg_ids):
695 for target,mid in zip(targets, msg_ids):
660 result[target] = self.results[mid]
696 result[target] = self.results[mid]
661 return result
697 return result
662
698
663 #--------------------------------------------------------------------------
699 #--------------------------------------------------------------------------
664 # Data movement
700 # Data movement
665 #--------------------------------------------------------------------------
701 #--------------------------------------------------------------------------
666
702
667 @defaultblock
703 @defaultblock
668 def push(self, ns, targets=None, block=None):
704 def push(self, ns, targets=None, block=None):
669 """Push the contents of `ns` into the namespace on `target`"""
705 """Push the contents of `ns` into the namespace on `target`"""
670 if not isinstance(ns, dict):
706 if not isinstance(ns, dict):
671 raise TypeError("Must be a dict, not %s"%type(ns))
707 raise TypeError("Must be a dict, not %s"%type(ns))
672 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
708 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
673 return result
709 return result
674
710
675 @defaultblock
711 @defaultblock
676 def pull(self, keys, targets=None, block=True):
712 def pull(self, keys, targets=None, block=True):
677 """Pull objects from `target`'s namespace by `keys`"""
713 """Pull objects from `target`'s namespace by `keys`"""
678 if isinstance(keys, str):
714 if isinstance(keys, str):
679 pass
715 pass
680 elif isistance(keys, (list,tuple,set)):
716 elif isistance(keys, (list,tuple,set)):
681 for key in keys:
717 for key in keys:
682 if not isinstance(key, str):
718 if not isinstance(key, str):
683 raise TypeError
719 raise TypeError
684 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
720 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
685 return result
721 return result
686
722
687 #--------------------------------------------------------------------------
723 #--------------------------------------------------------------------------
688 # Query methods
724 # Query methods
689 #--------------------------------------------------------------------------
725 #--------------------------------------------------------------------------
690
726
691 @spinfirst
727 @spinfirst
692 def get_results(self, msg_ids, status_only=False):
728 def get_results(self, msg_ids, status_only=False):
693 """Returns the result of the execute or task request with `msg_ids`.
729 """Returns the result of the execute or task request with `msg_ids`.
694
730
695 Parameters
731 Parameters
696 ----------
732 ----------
697 msg_ids : list of ints or msg_ids
733 msg_ids : list of ints or msg_ids
698 if int:
734 if int:
699 Passed as index to self.history for convenience.
735 Passed as index to self.history for convenience.
700 status_only : bool (default: False)
736 status_only : bool (default: False)
701 if False:
737 if False:
702 return the actual results
738 return the actual results
703 """
739 """
704 if not isinstance(msg_ids, (list,tuple)):
740 if not isinstance(msg_ids, (list,tuple)):
705 msg_ids = [msg_ids]
741 msg_ids = [msg_ids]
706 theids = []
742 theids = []
707 for msg_id in msg_ids:
743 for msg_id in msg_ids:
708 if isinstance(msg_id, int):
744 if isinstance(msg_id, int):
709 msg_id = self.history[msg_id]
745 msg_id = self.history[msg_id]
710 if not isinstance(msg_id, str):
746 if not isinstance(msg_id, str):
711 raise TypeError("msg_ids must be str, not %r"%msg_id)
747 raise TypeError("msg_ids must be str, not %r"%msg_id)
712 theids.append(msg_id)
748 theids.append(msg_id)
713
749
714 completed = []
750 completed = []
715 local_results = {}
751 local_results = {}
716 for msg_id in list(theids):
752 for msg_id in list(theids):
717 if msg_id in self.results:
753 if msg_id in self.results:
718 completed.append(msg_id)
754 completed.append(msg_id)
719 local_results[msg_id] = self.results[msg_id]
755 local_results[msg_id] = self.results[msg_id]
720 theids.remove(msg_id)
756 theids.remove(msg_id)
721
757
722 if msg_ids: # some not locally cached
758 if theids: # some not locally cached
723 content = dict(msg_ids=theids, status_only=status_only)
759 content = dict(msg_ids=theids, status_only=status_only)
724 msg = self.session.send(self._query_socket, "result_request", content=content)
760 msg = self.session.send(self._query_socket, "result_request", content=content)
725 zmq.select([self._query_socket], [], [])
761 zmq.select([self._query_socket], [], [])
726 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
762 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
727 if self.debug:
763 if self.debug:
728 pprint(msg)
764 pprint(msg)
729 content = msg['content']
765 content = msg['content']
730 if content['status'] != 'ok':
766 if content['status'] != 'ok':
731 raise ss.unwrap_exception(content)
767 raise ss.unwrap_exception(content)
732 else:
768 else:
733 content = dict(completed=[],pending=[])
769 content = dict(completed=[],pending=[])
734 if not status_only:
770 if not status_only:
735 # load cached results into result:
771 # load cached results into result:
736 content['completed'].extend(completed)
772 content['completed'].extend(completed)
737 content.update(local_results)
773 content.update(local_results)
738 # update cache with results:
774 # update cache with results:
739 for msg_id in msg_ids:
775 for msg_id in msg_ids:
740 if msg_id in content['completed']:
776 if msg_id in content['completed']:
741 self.results[msg_id] = content[msg_id]
777 self.results[msg_id] = content[msg_id]
742 return content
778 return content
743
779
744 @spinfirst
780 @spinfirst
745 def queue_status(self, targets=None, verbose=False):
781 def queue_status(self, targets=None, verbose=False):
746 """Fetch the status of engine queues.
782 """Fetch the status of engine queues.
747
783
748 Parameters
784 Parameters
749 ----------
785 ----------
750 targets : int/str/list of ints/strs
786 targets : int/str/list of ints/strs
751 the engines on which to execute
787 the engines on which to execute
752 default : all
788 default : all
753 verbose : bool
789 verbose : bool
754 whether to return lengths only, or lists of ids for each element
790 whether to return lengths only, or lists of ids for each element
755 """
791 """
756 targets = self._build_targets(targets)[1]
792 targets = self._build_targets(targets)[1]
757 content = dict(targets=targets, verbose=verbose)
793 content = dict(targets=targets, verbose=verbose)
758 self.session.send(self._query_socket, "queue_request", content=content)
794 self.session.send(self._query_socket, "queue_request", content=content)
759 idents,msg = self.session.recv(self._query_socket, 0)
795 idents,msg = self.session.recv(self._query_socket, 0)
760 if self.debug:
796 if self.debug:
761 pprint(msg)
797 pprint(msg)
762 content = msg['content']
798 content = msg['content']
763 status = content.pop('status')
799 status = content.pop('status')
764 if status != 'ok':
800 if status != 'ok':
765 raise ss.unwrap_exception(content)
801 raise ss.unwrap_exception(content)
766 return content
802 return content
767
803
768 @spinfirst
804 @spinfirst
769 def purge_results(self, msg_ids=[], targets=[]):
805 def purge_results(self, msg_ids=[], targets=[]):
770 """Tell the controller to forget results.
806 """Tell the controller to forget results.
771
807
772 Individual results can be purged by msg_id, or the entire
808 Individual results can be purged by msg_id, or the entire
773 history of specific targets can
809 history of specific targets can
774
810
775 Parameters
811 Parameters
776 ----------
812 ----------
777 targets : int/str/list of ints/strs
813 targets : int/str/list of ints/strs
778 the targets
814 the targets
779 default : None
815 default : None
780 """
816 """
781 if not targets and not msg_ids:
817 if not targets and not msg_ids:
782 raise ValueError
818 raise ValueError
783 if targets:
819 if targets:
784 targets = self._build_targets(targets)[1]
820 targets = self._build_targets(targets)[1]
785 content = dict(targets=targets, msg_ids=msg_ids)
821 content = dict(targets=targets, msg_ids=msg_ids)
786 self.session.send(self._query_socket, "purge_request", content=content)
822 self.session.send(self._query_socket, "purge_request", content=content)
787 idents, msg = self.session.recv(self._query_socket, 0)
823 idents, msg = self.session.recv(self._query_socket, 0)
788 if self.debug:
824 if self.debug:
789 pprint(msg)
825 pprint(msg)
790 content = msg['content']
826 content = msg['content']
791 if content['status'] != 'ok':
827 if content['status'] != 'ok':
792 raise ss.unwrap_exception(content)
828 raise ss.unwrap_exception(content)
793
829
794 class AsynClient(Client):
830 class AsynClient(Client):
795 """An Asynchronous client, using the Tornado Event Loop.
831 """An Asynchronous client, using the Tornado Event Loop.
796 !!!unfinished!!!"""
832 !!!unfinished!!!"""
797 io_loop = None
833 io_loop = None
798 _queue_stream = None
834 _queue_stream = None
799 _notifier_stream = None
835 _notifier_stream = None
800 _task_stream = None
836 _task_stream = None
801 _control_stream = None
837 _control_stream = None
802
838
803 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
839 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
804 Client.__init__(self, addr, context, username, debug)
840 Client.__init__(self, addr, context, username, debug)
805 if io_loop is None:
841 if io_loop is None:
806 io_loop = ioloop.IOLoop.instance()
842 io_loop = ioloop.IOLoop.instance()
807 self.io_loop = io_loop
843 self.io_loop = io_loop
808
844
809 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
845 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
810 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
846 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
811 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
847 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
812 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
848 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
813
849
814 def spin(self):
850 def spin(self):
815 for stream in (self.queue_stream, self.notifier_stream,
851 for stream in (self.queue_stream, self.notifier_stream,
816 self.task_stream, self.control_stream):
852 self.task_stream, self.control_stream):
817 stream.flush()
853 stream.flush()
818 No newline at end of file
854
@@ -1,200 +1,220
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Views"""
2 """Views"""
3
3
4 from IPython.external.decorator import decorator
4 from IPython.external.decorator import decorator
5
5
6
6
7 @decorator
7 @decorator
8 def myblock(f, self, *args, **kwargs):
8 def myblock(f, self, *args, **kwargs):
9 block = self.client.block
9 block = self.client.block
10 self.client.block = self.block
10 self.client.block = self.block
11 ret = f(self, *args, **kwargs)
11 ret = f(self, *args, **kwargs)
12 self.client.block = block
12 self.client.block = block
13 return ret
13 return ret
14
14
15 @decorator
15 @decorator
16 def save_ids(f, self, *args, **kwargs):
16 def save_ids(f, self, *args, **kwargs):
17 ret = f(self, *args, **kwargs)
17 ret = f(self, *args, **kwargs)
18 msg_ids = self.client.history[-self._ntargets:]
18 msg_ids = self.client.history[-self._ntargets:]
19 self.history.extend(msg_ids)
19 self.history.extend(msg_ids)
20 map(self.outstanding.add, msg_ids)
20 map(self.outstanding.add, msg_ids)
21 return ret
21 return ret
22
22
23 @decorator
23 @decorator
24 def sync_results(f, self, *args, **kwargs):
24 def sync_results(f, self, *args, **kwargs):
25 ret = f(self, *args, **kwargs)
25 ret = f(self, *args, **kwargs)
26 delta = self.outstanding.difference(self.client.outstanding)
26 delta = self.outstanding.difference(self.client.outstanding)
27 completed = self.outstanding.intersection(delta)
27 completed = self.outstanding.intersection(delta)
28 self.outstanding = self.outstanding.difference(completed)
28 self.outstanding = self.outstanding.difference(completed)
29 for msg_id in completed:
29 for msg_id in completed:
30 self.results[msg_id] = self.client.results[msg_id]
30 self.results[msg_id] = self.client.results[msg_id]
31 return ret
31 return ret
32
32
33 @decorator
33 @decorator
34 def spin_after(f, self, *args, **kwargs):
34 def spin_after(f, self, *args, **kwargs):
35 ret = f(self, *args, **kwargs)
35 ret = f(self, *args, **kwargs)
36 self.spin()
36 self.spin()
37 return ret
37 return ret
38
38
39
39
40 class View(object):
40 class View(object):
41 """Base View class"""
41 """Base View class"""
42 _targets = None
42 _targets = None
43 _ntargets = None
43 _ntargets = None
44 block=None
44 block=None
45 bound=None
45 history=None
46 history=None
46
47
47 def __init__(self, client, targets):
48 def __init__(self, client, targets=None):
48 self.client = client
49 self.client = client
49 self._targets = targets
50 self._targets = targets
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
51 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
51 self.block = client.block
52 self.block = client.block
53 self.bound=True
52 self.history = []
54 self.history = []
53 self.outstanding = set()
55 self.outstanding = set()
54 self.results = {}
56 self.results = {}
55
57
56 def __repr__(self):
58 def __repr__(self):
57 strtargets = str(self._targets)
59 strtargets = str(self._targets)
58 if len(strtargets) > 16:
60 if len(strtargets) > 16:
59 strtargets = strtargets[:12]+'...]'
61 strtargets = strtargets[:12]+'...]'
60 return "<%s %s>"%(self.__class__.__name__, strtargets)
62 return "<%s %s>"%(self.__class__.__name__, strtargets)
61
63
62 @property
64 @property
63 def targets(self):
65 def targets(self):
64 return self._targets
66 return self._targets
65
67
66 @targets.setter
68 @targets.setter
67 def targets(self, value):
69 def targets(self, value):
68 raise TypeError("Cannot set my targets argument after construction!")
70 raise TypeError("Cannot set my targets argument after construction!")
69
71
70 @sync_results
72 @sync_results
71 def spin(self):
73 def spin(self):
72 """spin the client, and sync"""
74 """spin the client, and sync"""
73 self.client.spin()
75 self.client.spin()
74
76
75 @sync_results
77 @sync_results
76 @save_ids
78 @save_ids
77 def apply(self, f, *args, **kwargs):
79 def apply(self, f, *args, **kwargs):
78 """calls f(*args, **kwargs) on remote engines, returning the result.
80 """calls f(*args, **kwargs) on remote engines, returning the result.
79
81
80 This method does not involve the engine's namespace.
82 This method does not involve the engine's namespace.
81
83
82 if self.block is False:
84 if self.block is False:
83 returns msg_id
85 returns msg_id
84 else:
86 else:
85 returns actual result of f(*args, **kwargs)
87 returns actual result of f(*args, **kwargs)
86 """
88 """
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
89 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
88
90
89 @save_ids
91 @save_ids
90 def apply_async(self, f, *args, **kwargs):
92 def apply_async(self, f, *args, **kwargs):
91 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
93 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
92
94
93 This method does not involve the engine's namespace.
95 This method does not involve the engine's namespace.
94
96
95 returns msg_id
97 returns msg_id
96 """
98 """
97 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
99 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
98
100
99 @spin_after
101 @spin_after
100 @save_ids
102 @save_ids
101 def apply_sync(self, f, *args, **kwargs):
103 def apply_sync(self, f, *args, **kwargs):
102 """calls f(*args, **kwargs) on remote engines in a blocking manner,
104 """calls f(*args, **kwargs) on remote engines in a blocking manner,
103 returning the result.
105 returning the result.
104
106
105 This method does not involve the engine's namespace.
107 This method does not involve the engine's namespace.
106
108
107 returns: actual result of f(*args, **kwargs)
109 returns: actual result of f(*args, **kwargs)
108 """
110 """
109 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
111 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
110
112
111 @sync_results
113 @sync_results
112 @save_ids
114 @save_ids
113 def apply_bound(self, f, *args, **kwargs):
115 def apply_bound(self, f, *args, **kwargs):
114 """calls f(*args, **kwargs) bound to engine namespace(s).
116 """calls f(*args, **kwargs) bound to engine namespace(s).
115
117
116 if self.block is False:
118 if self.block is False:
117 returns msg_id
119 returns msg_id
118 else:
120 else:
119 returns actual result of f(*args, **kwargs)
121 returns actual result of f(*args, **kwargs)
120
122
121 This method has access to the targets' globals
123 This method has access to the targets' globals
122
124
123 """
125 """
124 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
126 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
125
127
126 @sync_results
128 @sync_results
127 @save_ids
129 @save_ids
128 def apply_async_bound(self, f, *args, **kwargs):
130 def apply_async_bound(self, f, *args, **kwargs):
129 """calls f(*args, **kwargs) bound to engine namespace(s)
131 """calls f(*args, **kwargs) bound to engine namespace(s)
130 in a nonblocking manner.
132 in a nonblocking manner.
131
133
132 returns: msg_id
134 returns: msg_id
133
135
134 This method has access to the targets' globals
136 This method has access to the targets' globals
135
137
136 """
138 """
137 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
139 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
138
140
139 @spin_after
141 @spin_after
140 @save_ids
142 @save_ids
141 def apply_sync_bound(self, f, *args, **kwargs):
143 def apply_sync_bound(self, f, *args, **kwargs):
142 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
144 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
143
145
144 returns: actual result of f(*args, **kwargs)
146 returns: actual result of f(*args, **kwargs)
145
147
146 This method has access to the targets' globals
148 This method has access to the targets' globals
147
149
148 """
150 """
149 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
151 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
150
152
153 def abort(self, msg_ids=None, block=None):
154 """Abort jobs on my engines.
155
156 Parameters
157 ----------
158
159 msg_ids : None, str, list of strs, optional
160 if None: abort all jobs.
161 else: abort specific msg_id(s).
162 """
163 block = block if block is not None else self.block
164 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
165
166 def queue_status(self, verbose=False):
167 """Fetch the Queue status of my engines"""
168 return self.client.queue_status(targets=self.targets, verbose=verbose)
169
170 def purge_results(self, msg_ids=[],targets=[]):
171 """Instruct the controller to forget specific results."""
172 if targets is None or targets == 'all':
173 targets = self.targets
174 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
175
151
176
152 class DirectView(View):
177 class DirectView(View):
153 """Direct Multiplexer View"""
178 """Direct Multiplexer View"""
154
179
155 def update(self, ns):
180 def update(self, ns):
156 """update remote namespace with dict `ns`"""
181 """update remote namespace with dict `ns`"""
157 return self.client.push(ns, targets=self.targets, block=self.block)
182 return self.client.push(ns, targets=self.targets, block=self.block)
158
183
184 push = update
185
159 def get(self, key_s):
186 def get(self, key_s):
160 """get object(s) by `key_s` from remote namespace
187 """get object(s) by `key_s` from remote namespace
161 will return one object if it is a key.
188 will return one object if it is a key.
162 It also takes a list of keys, and will return a list of objects."""
189 It also takes a list of keys, and will return a list of objects."""
163 # block = block if block is not None else self.block
190 # block = block if block is not None else self.block
164 return self.client.pull(key_s, block=self.block, targets=self.targets)
191 return self.client.pull(key_s, block=True, targets=self.targets)
165
192
166 push = update
193 def pull(self, key_s, block=True):
167 pull = get
194 """get object(s) by `key_s` from remote namespace
195 will return one object if it is a key.
196 It also takes a list of keys, and will return a list of objects."""
197 block = block if block is not None else self.block
198 return self.client.pull(key_s, block=block, targets=self.targets)
168
199
169 def __getitem__(self, key):
200 def __getitem__(self, key):
170 return self.get(key)
201 return self.get(key)
171
202
172 def __setitem__(self,key,value):
203 def __setitem__(self,key, value):
173 self.update({key:value})
204 self.update({key:value})
174
205
175 def clear(self, block=False):
206 def clear(self, block=False):
176 """Clear the remote namespaces on my engines."""
207 """Clear the remote namespaces on my engines."""
177 block = block if block is not None else self.block
208 block = block if block is not None else self.block
178 return self.client.clear(targets=self.targets,block=block)
209 return self.client.clear(targets=self.targets, block=block)
179
210
180 def kill(self, block=True):
211 def kill(self, block=True):
181 """Kill my engines."""
212 """Kill my engines."""
182 block = block if block is not None else self.block
213 block = block if block is not None else self.block
183 return self.client.kill(targets=self.targets,block=block)
214 return self.client.kill(targets=self.targets, block=block)
184
215
185 def abort(self, msg_ids=None, block=None):
186 """Abort jobs on my engines.
187
188 Parameters
189 ----------
190
191 msg_ids : None, str, list of strs, optional
192 if None: abort all jobs.
193 else: abort specific msg_id(s).
194 """
195 block = block if block is not None else self.block
196 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
197
198 class LoadBalancedView(View):
216 class LoadBalancedView(View):
199 _targets=None
217 def __repr__(self):
218 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
219
200 No newline at end of file
220
General Comments 0
You need to be logged in to leave comments. Login now