##// END OF EJS Templates
major cleanup of client code + purge_request implemented
MinRK -
Show More
This diff has been collapsed as it changes many lines, (562 lines changed) Show them Hide them
@@ -30,7 +30,7 b' def _push(ns):'
30
30
31 def _pull(keys):
31 def _pull(keys):
32 g = globals()
32 g = globals()
33 if isinstance(keys, (list,tuple)):
33 if isinstance(keys, (list,tuple, set)):
34 return map(g.get, keys)
34 return map(g.get, keys)
35 else:
35 else:
36 return g.get(keys)
36 return g.get(keys)
@@ -41,14 +41,19 b' def _clear():'
41 def execute(code):
41 def execute(code):
42 exec code in globals()
42 exec code in globals()
43
43
44 # decorators for methods:
44 #--------------------------------------------------------------------------
45 # Decorators for Client methods
46 #--------------------------------------------------------------------------
47
45 @decorator
48 @decorator
46 def spinfirst(f,self,*args,**kwargs):
49 def spinfirst(f, self, *args, **kwargs):
50 """Call spin() to sync state prior to calling the method."""
47 self.spin()
51 self.spin()
48 return f(self, *args, **kwargs)
52 return f(self, *args, **kwargs)
49
53
50 @decorator
54 @decorator
51 def defaultblock(f, self, *args, **kwargs):
55 def defaultblock(f, self, *args, **kwargs):
56 """Default to self.block; preserve self.block."""
52 block = kwargs.get('block',None)
57 block = kwargs.get('block',None)
53 block = self.block if block is None else block
58 block = self.block if block is None else block
54 saveblock = self.block
59 saveblock = self.block
@@ -57,45 +62,46 b' def defaultblock(f, self, *args, **kwargs):'
57 self.block = saveblock
62 self.block = saveblock
58 return ret
63 return ret
59
64
65 #--------------------------------------------------------------------------
66 # Classes
67 #--------------------------------------------------------------------------
68
69
60 class AbortedTask(object):
70 class AbortedTask(object):
71 """A basic wrapper object describing an aborted task."""
61 def __init__(self, msg_id):
72 def __init__(self, msg_id):
62 self.msg_id = msg_id
73 self.msg_id = msg_id
63 # @decorator
64 # def checktargets(f):
65 # @wraps(f)
66 # def checked_method(self, *args, **kwargs):
67 # self._build_targets(kwargs['targets'])
68 # return f(self, *args, **kwargs)
69 # return checked_method
70
74
71
75 class ControllerError(Exception):
72 # class _ZMQEventLoopThread(threading.Thread):
76 def __init__(self, etype, evalue, tb):
73 #
77 self.etype = etype
74 # def __init__(self, loop):
78 self.evalue = evalue
75 # self.loop = loop
79 self.traceback=tb
76 # threading.Thread.__init__(self)
80
77 #
78 # def run(self):
79 # self.loop.start()
80 #
81 class Client(object):
81 class Client(object):
82 """A semi-synchronous client to the IPython ZMQ controller
82 """A semi-synchronous client to the IPython ZMQ controller
83
83
84 Parameters
85 ----------
86
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
88 The address of the controller's registration socket.
89
90
84 Attributes
91 Attributes
85 ----------
92 ----------
86 ids : set
93 ids : set of int engine IDs
87 a set of engine IDs
88 requesting the ids attribute always synchronizes
94 requesting the ids attribute always synchronizes
89 the registration state. To request ids without synchronization,
95 the registration state. To request ids without synchronization,
90 use _ids
96 use semi-private _ids.
91
97
92 history : list of msg_ids
98 history : list of msg_ids
93 a list of msg_ids, keeping track of all the execution
99 a list of msg_ids, keeping track of all the execution
94 messages you have submitted
100 messages you have submitted in order.
95
101
96 outstanding : set of msg_ids
102 outstanding : set of msg_ids
97 a set of msg_ids that have been submitted, but whose
103 a set of msg_ids that have been submitted, but whose
98 results have not been received
104 results have not yet been received.
99
105
100 results : dict
106 results : dict
101 a dict of all our results, keyed by msg_id
107 a dict of all our results, keyed by msg_id
@@ -111,44 +117,43 b' class Client(object):'
111
117
112 barrier : wait on one or more msg_ids
118 barrier : wait on one or more msg_ids
113
119
114 execution methods: apply/apply_bound/apply_to
120 execution methods: apply/apply_bound/apply_to/applu_bount
115 legacy: execute, run
121 legacy: execute, run
116
122
117 query methods: queue_status, get_result
123 query methods: queue_status, get_result, purge
118
124
119 control methods: abort, kill
125 control methods: abort, kill
120
126
121
122
123 """
127 """
124
128
125
129
126 _connected=False
130 _connected=False
127 _engines=None
131 _engines=None
128 registration_socket=None
132 _addr='tcp://127.0.0.1:10101'
129 query_socket=None
133 _registration_socket=None
130 control_socket=None
134 _query_socket=None
131 notification_socket=None
135 _control_socket=None
132 queue_socket=None
136 _notification_socket=None
133 task_socket=None
137 _mux_socket=None
138 _task_socket=None
134 block = False
139 block = False
135 outstanding=None
140 outstanding=None
136 results = None
141 results = None
137 history = None
142 history = None
138 debug = False
143 debug = False
139
144
140 def __init__(self, addr, context=None, username=None, debug=False):
145 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
141 if context is None:
146 if context is None:
142 context = zmq.Context()
147 context = zmq.Context()
143 self.context = context
148 self.context = context
144 self.addr = addr
149 self._addr = addr
145 if username is None:
150 if username is None:
146 self.session = ss.StreamSession()
151 self.session = ss.StreamSession()
147 else:
152 else:
148 self.session = ss.StreamSession(username)
153 self.session = ss.StreamSession(username)
149 self.registration_socket = self.context.socket(zmq.PAIR)
154 self._registration_socket = self.context.socket(zmq.PAIR)
150 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
155 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
151 self.registration_socket.connect(addr)
156 self._registration_socket.connect(addr)
152 self._engines = {}
157 self._engines = {}
153 self._ids = set()
158 self._ids = set()
154 self.outstanding=set()
159 self.outstanding=set()
@@ -167,16 +172,21 b' class Client(object):'
167
172
168 @property
173 @property
169 def ids(self):
174 def ids(self):
175 """Always up to date ids property."""
170 self._flush_notifications()
176 self._flush_notifications()
171 return self._ids
177 return self._ids
172
178
173 def _update_engines(self, engines):
179 def _update_engines(self, engines):
180 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
174 for k,v in engines.iteritems():
181 for k,v in engines.iteritems():
175 eid = int(k)
182 eid = int(k)
176 self._engines[eid] = bytes(v) # force not unicode
183 self._engines[eid] = bytes(v) # force not unicode
177 self._ids.add(eid)
184 self._ids.add(eid)
178
185
179 def _build_targets(self, targets):
186 def _build_targets(self, targets):
187 """Turn valid target IDs or 'all' into two lists:
188 (int_ids, uuids).
189 """
180 if targets is None:
190 if targets is None:
181 targets = self._ids
191 targets = self._ids
182 elif isinstance(targets, str):
192 elif isinstance(targets, str):
@@ -189,45 +199,50 b' class Client(object):'
189 return [self._engines[t] for t in targets], list(targets)
199 return [self._engines[t] for t in targets], list(targets)
190
200
191 def _connect(self):
201 def _connect(self):
192 """setup all our socket connections to the controller"""
202 """setup all our socket connections to the controller. This is called from
203 __init__."""
193 if self._connected:
204 if self._connected:
194 return
205 return
195 self._connected=True
206 self._connected=True
196 self.session.send(self.registration_socket, 'connection_request')
207 self.session.send(self._registration_socket, 'connection_request')
197 idents,msg = self.session.recv(self.registration_socket,mode=0)
208 idents,msg = self.session.recv(self._registration_socket,mode=0)
198 if self.debug:
209 if self.debug:
199 pprint(msg)
210 pprint(msg)
200 msg = ss.Message(msg)
211 msg = ss.Message(msg)
201 content = msg.content
212 content = msg.content
202 if content.status == 'ok':
213 if content.status == 'ok':
203 if content.queue:
214 if content.queue:
204 self.queue_socket = self.context.socket(zmq.PAIR)
215 self._mux_socket = self.context.socket(zmq.PAIR)
205 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
216 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
206 self.queue_socket.connect(content.queue)
217 self._mux_socket.connect(content.queue)
207 if content.task:
218 if content.task:
208 self.task_socket = self.context.socket(zmq.PAIR)
219 self._task_socket = self.context.socket(zmq.PAIR)
209 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
220 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
210 self.task_socket.connect(content.task)
221 self._task_socket.connect(content.task)
211 if content.notification:
222 if content.notification:
212 self.notification_socket = self.context.socket(zmq.SUB)
223 self._notification_socket = self.context.socket(zmq.SUB)
213 self.notification_socket.connect(content.notification)
224 self._notification_socket.connect(content.notification)
214 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
225 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
215 if content.query:
226 if content.query:
216 self.query_socket = self.context.socket(zmq.PAIR)
227 self._query_socket = self.context.socket(zmq.PAIR)
217 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
228 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
218 self.query_socket.connect(content.query)
229 self._query_socket.connect(content.query)
219 if content.control:
230 if content.control:
220 self.control_socket = self.context.socket(zmq.PAIR)
231 self._control_socket = self.context.socket(zmq.PAIR)
221 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
232 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
222 self.control_socket.connect(content.control)
233 self._control_socket.connect(content.control)
223 self._update_engines(dict(content.engines))
234 self._update_engines(dict(content.engines))
224
235
225 else:
236 else:
226 self._connected = False
237 self._connected = False
227 raise Exception("Failed to connect!")
238 raise Exception("Failed to connect!")
228
239
229 #### handlers and callbacks for incoming messages #######
240 #--------------------------------------------------------------------------
241 # handlers and callbacks for incoming messages
242 #--------------------------------------------------------------------------
243
230 def _register_engine(self, msg):
244 def _register_engine(self, msg):
245 """Register a new engine, and update our connection info."""
231 content = msg['content']
246 content = msg['content']
232 eid = content['id']
247 eid = content['id']
233 d = {eid : content['queue']}
248 d = {eid : content['queue']}
@@ -235,7 +250,7 b' class Client(object):'
235 self._ids.add(int(eid))
250 self._ids.add(int(eid))
236
251
237 def _unregister_engine(self, msg):
252 def _unregister_engine(self, msg):
238 # print 'unregister',msg
253 """Unregister an engine that has died."""
239 content = msg['content']
254 content = msg['content']
240 eid = int(content['id'])
255 eid = int(content['id'])
241 if eid in self._ids:
256 if eid in self._ids:
@@ -243,7 +258,7 b' class Client(object):'
243 self._engines.pop(eid)
258 self._engines.pop(eid)
244
259
245 def _handle_execute_reply(self, msg):
260 def _handle_execute_reply(self, msg):
246 # msg_id = msg['msg_id']
261 """Save the reply to an execute_request into our results."""
247 parent = msg['parent_header']
262 parent = msg['parent_header']
248 msg_id = parent['msg_id']
263 msg_id = parent['msg_id']
249 if msg_id not in self.outstanding:
264 if msg_id not in self.outstanding:
@@ -253,8 +268,7 b' class Client(object):'
253 self.results[msg_id] = ss.unwrap_exception(msg['content'])
268 self.results[msg_id] = ss.unwrap_exception(msg['content'])
254
269
255 def _handle_apply_reply(self, msg):
270 def _handle_apply_reply(self, msg):
256 # pprint(msg)
271 """Save the reply to an apply_request into our results."""
257 # msg_id = msg['msg_id']
258 parent = msg['parent_header']
272 parent = msg['parent_header']
259 msg_id = parent['msg_id']
273 msg_id = parent['msg_id']
260 if msg_id not in self.outstanding:
274 if msg_id not in self.outstanding:
@@ -272,8 +286,9 b' class Client(object):'
272 self.results[msg_id] = ss.unwrap_exception(content)
286 self.results[msg_id] = ss.unwrap_exception(content)
273
287
274 def _flush_notifications(self):
288 def _flush_notifications(self):
275 "flush incoming notifications of engine registrations"
289 """Flush notifications of engine registrations waiting
276 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
290 in ZMQ queue."""
291 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
277 while msg is not None:
292 while msg is not None:
278 if self.debug:
293 if self.debug:
279 pprint(msg)
294 pprint(msg)
@@ -284,10 +299,10 b' class Client(object):'
284 raise Exception("Unhandled message type: %s"%msg.msg_type)
299 raise Exception("Unhandled message type: %s"%msg.msg_type)
285 else:
300 else:
286 handler(msg)
301 handler(msg)
287 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
302 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
288
303
289 def _flush_results(self, sock):
304 def _flush_results(self, sock):
290 "flush incoming task or queue results"
305 """Flush task or queue results waiting in ZMQ queue."""
291 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
306 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
292 while msg is not None:
307 while msg is not None:
293 if self.debug:
308 if self.debug:
@@ -302,16 +317,20 b' class Client(object):'
302 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
317 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
303
318
304 def _flush_control(self, sock):
319 def _flush_control(self, sock):
305 "flush incoming control replies"
320 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
306 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
322 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
307 while msg is not None:
323 while msg is not None:
308 if self.debug:
324 if self.debug:
309 pprint(msg)
325 pprint(msg)
310 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
326 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
311
327
312 ###### get/setitem ########
328 #--------------------------------------------------------------------------
329 # getitem
330 #--------------------------------------------------------------------------
313
331
314 def __getitem__(self, key):
332 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
315 if isinstance(key, int):
334 if isinstance(key, int):
316 if key not in self.ids:
335 if key not in self.ids:
317 raise IndexError("No such engine: %i"%key)
336 raise IndexError("No such engine: %i"%key)
@@ -329,51 +348,75 b' class Client(object):'
329 else:
348 else:
330 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
349 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
331
350
332 ############ begin real methods #############
351 #--------------------------------------------------------------------------
352 # Begin public methods
353 #--------------------------------------------------------------------------
333
354
334 def spin(self):
355 def spin(self):
335 """flush incoming notifications and execution results."""
356 """Flush any registration notifications and execution results
336 if self.notification_socket:
357 waiting in the ZMQ queue.
358 """
359 if self._notification_socket:
337 self._flush_notifications()
360 self._flush_notifications()
338 if self.queue_socket:
361 if self._mux_socket:
339 self._flush_results(self.queue_socket)
362 self._flush_results(self._mux_socket)
340 if self.task_socket:
363 if self._task_socket:
341 self._flush_results(self.task_socket)
364 self._flush_results(self._task_socket)
342 if self.control_socket:
365 if self._control_socket:
343 self._flush_control(self.control_socket)
366 self._flush_control(self._control_socket)
344
367
345 @spinfirst
368 def barrier(self, msg_ids=None, timeout=-1):
346 def queue_status(self, targets=None, verbose=False):
369 """waits on one or more `msg_ids`, for up to `timeout` seconds.
347 """fetch the status of engine queues
348
370
349 Parameters
371 Parameters
350 ----------
372 ----------
351 targets : int/str/list of ints/strs
373 msg_ids : int, str, or list of ints and/or strs
352 the engines on which to execute
374 ints are indices to self.history
353 default : all
375 strs are msg_ids
354 verbose : bool
376 default: wait on all outstanding messages
355 whether to return lengths only, or lists of ids for each element
377 timeout : float
356
378 a time in seconds, after which to give up.
357 """
379 default is -1, which means no timeout
358 targets = self._build_targets(targets)[1]
359 content = dict(targets=targets)
360 self.session.send(self.query_socket, "queue_request", content=content)
361 idents,msg = self.session.recv(self.query_socket, 0)
362 if self.debug:
363 pprint(msg)
364 return msg['content']
365
380
381 Returns
382 -------
383 True : when all msg_ids are done
384 False : timeout reached, some msg_ids still outstanding
385 """
386 tic = time.time()
387 if msg_ids is None:
388 theids = self.outstanding
389 else:
390 if isinstance(msg_ids, (int, str)):
391 msg_ids = [msg_ids]
392 theids = set()
393 for msg_id in msg_ids:
394 if isinstance(msg_id, int):
395 msg_id = self.history[msg_id]
396 theids.add(msg_id)
397 self.spin()
398 while theids.intersection(self.outstanding):
399 if timeout >= 0 and ( time.time()-tic ) > timeout:
400 break
401 time.sleep(1e-3)
402 self.spin()
403 return len(theids.intersection(self.outstanding)) == 0
404
405 #--------------------------------------------------------------------------
406 # Control methods
407 #--------------------------------------------------------------------------
408
366 @spinfirst
409 @spinfirst
367 @defaultblock
410 @defaultblock
368 def clear(self, targets=None, block=None):
411 def clear(self, targets=None, block=None):
369 """clear the namespace in target(s)"""
412 """Clear the namespace in target(s)."""
370 targets = self._build_targets(targets)[0]
413 targets = self._build_targets(targets)[0]
371 for t in targets:
414 for t in targets:
372 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
415 self.session.send(self._control_socket, 'clear_request', content={},ident=t)
373 error = False
416 error = False
374 if self.block:
417 if self.block:
375 for i in range(len(targets)):
418 for i in range(len(targets)):
376 idents,msg = self.session.recv(self.control_socket,0)
419 idents,msg = self.session.recv(self._control_socket,0)
377 if self.debug:
420 if self.debug:
378 pprint(msg)
421 pprint(msg)
379 if msg['content']['status'] != 'ok':
422 if msg['content']['status'] != 'ok':
@@ -385,18 +428,18 b' class Client(object):'
385 @spinfirst
428 @spinfirst
386 @defaultblock
429 @defaultblock
387 def abort(self, msg_ids = None, targets=None, block=None):
430 def abort(self, msg_ids = None, targets=None, block=None):
388 """abort the Queues of target(s)"""
431 """Abort the execution queues of target(s)."""
389 targets = self._build_targets(targets)[0]
432 targets = self._build_targets(targets)[0]
390 if isinstance(msg_ids, basestring):
433 if isinstance(msg_ids, basestring):
391 msg_ids = [msg_ids]
434 msg_ids = [msg_ids]
392 content = dict(msg_ids=msg_ids)
435 content = dict(msg_ids=msg_ids)
393 for t in targets:
436 for t in targets:
394 self.session.send(self.control_socket, 'abort_request',
437 self.session.send(self._control_socket, 'abort_request',
395 content=content, ident=t)
438 content=content, ident=t)
396 error = False
439 error = False
397 if self.block:
440 if self.block:
398 for i in range(len(targets)):
441 for i in range(len(targets)):
399 idents,msg = self.session.recv(self.control_socket,0)
442 idents,msg = self.session.recv(self._control_socket,0)
400 if self.debug:
443 if self.debug:
401 pprint(msg)
444 pprint(msg)
402 if msg['content']['status'] != 'ok':
445 if msg['content']['status'] != 'ok':
@@ -410,21 +453,25 b' class Client(object):'
410 """Terminates one or more engine processes."""
453 """Terminates one or more engine processes."""
411 targets = self._build_targets(targets)[0]
454 targets = self._build_targets(targets)[0]
412 for t in targets:
455 for t in targets:
413 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
456 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
414 error = False
457 error = False
415 if self.block:
458 if self.block:
416 for i in range(len(targets)):
459 for i in range(len(targets)):
417 idents,msg = self.session.recv(self.control_socket,0)
460 idents,msg = self.session.recv(self._control_socket,0)
418 if self.debug:
461 if self.debug:
419 pprint(msg)
462 pprint(msg)
420 if msg['content']['status'] != 'ok':
463 if msg['content']['status'] != 'ok':
421 error = msg['content']
464 error = msg['content']
422 if error:
465 if error:
423 return error
466 return error
424
467
468 #--------------------------------------------------------------------------
469 # Execution methods
470 #--------------------------------------------------------------------------
471
425 @defaultblock
472 @defaultblock
426 def execute(self, code, targets='all', block=None):
473 def execute(self, code, targets='all', block=None):
427 """executes `code` on `targets` in blocking or nonblocking manner.
474 """Executes `code` on `targets` in blocking or nonblocking manner.
428
475
429 Parameters
476 Parameters
430 ----------
477 ----------
@@ -434,7 +481,8 b' class Client(object):'
434 the engines on which to execute
481 the engines on which to execute
435 default : all
482 default : all
436 block : bool
483 block : bool
437 whether or not to wait until done
484 whether or not to wait until done to return
485 default: self.block
438 """
486 """
439 # block = self.block if block is None else block
487 # block = self.block if block is None else block
440 # saveblock = self.block
488 # saveblock = self.block
@@ -444,7 +492,7 b' class Client(object):'
444 return result
492 return result
445
493
446 def run(self, code, block=None):
494 def run(self, code, block=None):
447 """runs `code` on an engine.
495 """Runs `code` on an engine.
448
496
449 Calls to this are load-balanced.
497 Calls to this are load-balanced.
450
498
@@ -459,11 +507,96 b' class Client(object):'
459 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
507 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
460 return result
508 return result
461
509
510 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
511 after=None, follow=None):
512 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
513
514 This is the central execution command for the client.
515
516 Parameters
517 ----------
518
519 f : function
520 The fuction to be called remotely
521 args : tuple/list
522 The positional arguments passed to `f`
523 kwargs : dict
524 The keyword arguments passed to `f`
525 bound : bool (default: True)
526 Whether to execute in the Engine(s) namespace, or in a clean
527 namespace not affecting the engine.
528 block : bool (default: self.block)
529 Whether to wait for the result, or return immediately.
530 False:
531 returns msg_id(s)
532 if multiple targets:
533 list of ids
534 True:
535 returns actual result(s) of f(*args, **kwargs)
536 if multiple targets:
537 dict of results, by engine ID
538 targets : int,list of ints, 'all', None
539 Specify the destination of the job.
540 if None:
541 Submit via Task queue for load-balancing.
542 if 'all':
543 Run on all active engines
544 if list:
545 Run on each specified engine
546 if int:
547 Run on single engine
548
549 after : Dependency or collection of msg_ids
550 Only for load-balanced execution (targets=None)
551 Specify a list of msg_ids as a time-based dependency.
552 This job will only be run *after* the dependencies
553 have been met.
554
555 follow : Dependency or collection of msg_ids
556 Only for load-balanced execution (targets=None)
557 Specify a list of msg_ids as a location-based dependency.
558 This job will only be run on an engine where this dependency
559 is met.
560
561 Returns
562 -------
563 if block is False:
564 if single target:
565 return msg_id
566 else:
567 return list of msg_ids
568 ? (should this be dict like block=True) ?
569 else:
570 if single target:
571 return result of f(*args, **kwargs)
572 else:
573 return dict of results, keyed by engine
574 """
575
576 # defaults:
577 block = block if block is not None else self.block
578 args = args if args is not None else []
579 kwargs = kwargs if kwargs is not None else {}
580
581 # enforce types of f,args,kwrags
582 if not callable(f):
583 raise TypeError("f must be callable, not %s"%type(f))
584 if not isinstance(args, (tuple, list)):
585 raise TypeError("args must be tuple or list, not %s"%type(args))
586 if not isinstance(kwargs, dict):
587 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
588
589 options = dict(bound=bound, block=block, after=after, follow=follow)
590
591 if targets is None:
592 return self._apply_balanced(f, args, kwargs, **options)
593 else:
594 return self._apply_direct(f, args, kwargs, targets=targets, **options)
595
462 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
596 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
463 after=None, follow=None):
597 after=None, follow=None):
464 """the underlying method for applying functions in a load balanced
598 """The underlying method for applying functions in a load balanced
465 manner."""
599 manner, via the task queue."""
466 block = block if block is not None else self.block
467 if isinstance(after, Dependency):
600 if isinstance(after, Dependency):
468 after = after.as_dict()
601 after = after.as_dict()
469 elif after is None:
602 elif after is None:
@@ -476,7 +609,7 b' class Client(object):'
476
609
477 bufs = ss.pack_apply_message(f,args,kwargs)
610 bufs = ss.pack_apply_message(f,args,kwargs)
478 content = dict(bound=bound)
611 content = dict(bound=bound)
479 msg = self.session.send(self.task_socket, "apply_request",
612 msg = self.session.send(self._task_socket, "apply_request",
480 content=content, buffers=bufs, subheader=subheader)
613 content=content, buffers=bufs, subheader=subheader)
481 msg_id = msg['msg_id']
614 msg_id = msg['msg_id']
482 self.outstanding.add(msg_id)
615 self.outstanding.add(msg_id)
@@ -489,9 +622,8 b' class Client(object):'
489
622
490 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
623 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
491 after=None, follow=None):
624 after=None, follow=None):
492 """Then underlying method for applying functions to specific engines."""
625 """Then underlying method for applying functions to specific engines
493
626 via the MUX queue."""
494 block = block if block is not None else self.block
495
627
496 queues,targets = self._build_targets(targets)
628 queues,targets = self._build_targets(targets)
497 bufs = ss.pack_apply_message(f,args,kwargs)
629 bufs = ss.pack_apply_message(f,args,kwargs)
@@ -507,7 +639,7 b' class Client(object):'
507 content = dict(bound=bound)
639 content = dict(bound=bound)
508 msg_ids = []
640 msg_ids = []
509 for queue in queues:
641 for queue in queues:
510 msg = self.session.send(self.queue_socket, "apply_request",
642 msg = self.session.send(self._mux_socket, "apply_request",
511 content=content, buffers=bufs,ident=queue, subheader=subheader)
643 content=content, buffers=bufs,ident=queue, subheader=subheader)
512 msg_id = msg['msg_id']
644 msg_id = msg['msg_id']
513 self.outstanding.add(msg_id)
645 self.outstanding.add(msg_id)
@@ -528,115 +660,145 b' class Client(object):'
528 result[target] = self.results[mid]
660 result[target] = self.results[mid]
529 return result
661 return result
530
662
531 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
663 #--------------------------------------------------------------------------
532 after=None, follow=None):
664 # Data movement
533 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
665 #--------------------------------------------------------------------------
534
535 if self.block is False:
536 returns msg_id or list of msg_ids
537 else:
538 returns actual result of f(*args, **kwargs)
539 """
540 # enforce types of f,args,kwrags
541 args = args if args is not None else []
542 kwargs = kwargs if kwargs is not None else {}
543 if not callable(f):
544 raise TypeError("f must be callable, not %s"%type(f))
545 if not isinstance(args, (tuple, list)):
546 raise TypeError("args must be tuple or list, not %s"%type(args))
547 if not isinstance(kwargs, dict):
548 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
549
550 options = dict(bound=bound, block=block, after=after, follow=follow)
551
552 if targets is None:
553 return self._apply_balanced(f, args, kwargs, **options)
554 else:
555 return self._apply_direct(f, args, kwargs, targets=targets, **options)
556
666
667 @defaultblock
557 def push(self, ns, targets=None, block=None):
668 def push(self, ns, targets=None, block=None):
558 """push the contents of `ns` into the namespace on `target`"""
669 """Push the contents of `ns` into the namespace on `target`"""
559 if not isinstance(ns, dict):
670 if not isinstance(ns, dict):
560 raise TypeError("Must be a dict, not %s"%type(ns))
671 raise TypeError("Must be a dict, not %s"%type(ns))
561 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
672 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
562 return result
673 return result
563
674
564 @spinfirst
675 @defaultblock
565 def pull(self, keys, targets=None, block=True):
676 def pull(self, keys, targets=None, block=True):
566 """pull objects from `target`'s namespace by `keys`"""
677 """Pull objects from `target`'s namespace by `keys`"""
567
678 if isinstance(keys, str):
679 pass
680 elif isistance(keys, (list,tuple,set)):
681 for key in keys:
682 if not isinstance(key, str):
683 raise TypeError
568 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
684 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
569 return result
685 return result
570
686
571 def barrier(self, msg_ids=None, timeout=-1):
687 #--------------------------------------------------------------------------
572 """waits on one or more `msg_ids`, for up to `timeout` seconds.
688 # Query methods
689 #--------------------------------------------------------------------------
690
691 @spinfirst
692 def get_results(self, msg_ids, status_only=False):
693 """Returns the result of the execute or task request with `msg_ids`.
573
694
574 Parameters
695 Parameters
575 ----------
696 ----------
576 msg_ids : int, str, or list of ints and/or strs
697 msg_ids : list of ints or msg_ids
577 ints are indices to self.history
698 if int:
578 strs are msg_ids
699 Passed as index to self.history for convenience.
579 default: wait on all outstanding messages
700 status_only : bool (default: False)
580 timeout : float
701 if False:
581 a time in seconds, after which to give up.
702 return the actual results
582 default is -1, which means no timeout
583
584 Returns
585 -------
586 True : when all msg_ids are done
587 False : timeout reached, msg_ids still outstanding
588 """
703 """
589 tic = time.time()
590 if msg_ids is None:
591 theids = self.outstanding
592 else:
593 if isinstance(msg_ids, (int, str)):
594 msg_ids = [msg_ids]
595 theids = set()
596 for msg_id in msg_ids:
597 if isinstance(msg_id, int):
598 msg_id = self.history[msg_id]
599 theids.add(msg_id)
600 self.spin()
601 while theids.intersection(self.outstanding):
602 if timeout >= 0 and ( time.time()-tic ) > timeout:
603 break
604 time.sleep(1e-3)
605 self.spin()
606 return len(theids.intersection(self.outstanding)) == 0
607
608 @spinfirst
609 def get_results(self, msg_ids,status_only=False):
610 """returns the result of the execute or task request with `msg_id`"""
611 if not isinstance(msg_ids, (list,tuple)):
704 if not isinstance(msg_ids, (list,tuple)):
612 msg_ids = [msg_ids]
705 msg_ids = [msg_ids]
613 theids = []
706 theids = []
614 for msg_id in msg_ids:
707 for msg_id in msg_ids:
615 if isinstance(msg_id, int):
708 if isinstance(msg_id, int):
616 msg_id = self.history[msg_id]
709 msg_id = self.history[msg_id]
710 if not isinstance(msg_id, str):
711 raise TypeError("msg_ids must be str, not %r"%msg_id)
617 theids.append(msg_id)
712 theids.append(msg_id)
618
713
619 content = dict(msg_ids=theids, status_only=status_only)
714 completed = []
620 msg = self.session.send(self.query_socket, "result_request", content=content)
715 local_results = {}
621 zmq.select([self.query_socket], [], [])
716 for msg_id in list(theids):
622 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
717 if msg_id in self.results:
718 completed.append(msg_id)
719 local_results[msg_id] = self.results[msg_id]
720 theids.remove(msg_id)
721
722 if msg_ids: # some not locally cached
723 content = dict(msg_ids=theids, status_only=status_only)
724 msg = self.session.send(self._query_socket, "result_request", content=content)
725 zmq.select([self._query_socket], [], [])
726 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
727 if self.debug:
728 pprint(msg)
729 content = msg['content']
730 if content['status'] != 'ok':
731 raise ss.unwrap_exception(content)
732 else:
733 content = dict(completed=[],pending=[])
734 if not status_only:
735 # load cached results into result:
736 content['completed'].extend(completed)
737 content.update(local_results)
738 # update cache with results:
739 for msg_id in msg_ids:
740 if msg_id in content['completed']:
741 self.results[msg_id] = content[msg_id]
742 return content
743
744 @spinfirst
745 def queue_status(self, targets=None, verbose=False):
746 """Fetch the status of engine queues.
747
748 Parameters
749 ----------
750 targets : int/str/list of ints/strs
751 the engines on which to execute
752 default : all
753 verbose : bool
754 whether to return lengths only, or lists of ids for each element
755 """
756 targets = self._build_targets(targets)[1]
757 content = dict(targets=targets, verbose=verbose)
758 self.session.send(self._query_socket, "queue_request", content=content)
759 idents,msg = self.session.recv(self._query_socket, 0)
623 if self.debug:
760 if self.debug:
624 pprint(msg)
761 pprint(msg)
762 content = msg['content']
763 status = content.pop('status')
764 if status != 'ok':
765 raise ss.unwrap_exception(content)
766 return content
767
768 @spinfirst
769 def purge_results(self, msg_ids=[], targets=[]):
770 """Tell the controller to forget results.
625
771
626 # while True:
772 Individual results can be purged by msg_id, or the entire
627 # try:
773 history of specific targets can
628 # except zmq.ZMQError:
774
629 # time.sleep(1e-3)
775 Parameters
630 # continue
776 ----------
631 # else:
777 targets : int/str/list of ints/strs
632 # break
778 the targets
633 return msg['content']
779 default : None
780 """
781 if not targets and not msg_ids:
782 raise ValueError
783 if targets:
784 targets = self._build_targets(targets)[1]
785 content = dict(targets=targets, msg_ids=msg_ids)
786 self.session.send(self._query_socket, "purge_request", content=content)
787 idents, msg = self.session.recv(self._query_socket, 0)
788 if self.debug:
789 pprint(msg)
790 content = msg['content']
791 if content['status'] != 'ok':
792 raise ss.unwrap_exception(content)
634
793
635 class AsynClient(Client):
794 class AsynClient(Client):
636 """An Asynchronous client, using the Tornado Event Loop"""
795 """An Asynchronous client, using the Tornado Event Loop.
796 !!!unfinished!!!"""
637 io_loop = None
797 io_loop = None
638 queue_stream = None
798 _queue_stream = None
639 notifier_stream = None
799 _notifier_stream = None
800 _task_stream = None
801 _control_stream = None
640
802
641 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
803 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
642 Client.__init__(self, addr, context, username, debug)
804 Client.__init__(self, addr, context, username, debug)
@@ -644,10 +806,10 b' class AsynClient(Client):'
644 io_loop = ioloop.IOLoop.instance()
806 io_loop = ioloop.IOLoop.instance()
645 self.io_loop = io_loop
807 self.io_loop = io_loop
646
808
647 self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop)
809 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
648 self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop)
810 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
649 self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop)
811 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
650 self.notification_stream = zmqstream.ZMQStream(self.notification_socket, io_loop)
812 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
651
813
652 def spin(self):
814 def spin(self):
653 for stream in (self.queue_stream, self.notifier_stream,
815 for stream in (self.queue_stream, self.notifier_stream,
General Comments 0
You need to be logged in to leave comments. Login now