##// END OF EJS Templates
major cleanup of client code + purge_request implemented
MinRK -
Show More
This diff has been collapsed as it changes many lines, (562 lines changed) Show them Hide them
@@ -30,7 +30,7 b' def _push(ns):'
30 30
31 31 def _pull(keys):
32 32 g = globals()
33 if isinstance(keys, (list,tuple)):
33 if isinstance(keys, (list,tuple, set)):
34 34 return map(g.get, keys)
35 35 else:
36 36 return g.get(keys)
@@ -41,14 +41,19 b' def _clear():'
41 41 def execute(code):
42 42 exec code in globals()
43 43
44 # decorators for methods:
44 #--------------------------------------------------------------------------
45 # Decorators for Client methods
46 #--------------------------------------------------------------------------
47
45 48 @decorator
46 def spinfirst(f,self,*args,**kwargs):
49 def spinfirst(f, self, *args, **kwargs):
50 """Call spin() to sync state prior to calling the method."""
47 51 self.spin()
48 52 return f(self, *args, **kwargs)
49 53
50 54 @decorator
51 55 def defaultblock(f, self, *args, **kwargs):
56 """Default to self.block; preserve self.block."""
52 57 block = kwargs.get('block',None)
53 58 block = self.block if block is None else block
54 59 saveblock = self.block
@@ -57,45 +62,46 b' def defaultblock(f, self, *args, **kwargs):'
57 62 self.block = saveblock
58 63 return ret
59 64
65 #--------------------------------------------------------------------------
66 # Classes
67 #--------------------------------------------------------------------------
68
69
60 70 class AbortedTask(object):
71 """A basic wrapper object describing an aborted task."""
61 72 def __init__(self, msg_id):
62 73 self.msg_id = msg_id
63 # @decorator
64 # def checktargets(f):
65 # @wraps(f)
66 # def checked_method(self, *args, **kwargs):
67 # self._build_targets(kwargs['targets'])
68 # return f(self, *args, **kwargs)
69 # return checked_method
70 74
71
72 # class _ZMQEventLoopThread(threading.Thread):
73 #
74 # def __init__(self, loop):
75 # self.loop = loop
76 # threading.Thread.__init__(self)
77 #
78 # def run(self):
79 # self.loop.start()
80 #
75 class ControllerError(Exception):
76 def __init__(self, etype, evalue, tb):
77 self.etype = etype
78 self.evalue = evalue
79 self.traceback=tb
80
81 81 class Client(object):
82 82 """A semi-synchronous client to the IPython ZMQ controller
83 83
84 Parameters
85 ----------
86
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
88 The address of the controller's registration socket.
89
90
84 91 Attributes
85 92 ----------
86 ids : set
87 a set of engine IDs
93 ids : set of int engine IDs
88 94 requesting the ids attribute always synchronizes
89 95 the registration state. To request ids without synchronization,
90 use _ids
96 use semi-private _ids.
91 97
92 98 history : list of msg_ids
93 99 a list of msg_ids, keeping track of all the execution
94 messages you have submitted
100 messages you have submitted in order.
95 101
96 102 outstanding : set of msg_ids
97 103 a set of msg_ids that have been submitted, but whose
98 results have not been received
104 results have not yet been received.
99 105
100 106 results : dict
101 107 a dict of all our results, keyed by msg_id
@@ -111,44 +117,43 b' class Client(object):'
111 117
112 118 barrier : wait on one or more msg_ids
113 119
114 execution methods: apply/apply_bound/apply_to
120 execution methods: apply/apply_bound/apply_to/applu_bount
115 121 legacy: execute, run
116 122
117 query methods: queue_status, get_result
123 query methods: queue_status, get_result, purge
118 124
119 125 control methods: abort, kill
120 126
121
122
123 127 """
124 128
125 129
126 130 _connected=False
127 131 _engines=None
128 registration_socket=None
129 query_socket=None
130 control_socket=None
131 notification_socket=None
132 queue_socket=None
133 task_socket=None
132 _addr='tcp://127.0.0.1:10101'
133 _registration_socket=None
134 _query_socket=None
135 _control_socket=None
136 _notification_socket=None
137 _mux_socket=None
138 _task_socket=None
134 139 block = False
135 140 outstanding=None
136 141 results = None
137 142 history = None
138 143 debug = False
139 144
140 def __init__(self, addr, context=None, username=None, debug=False):
145 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
141 146 if context is None:
142 147 context = zmq.Context()
143 148 self.context = context
144 self.addr = addr
149 self._addr = addr
145 150 if username is None:
146 151 self.session = ss.StreamSession()
147 152 else:
148 153 self.session = ss.StreamSession(username)
149 self.registration_socket = self.context.socket(zmq.PAIR)
150 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
151 self.registration_socket.connect(addr)
154 self._registration_socket = self.context.socket(zmq.PAIR)
155 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
156 self._registration_socket.connect(addr)
152 157 self._engines = {}
153 158 self._ids = set()
154 159 self.outstanding=set()
@@ -167,16 +172,21 b' class Client(object):'
167 172
168 173 @property
169 174 def ids(self):
175 """Always up to date ids property."""
170 176 self._flush_notifications()
171 177 return self._ids
172 178
173 179 def _update_engines(self, engines):
180 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
174 181 for k,v in engines.iteritems():
175 182 eid = int(k)
176 183 self._engines[eid] = bytes(v) # force not unicode
177 184 self._ids.add(eid)
178 185
179 186 def _build_targets(self, targets):
187 """Turn valid target IDs or 'all' into two lists:
188 (int_ids, uuids).
189 """
180 190 if targets is None:
181 191 targets = self._ids
182 192 elif isinstance(targets, str):
@@ -189,45 +199,50 b' class Client(object):'
189 199 return [self._engines[t] for t in targets], list(targets)
190 200
191 201 def _connect(self):
192 """setup all our socket connections to the controller"""
202 """setup all our socket connections to the controller. This is called from
203 __init__."""
193 204 if self._connected:
194 205 return
195 206 self._connected=True
196 self.session.send(self.registration_socket, 'connection_request')
197 idents,msg = self.session.recv(self.registration_socket,mode=0)
207 self.session.send(self._registration_socket, 'connection_request')
208 idents,msg = self.session.recv(self._registration_socket,mode=0)
198 209 if self.debug:
199 210 pprint(msg)
200 211 msg = ss.Message(msg)
201 212 content = msg.content
202 213 if content.status == 'ok':
203 214 if content.queue:
204 self.queue_socket = self.context.socket(zmq.PAIR)
205 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
206 self.queue_socket.connect(content.queue)
215 self._mux_socket = self.context.socket(zmq.PAIR)
216 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
217 self._mux_socket.connect(content.queue)
207 218 if content.task:
208 self.task_socket = self.context.socket(zmq.PAIR)
209 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
210 self.task_socket.connect(content.task)
219 self._task_socket = self.context.socket(zmq.PAIR)
220 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
221 self._task_socket.connect(content.task)
211 222 if content.notification:
212 self.notification_socket = self.context.socket(zmq.SUB)
213 self.notification_socket.connect(content.notification)
214 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
223 self._notification_socket = self.context.socket(zmq.SUB)
224 self._notification_socket.connect(content.notification)
225 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
215 226 if content.query:
216 self.query_socket = self.context.socket(zmq.PAIR)
217 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
218 self.query_socket.connect(content.query)
227 self._query_socket = self.context.socket(zmq.PAIR)
228 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
229 self._query_socket.connect(content.query)
219 230 if content.control:
220 self.control_socket = self.context.socket(zmq.PAIR)
221 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
222 self.control_socket.connect(content.control)
231 self._control_socket = self.context.socket(zmq.PAIR)
232 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
233 self._control_socket.connect(content.control)
223 234 self._update_engines(dict(content.engines))
224 235
225 236 else:
226 237 self._connected = False
227 238 raise Exception("Failed to connect!")
228 239
229 #### handlers and callbacks for incoming messages #######
240 #--------------------------------------------------------------------------
241 # handlers and callbacks for incoming messages
242 #--------------------------------------------------------------------------
243
230 244 def _register_engine(self, msg):
245 """Register a new engine, and update our connection info."""
231 246 content = msg['content']
232 247 eid = content['id']
233 248 d = {eid : content['queue']}
@@ -235,7 +250,7 b' class Client(object):'
235 250 self._ids.add(int(eid))
236 251
237 252 def _unregister_engine(self, msg):
238 # print 'unregister',msg
253 """Unregister an engine that has died."""
239 254 content = msg['content']
240 255 eid = int(content['id'])
241 256 if eid in self._ids:
@@ -243,7 +258,7 b' class Client(object):'
243 258 self._engines.pop(eid)
244 259
245 260 def _handle_execute_reply(self, msg):
246 # msg_id = msg['msg_id']
261 """Save the reply to an execute_request into our results."""
247 262 parent = msg['parent_header']
248 263 msg_id = parent['msg_id']
249 264 if msg_id not in self.outstanding:
@@ -253,8 +268,7 b' class Client(object):'
253 268 self.results[msg_id] = ss.unwrap_exception(msg['content'])
254 269
255 270 def _handle_apply_reply(self, msg):
256 # pprint(msg)
257 # msg_id = msg['msg_id']
271 """Save the reply to an apply_request into our results."""
258 272 parent = msg['parent_header']
259 273 msg_id = parent['msg_id']
260 274 if msg_id not in self.outstanding:
@@ -272,8 +286,9 b' class Client(object):'
272 286 self.results[msg_id] = ss.unwrap_exception(content)
273 287
274 288 def _flush_notifications(self):
275 "flush incoming notifications of engine registrations"
276 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
289 """Flush notifications of engine registrations waiting
290 in ZMQ queue."""
291 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
277 292 while msg is not None:
278 293 if self.debug:
279 294 pprint(msg)
@@ -284,10 +299,10 b' class Client(object):'
284 299 raise Exception("Unhandled message type: %s"%msg.msg_type)
285 300 else:
286 301 handler(msg)
287 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
302 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
288 303
289 304 def _flush_results(self, sock):
290 "flush incoming task or queue results"
305 """Flush task or queue results waiting in ZMQ queue."""
291 306 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
292 307 while msg is not None:
293 308 if self.debug:
@@ -302,16 +317,20 b' class Client(object):'
302 317 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
303 318
304 319 def _flush_control(self, sock):
305 "flush incoming control replies"
320 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
306 322 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
307 323 while msg is not None:
308 324 if self.debug:
309 325 pprint(msg)
310 326 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
311 327
312 ###### get/setitem ########
328 #--------------------------------------------------------------------------
329 # getitem
330 #--------------------------------------------------------------------------
313 331
314 332 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
315 334 if isinstance(key, int):
316 335 if key not in self.ids:
317 336 raise IndexError("No such engine: %i"%key)
@@ -329,51 +348,75 b' class Client(object):'
329 348 else:
330 349 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
331 350
332 ############ begin real methods #############
351 #--------------------------------------------------------------------------
352 # Begin public methods
353 #--------------------------------------------------------------------------
333 354
334 355 def spin(self):
335 """flush incoming notifications and execution results."""
336 if self.notification_socket:
356 """Flush any registration notifications and execution results
357 waiting in the ZMQ queue.
358 """
359 if self._notification_socket:
337 360 self._flush_notifications()
338 if self.queue_socket:
339 self._flush_results(self.queue_socket)
340 if self.task_socket:
341 self._flush_results(self.task_socket)
342 if self.control_socket:
343 self._flush_control(self.control_socket)
361 if self._mux_socket:
362 self._flush_results(self._mux_socket)
363 if self._task_socket:
364 self._flush_results(self._task_socket)
365 if self._control_socket:
366 self._flush_control(self._control_socket)
344 367
345 @spinfirst
346 def queue_status(self, targets=None, verbose=False):
347 """fetch the status of engine queues
368 def barrier(self, msg_ids=None, timeout=-1):
369 """waits on one or more `msg_ids`, for up to `timeout` seconds.
348 370
349 371 Parameters
350 372 ----------
351 targets : int/str/list of ints/strs
352 the engines on which to execute
353 default : all
354 verbose : bool
355 whether to return lengths only, or lists of ids for each element
356
357 """
358 targets = self._build_targets(targets)[1]
359 content = dict(targets=targets)
360 self.session.send(self.query_socket, "queue_request", content=content)
361 idents,msg = self.session.recv(self.query_socket, 0)
362 if self.debug:
363 pprint(msg)
364 return msg['content']
373 msg_ids : int, str, or list of ints and/or strs
374 ints are indices to self.history
375 strs are msg_ids
376 default: wait on all outstanding messages
377 timeout : float
378 a time in seconds, after which to give up.
379 default is -1, which means no timeout
365 380
381 Returns
382 -------
383 True : when all msg_ids are done
384 False : timeout reached, some msg_ids still outstanding
385 """
386 tic = time.time()
387 if msg_ids is None:
388 theids = self.outstanding
389 else:
390 if isinstance(msg_ids, (int, str)):
391 msg_ids = [msg_ids]
392 theids = set()
393 for msg_id in msg_ids:
394 if isinstance(msg_id, int):
395 msg_id = self.history[msg_id]
396 theids.add(msg_id)
397 self.spin()
398 while theids.intersection(self.outstanding):
399 if timeout >= 0 and ( time.time()-tic ) > timeout:
400 break
401 time.sleep(1e-3)
402 self.spin()
403 return len(theids.intersection(self.outstanding)) == 0
404
405 #--------------------------------------------------------------------------
406 # Control methods
407 #--------------------------------------------------------------------------
408
366 409 @spinfirst
367 410 @defaultblock
368 411 def clear(self, targets=None, block=None):
369 """clear the namespace in target(s)"""
412 """Clear the namespace in target(s)."""
370 413 targets = self._build_targets(targets)[0]
371 414 for t in targets:
372 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
415 self.session.send(self._control_socket, 'clear_request', content={},ident=t)
373 416 error = False
374 417 if self.block:
375 418 for i in range(len(targets)):
376 idents,msg = self.session.recv(self.control_socket,0)
419 idents,msg = self.session.recv(self._control_socket,0)
377 420 if self.debug:
378 421 pprint(msg)
379 422 if msg['content']['status'] != 'ok':
@@ -385,18 +428,18 b' class Client(object):'
385 428 @spinfirst
386 429 @defaultblock
387 430 def abort(self, msg_ids = None, targets=None, block=None):
388 """abort the Queues of target(s)"""
431 """Abort the execution queues of target(s)."""
389 432 targets = self._build_targets(targets)[0]
390 433 if isinstance(msg_ids, basestring):
391 434 msg_ids = [msg_ids]
392 435 content = dict(msg_ids=msg_ids)
393 436 for t in targets:
394 self.session.send(self.control_socket, 'abort_request',
437 self.session.send(self._control_socket, 'abort_request',
395 438 content=content, ident=t)
396 439 error = False
397 440 if self.block:
398 441 for i in range(len(targets)):
399 idents,msg = self.session.recv(self.control_socket,0)
442 idents,msg = self.session.recv(self._control_socket,0)
400 443 if self.debug:
401 444 pprint(msg)
402 445 if msg['content']['status'] != 'ok':
@@ -410,21 +453,25 b' class Client(object):'
410 453 """Terminates one or more engine processes."""
411 454 targets = self._build_targets(targets)[0]
412 455 for t in targets:
413 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
456 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
414 457 error = False
415 458 if self.block:
416 459 for i in range(len(targets)):
417 idents,msg = self.session.recv(self.control_socket,0)
460 idents,msg = self.session.recv(self._control_socket,0)
418 461 if self.debug:
419 462 pprint(msg)
420 463 if msg['content']['status'] != 'ok':
421 464 error = msg['content']
422 465 if error:
423 466 return error
424
467
468 #--------------------------------------------------------------------------
469 # Execution methods
470 #--------------------------------------------------------------------------
471
425 472 @defaultblock
426 473 def execute(self, code, targets='all', block=None):
427 """executes `code` on `targets` in blocking or nonblocking manner.
474 """Executes `code` on `targets` in blocking or nonblocking manner.
428 475
429 476 Parameters
430 477 ----------
@@ -434,7 +481,8 b' class Client(object):'
434 481 the engines on which to execute
435 482 default : all
436 483 block : bool
437 whether or not to wait until done
484 whether or not to wait until done to return
485 default: self.block
438 486 """
439 487 # block = self.block if block is None else block
440 488 # saveblock = self.block
@@ -444,7 +492,7 b' class Client(object):'
444 492 return result
445 493
446 494 def run(self, code, block=None):
447 """runs `code` on an engine.
495 """Runs `code` on an engine.
448 496
449 497 Calls to this are load-balanced.
450 498
@@ -459,11 +507,96 b' class Client(object):'
459 507 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
460 508 return result
461 509
510 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
511 after=None, follow=None):
512 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
513
514 This is the central execution command for the client.
515
516 Parameters
517 ----------
518
519 f : function
520 The fuction to be called remotely
521 args : tuple/list
522 The positional arguments passed to `f`
523 kwargs : dict
524 The keyword arguments passed to `f`
525 bound : bool (default: True)
526 Whether to execute in the Engine(s) namespace, or in a clean
527 namespace not affecting the engine.
528 block : bool (default: self.block)
529 Whether to wait for the result, or return immediately.
530 False:
531 returns msg_id(s)
532 if multiple targets:
533 list of ids
534 True:
535 returns actual result(s) of f(*args, **kwargs)
536 if multiple targets:
537 dict of results, by engine ID
538 targets : int,list of ints, 'all', None
539 Specify the destination of the job.
540 if None:
541 Submit via Task queue for load-balancing.
542 if 'all':
543 Run on all active engines
544 if list:
545 Run on each specified engine
546 if int:
547 Run on single engine
548
549 after : Dependency or collection of msg_ids
550 Only for load-balanced execution (targets=None)
551 Specify a list of msg_ids as a time-based dependency.
552 This job will only be run *after* the dependencies
553 have been met.
554
555 follow : Dependency or collection of msg_ids
556 Only for load-balanced execution (targets=None)
557 Specify a list of msg_ids as a location-based dependency.
558 This job will only be run on an engine where this dependency
559 is met.
560
561 Returns
562 -------
563 if block is False:
564 if single target:
565 return msg_id
566 else:
567 return list of msg_ids
568 ? (should this be dict like block=True) ?
569 else:
570 if single target:
571 return result of f(*args, **kwargs)
572 else:
573 return dict of results, keyed by engine
574 """
575
576 # defaults:
577 block = block if block is not None else self.block
578 args = args if args is not None else []
579 kwargs = kwargs if kwargs is not None else {}
580
581 # enforce types of f,args,kwrags
582 if not callable(f):
583 raise TypeError("f must be callable, not %s"%type(f))
584 if not isinstance(args, (tuple, list)):
585 raise TypeError("args must be tuple or list, not %s"%type(args))
586 if not isinstance(kwargs, dict):
587 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
588
589 options = dict(bound=bound, block=block, after=after, follow=follow)
590
591 if targets is None:
592 return self._apply_balanced(f, args, kwargs, **options)
593 else:
594 return self._apply_direct(f, args, kwargs, targets=targets, **options)
595
462 596 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
463 597 after=None, follow=None):
464 """the underlying method for applying functions in a load balanced
465 manner."""
466 block = block if block is not None else self.block
598 """The underlying method for applying functions in a load balanced
599 manner, via the task queue."""
467 600 if isinstance(after, Dependency):
468 601 after = after.as_dict()
469 602 elif after is None:
@@ -476,7 +609,7 b' class Client(object):'
476 609
477 610 bufs = ss.pack_apply_message(f,args,kwargs)
478 611 content = dict(bound=bound)
479 msg = self.session.send(self.task_socket, "apply_request",
612 msg = self.session.send(self._task_socket, "apply_request",
480 613 content=content, buffers=bufs, subheader=subheader)
481 614 msg_id = msg['msg_id']
482 615 self.outstanding.add(msg_id)
@@ -489,9 +622,8 b' class Client(object):'
489 622
490 623 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
491 624 after=None, follow=None):
492 """Then underlying method for applying functions to specific engines."""
493
494 block = block if block is not None else self.block
625 """Then underlying method for applying functions to specific engines
626 via the MUX queue."""
495 627
496 628 queues,targets = self._build_targets(targets)
497 629 bufs = ss.pack_apply_message(f,args,kwargs)
@@ -507,7 +639,7 b' class Client(object):'
507 639 content = dict(bound=bound)
508 640 msg_ids = []
509 641 for queue in queues:
510 msg = self.session.send(self.queue_socket, "apply_request",
642 msg = self.session.send(self._mux_socket, "apply_request",
511 643 content=content, buffers=bufs,ident=queue, subheader=subheader)
512 644 msg_id = msg['msg_id']
513 645 self.outstanding.add(msg_id)
@@ -528,115 +660,145 b' class Client(object):'
528 660 result[target] = self.results[mid]
529 661 return result
530 662
531 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
532 after=None, follow=None):
533 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
534
535 if self.block is False:
536 returns msg_id or list of msg_ids
537 else:
538 returns actual result of f(*args, **kwargs)
539 """
540 # enforce types of f,args,kwrags
541 args = args if args is not None else []
542 kwargs = kwargs if kwargs is not None else {}
543 if not callable(f):
544 raise TypeError("f must be callable, not %s"%type(f))
545 if not isinstance(args, (tuple, list)):
546 raise TypeError("args must be tuple or list, not %s"%type(args))
547 if not isinstance(kwargs, dict):
548 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
549
550 options = dict(bound=bound, block=block, after=after, follow=follow)
551
552 if targets is None:
553 return self._apply_balanced(f, args, kwargs, **options)
554 else:
555 return self._apply_direct(f, args, kwargs, targets=targets, **options)
663 #--------------------------------------------------------------------------
664 # Data movement
665 #--------------------------------------------------------------------------
556 666
667 @defaultblock
557 668 def push(self, ns, targets=None, block=None):
558 """push the contents of `ns` into the namespace on `target`"""
669 """Push the contents of `ns` into the namespace on `target`"""
559 670 if not isinstance(ns, dict):
560 671 raise TypeError("Must be a dict, not %s"%type(ns))
561 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
672 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
562 673 return result
563 674
564 @spinfirst
675 @defaultblock
565 676 def pull(self, keys, targets=None, block=True):
566 """pull objects from `target`'s namespace by `keys`"""
567
677 """Pull objects from `target`'s namespace by `keys`"""
678 if isinstance(keys, str):
679 pass
680 elif isistance(keys, (list,tuple,set)):
681 for key in keys:
682 if not isinstance(key, str):
683 raise TypeError
568 684 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
569 685 return result
570 686
571 def barrier(self, msg_ids=None, timeout=-1):
572 """waits on one or more `msg_ids`, for up to `timeout` seconds.
687 #--------------------------------------------------------------------------
688 # Query methods
689 #--------------------------------------------------------------------------
690
691 @spinfirst
692 def get_results(self, msg_ids, status_only=False):
693 """Returns the result of the execute or task request with `msg_ids`.
573 694
574 695 Parameters
575 696 ----------
576 msg_ids : int, str, or list of ints and/or strs
577 ints are indices to self.history
578 strs are msg_ids
579 default: wait on all outstanding messages
580 timeout : float
581 a time in seconds, after which to give up.
582 default is -1, which means no timeout
583
584 Returns
585 -------
586 True : when all msg_ids are done
587 False : timeout reached, msg_ids still outstanding
697 msg_ids : list of ints or msg_ids
698 if int:
699 Passed as index to self.history for convenience.
700 status_only : bool (default: False)
701 if False:
702 return the actual results
588 703 """
589 tic = time.time()
590 if msg_ids is None:
591 theids = self.outstanding
592 else:
593 if isinstance(msg_ids, (int, str)):
594 msg_ids = [msg_ids]
595 theids = set()
596 for msg_id in msg_ids:
597 if isinstance(msg_id, int):
598 msg_id = self.history[msg_id]
599 theids.add(msg_id)
600 self.spin()
601 while theids.intersection(self.outstanding):
602 if timeout >= 0 and ( time.time()-tic ) > timeout:
603 break
604 time.sleep(1e-3)
605 self.spin()
606 return len(theids.intersection(self.outstanding)) == 0
607
608 @spinfirst
609 def get_results(self, msg_ids,status_only=False):
610 """returns the result of the execute or task request with `msg_id`"""
611 704 if not isinstance(msg_ids, (list,tuple)):
612 705 msg_ids = [msg_ids]
613 706 theids = []
614 707 for msg_id in msg_ids:
615 708 if isinstance(msg_id, int):
616 709 msg_id = self.history[msg_id]
710 if not isinstance(msg_id, str):
711 raise TypeError("msg_ids must be str, not %r"%msg_id)
617 712 theids.append(msg_id)
618 713
619 content = dict(msg_ids=theids, status_only=status_only)
620 msg = self.session.send(self.query_socket, "result_request", content=content)
621 zmq.select([self.query_socket], [], [])
622 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
714 completed = []
715 local_results = {}
716 for msg_id in list(theids):
717 if msg_id in self.results:
718 completed.append(msg_id)
719 local_results[msg_id] = self.results[msg_id]
720 theids.remove(msg_id)
721
722 if msg_ids: # some not locally cached
723 content = dict(msg_ids=theids, status_only=status_only)
724 msg = self.session.send(self._query_socket, "result_request", content=content)
725 zmq.select([self._query_socket], [], [])
726 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
727 if self.debug:
728 pprint(msg)
729 content = msg['content']
730 if content['status'] != 'ok':
731 raise ss.unwrap_exception(content)
732 else:
733 content = dict(completed=[],pending=[])
734 if not status_only:
735 # load cached results into result:
736 content['completed'].extend(completed)
737 content.update(local_results)
738 # update cache with results:
739 for msg_id in msg_ids:
740 if msg_id in content['completed']:
741 self.results[msg_id] = content[msg_id]
742 return content
743
744 @spinfirst
745 def queue_status(self, targets=None, verbose=False):
746 """Fetch the status of engine queues.
747
748 Parameters
749 ----------
750 targets : int/str/list of ints/strs
751 the engines on which to execute
752 default : all
753 verbose : bool
754 whether to return lengths only, or lists of ids for each element
755 """
756 targets = self._build_targets(targets)[1]
757 content = dict(targets=targets, verbose=verbose)
758 self.session.send(self._query_socket, "queue_request", content=content)
759 idents,msg = self.session.recv(self._query_socket, 0)
623 760 if self.debug:
624 761 pprint(msg)
762 content = msg['content']
763 status = content.pop('status')
764 if status != 'ok':
765 raise ss.unwrap_exception(content)
766 return content
767
768 @spinfirst
769 def purge_results(self, msg_ids=[], targets=[]):
770 """Tell the controller to forget results.
625 771
626 # while True:
627 # try:
628 # except zmq.ZMQError:
629 # time.sleep(1e-3)
630 # continue
631 # else:
632 # break
633 return msg['content']
772 Individual results can be purged by msg_id, or the entire
773 history of specific targets can
774
775 Parameters
776 ----------
777 targets : int/str/list of ints/strs
778 the targets
779 default : None
780 """
781 if not targets and not msg_ids:
782 raise ValueError
783 if targets:
784 targets = self._build_targets(targets)[1]
785 content = dict(targets=targets, msg_ids=msg_ids)
786 self.session.send(self._query_socket, "purge_request", content=content)
787 idents, msg = self.session.recv(self._query_socket, 0)
788 if self.debug:
789 pprint(msg)
790 content = msg['content']
791 if content['status'] != 'ok':
792 raise ss.unwrap_exception(content)
634 793
635 794 class AsynClient(Client):
636 """An Asynchronous client, using the Tornado Event Loop"""
795 """An Asynchronous client, using the Tornado Event Loop.
796 !!!unfinished!!!"""
637 797 io_loop = None
638 queue_stream = None
639 notifier_stream = None
798 _queue_stream = None
799 _notifier_stream = None
800 _task_stream = None
801 _control_stream = None
640 802
641 803 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
642 804 Client.__init__(self, addr, context, username, debug)
@@ -644,10 +806,10 b' class AsynClient(Client):'
644 806 io_loop = ioloop.IOLoop.instance()
645 807 self.io_loop = io_loop
646 808
647 self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop)
648 self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop)
649 self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop)
650 self.notification_stream = zmqstream.ZMQStream(self.notification_socket, io_loop)
809 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
810 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
811 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
812 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
651 813
652 814 def spin(self):
653 815 for stream in (self.queue_stream, self.notifier_stream,
General Comments 0
You need to be logged in to leave comments. Login now