##// END OF EJS Templates
major cleanup of client code + purge_request implemented
MinRK -
Show More
This diff has been collapsed as it changes many lines, (548 lines changed) Show them Hide them
@@ -1,656 +1,818
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from __future__ import print_function
14 14
15 15 import time
16 16 from pprint import pprint
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 21 from IPython.external.decorator import decorator
22 22
23 23 import streamsession as ss
24 24 from remotenamespace import RemoteNamespace
25 25 from view import DirectView
26 26 from dependency import Dependency, depend, require
27 27
28 28 def _push(ns):
29 29 globals().update(ns)
30 30
31 31 def _pull(keys):
32 32 g = globals()
33 if isinstance(keys, (list,tuple)):
33 if isinstance(keys, (list,tuple, set)):
34 34 return map(g.get, keys)
35 35 else:
36 36 return g.get(keys)
37 37
38 38 def _clear():
39 39 globals().clear()
40 40
41 41 def execute(code):
42 42 exec code in globals()
43 43
44 # decorators for methods:
44 #--------------------------------------------------------------------------
45 # Decorators for Client methods
46 #--------------------------------------------------------------------------
47
45 48 @decorator
46 49 def spinfirst(f,self,*args,**kwargs):
50 """Call spin() to sync state prior to calling the method."""
47 51 self.spin()
48 52 return f(self, *args, **kwargs)
49 53
50 54 @decorator
51 55 def defaultblock(f, self, *args, **kwargs):
56 """Default to self.block; preserve self.block."""
52 57 block = kwargs.get('block',None)
53 58 block = self.block if block is None else block
54 59 saveblock = self.block
55 60 self.block = block
56 61 ret = f(self, *args, **kwargs)
57 62 self.block = saveblock
58 63 return ret
59 64
65 #--------------------------------------------------------------------------
66 # Classes
67 #--------------------------------------------------------------------------
68
69
60 70 class AbortedTask(object):
71 """A basic wrapper object describing an aborted task."""
61 72 def __init__(self, msg_id):
62 73 self.msg_id = msg_id
63 # @decorator
64 # def checktargets(f):
65 # @wraps(f)
66 # def checked_method(self, *args, **kwargs):
67 # self._build_targets(kwargs['targets'])
68 # return f(self, *args, **kwargs)
69 # return checked_method
70 74
75 class ControllerError(Exception):
76 def __init__(self, etype, evalue, tb):
77 self.etype = etype
78 self.evalue = evalue
79 self.traceback=tb
71 80
72 # class _ZMQEventLoopThread(threading.Thread):
73 #
74 # def __init__(self, loop):
75 # self.loop = loop
76 # threading.Thread.__init__(self)
77 #
78 # def run(self):
79 # self.loop.start()
80 #
81 81 class Client(object):
82 82 """A semi-synchronous client to the IPython ZMQ controller
83 83
84 Parameters
85 ----------
86
87 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
88 The address of the controller's registration socket.
89
90
84 91 Attributes
85 92 ----------
86 ids : set
87 a set of engine IDs
93 ids : set of int engine IDs
88 94 requesting the ids attribute always synchronizes
89 95 the registration state. To request ids without synchronization,
90 use _ids
96 use semi-private _ids.
91 97
92 98 history : list of msg_ids
93 99 a list of msg_ids, keeping track of all the execution
94 messages you have submitted
100 messages you have submitted in order.
95 101
96 102 outstanding : set of msg_ids
97 103 a set of msg_ids that have been submitted, but whose
98 results have not been received
104 results have not yet been received.
99 105
100 106 results : dict
101 107 a dict of all our results, keyed by msg_id
102 108
103 109 block : bool
104 110 determines default behavior when block not specified
105 111 in execution methods
106 112
107 113 Methods
108 114 -------
109 115 spin : flushes incoming results and registration state changes
110 116 control methods spin, and requesting `ids` also ensures up to date
111 117
112 118 barrier : wait on one or more msg_ids
113 119
114 execution methods: apply/apply_bound/apply_to
120 execution methods: apply/apply_bound/apply_to/applu_bount
115 121 legacy: execute, run
116 122
117 query methods: queue_status, get_result
123 query methods: queue_status, get_result, purge
118 124
119 125 control methods: abort, kill
120 126
121
122
123 127 """
124 128
125 129
126 130 _connected=False
127 131 _engines=None
128 registration_socket=None
129 query_socket=None
130 control_socket=None
131 notification_socket=None
132 queue_socket=None
133 task_socket=None
132 _addr='tcp://127.0.0.1:10101'
133 _registration_socket=None
134 _query_socket=None
135 _control_socket=None
136 _notification_socket=None
137 _mux_socket=None
138 _task_socket=None
134 139 block = False
135 140 outstanding=None
136 141 results = None
137 142 history = None
138 143 debug = False
139 144
140 def __init__(self, addr, context=None, username=None, debug=False):
145 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
141 146 if context is None:
142 147 context = zmq.Context()
143 148 self.context = context
144 self.addr = addr
149 self._addr = addr
145 150 if username is None:
146 151 self.session = ss.StreamSession()
147 152 else:
148 153 self.session = ss.StreamSession(username)
149 self.registration_socket = self.context.socket(zmq.PAIR)
150 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
151 self.registration_socket.connect(addr)
154 self._registration_socket = self.context.socket(zmq.PAIR)
155 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
156 self._registration_socket.connect(addr)
152 157 self._engines = {}
153 158 self._ids = set()
154 159 self.outstanding=set()
155 160 self.results = {}
156 161 self.history = []
157 162 self.debug = debug
158 163 self.session.debug = debug
159 164
160 165 self._notification_handlers = {'registration_notification' : self._register_engine,
161 166 'unregistration_notification' : self._unregister_engine,
162 167 }
163 168 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
164 169 'apply_reply' : self._handle_apply_reply}
165 170 self._connect()
166 171
167 172
168 173 @property
169 174 def ids(self):
175 """Always up to date ids property."""
170 176 self._flush_notifications()
171 177 return self._ids
172 178
173 179 def _update_engines(self, engines):
180 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
174 181 for k,v in engines.iteritems():
175 182 eid = int(k)
176 183 self._engines[eid] = bytes(v) # force not unicode
177 184 self._ids.add(eid)
178 185
179 186 def _build_targets(self, targets):
187 """Turn valid target IDs or 'all' into two lists:
188 (int_ids, uuids).
189 """
180 190 if targets is None:
181 191 targets = self._ids
182 192 elif isinstance(targets, str):
183 193 if targets.lower() == 'all':
184 194 targets = self._ids
185 195 else:
186 196 raise TypeError("%r not valid str target, must be 'all'"%(targets))
187 197 elif isinstance(targets, int):
188 198 targets = [targets]
189 199 return [self._engines[t] for t in targets], list(targets)
190 200
191 201 def _connect(self):
192 """setup all our socket connections to the controller"""
202 """setup all our socket connections to the controller. This is called from
203 __init__."""
193 204 if self._connected:
194 205 return
195 206 self._connected=True
196 self.session.send(self.registration_socket, 'connection_request')
197 idents,msg = self.session.recv(self.registration_socket,mode=0)
207 self.session.send(self._registration_socket, 'connection_request')
208 idents,msg = self.session.recv(self._registration_socket,mode=0)
198 209 if self.debug:
199 210 pprint(msg)
200 211 msg = ss.Message(msg)
201 212 content = msg.content
202 213 if content.status == 'ok':
203 214 if content.queue:
204 self.queue_socket = self.context.socket(zmq.PAIR)
205 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
206 self.queue_socket.connect(content.queue)
215 self._mux_socket = self.context.socket(zmq.PAIR)
216 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
217 self._mux_socket.connect(content.queue)
207 218 if content.task:
208 self.task_socket = self.context.socket(zmq.PAIR)
209 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
210 self.task_socket.connect(content.task)
219 self._task_socket = self.context.socket(zmq.PAIR)
220 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
221 self._task_socket.connect(content.task)
211 222 if content.notification:
212 self.notification_socket = self.context.socket(zmq.SUB)
213 self.notification_socket.connect(content.notification)
214 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
223 self._notification_socket = self.context.socket(zmq.SUB)
224 self._notification_socket.connect(content.notification)
225 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
215 226 if content.query:
216 self.query_socket = self.context.socket(zmq.PAIR)
217 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
218 self.query_socket.connect(content.query)
227 self._query_socket = self.context.socket(zmq.PAIR)
228 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
229 self._query_socket.connect(content.query)
219 230 if content.control:
220 self.control_socket = self.context.socket(zmq.PAIR)
221 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
222 self.control_socket.connect(content.control)
231 self._control_socket = self.context.socket(zmq.PAIR)
232 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
233 self._control_socket.connect(content.control)
223 234 self._update_engines(dict(content.engines))
224 235
225 236 else:
226 237 self._connected = False
227 238 raise Exception("Failed to connect!")
228 239
229 #### handlers and callbacks for incoming messages #######
240 #--------------------------------------------------------------------------
241 # handlers and callbacks for incoming messages
242 #--------------------------------------------------------------------------
243
230 244 def _register_engine(self, msg):
245 """Register a new engine, and update our connection info."""
231 246 content = msg['content']
232 247 eid = content['id']
233 248 d = {eid : content['queue']}
234 249 self._update_engines(d)
235 250 self._ids.add(int(eid))
236 251
237 252 def _unregister_engine(self, msg):
238 # print 'unregister',msg
253 """Unregister an engine that has died."""
239 254 content = msg['content']
240 255 eid = int(content['id'])
241 256 if eid in self._ids:
242 257 self._ids.remove(eid)
243 258 self._engines.pop(eid)
244 259
245 260 def _handle_execute_reply(self, msg):
246 # msg_id = msg['msg_id']
261 """Save the reply to an execute_request into our results."""
247 262 parent = msg['parent_header']
248 263 msg_id = parent['msg_id']
249 264 if msg_id not in self.outstanding:
250 265 print("got unknown result: %s"%msg_id)
251 266 else:
252 267 self.outstanding.remove(msg_id)
253 268 self.results[msg_id] = ss.unwrap_exception(msg['content'])
254 269
255 270 def _handle_apply_reply(self, msg):
256 # pprint(msg)
257 # msg_id = msg['msg_id']
271 """Save the reply to an apply_request into our results."""
258 272 parent = msg['parent_header']
259 273 msg_id = parent['msg_id']
260 274 if msg_id not in self.outstanding:
261 275 print ("got unknown result: %s"%msg_id)
262 276 else:
263 277 self.outstanding.remove(msg_id)
264 278 content = msg['content']
265 279 if content['status'] == 'ok':
266 280 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
267 281 elif content['status'] == 'aborted':
268 282 self.results[msg_id] = AbortedTask(msg_id)
269 283 elif content['status'] == 'resubmitted':
270 284 pass # handle resubmission
271 285 else:
272 286 self.results[msg_id] = ss.unwrap_exception(content)
273 287
274 288 def _flush_notifications(self):
275 "flush incoming notifications of engine registrations"
276 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
289 """Flush notifications of engine registrations waiting
290 in ZMQ queue."""
291 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
277 292 while msg is not None:
278 293 if self.debug:
279 294 pprint(msg)
280 295 msg = msg[-1]
281 296 msg_type = msg['msg_type']
282 297 handler = self._notification_handlers.get(msg_type, None)
283 298 if handler is None:
284 299 raise Exception("Unhandled message type: %s"%msg.msg_type)
285 300 else:
286 301 handler(msg)
287 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
302 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
288 303
289 304 def _flush_results(self, sock):
290 "flush incoming task or queue results"
305 """Flush task or queue results waiting in ZMQ queue."""
291 306 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
292 307 while msg is not None:
293 308 if self.debug:
294 309 pprint(msg)
295 310 msg = msg[-1]
296 311 msg_type = msg['msg_type']
297 312 handler = self._queue_handlers.get(msg_type, None)
298 313 if handler is None:
299 314 raise Exception("Unhandled message type: %s"%msg.msg_type)
300 315 else:
301 316 handler(msg)
302 317 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
303 318
304 319 def _flush_control(self, sock):
305 "flush incoming control replies"
320 """Flush replies from the control channel waiting
321 in the ZMQ queue."""
306 322 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
307 323 while msg is not None:
308 324 if self.debug:
309 325 pprint(msg)
310 326 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
311 327
312 ###### get/setitem ########
328 #--------------------------------------------------------------------------
329 # getitem
330 #--------------------------------------------------------------------------
313 331
314 332 def __getitem__(self, key):
333 """Dict access returns DirectView multiplexer objects."""
315 334 if isinstance(key, int):
316 335 if key not in self.ids:
317 336 raise IndexError("No such engine: %i"%key)
318 337 return DirectView(self, key)
319 338
320 339 if isinstance(key, slice):
321 340 indices = range(len(self.ids))[key]
322 341 ids = sorted(self._ids)
323 342 key = [ ids[i] for i in indices ]
324 343 # newkeys = sorted(self._ids)[thekeys[k]]
325 344
326 345 if isinstance(key, (tuple, list, xrange)):
327 346 _,targets = self._build_targets(list(key))
328 347 return DirectView(self, targets)
329 348 else:
330 349 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
331 350
332 ############ begin real methods #############
351 #--------------------------------------------------------------------------
352 # Begin public methods
353 #--------------------------------------------------------------------------
333 354
334 355 def spin(self):
335 """flush incoming notifications and execution results."""
336 if self.notification_socket:
356 """Flush any registration notifications and execution results
357 waiting in the ZMQ queue.
358 """
359 if self._notification_socket:
337 360 self._flush_notifications()
338 if self.queue_socket:
339 self._flush_results(self.queue_socket)
340 if self.task_socket:
341 self._flush_results(self.task_socket)
342 if self.control_socket:
343 self._flush_control(self.control_socket)
361 if self._mux_socket:
362 self._flush_results(self._mux_socket)
363 if self._task_socket:
364 self._flush_results(self._task_socket)
365 if self._control_socket:
366 self._flush_control(self._control_socket)
344 367
345 @spinfirst
346 def queue_status(self, targets=None, verbose=False):
347 """fetch the status of engine queues
368 def barrier(self, msg_ids=None, timeout=-1):
369 """waits on one or more `msg_ids`, for up to `timeout` seconds.
348 370
349 371 Parameters
350 372 ----------
351 targets : int/str/list of ints/strs
352 the engines on which to execute
353 default : all
354 verbose : bool
355 whether to return lengths only, or lists of ids for each element
373 msg_ids : int, str, or list of ints and/or strs
374 ints are indices to self.history
375 strs are msg_ids
376 default: wait on all outstanding messages
377 timeout : float
378 a time in seconds, after which to give up.
379 default is -1, which means no timeout
356 380
381 Returns
382 -------
383 True : when all msg_ids are done
384 False : timeout reached, some msg_ids still outstanding
357 385 """
358 targets = self._build_targets(targets)[1]
359 content = dict(targets=targets)
360 self.session.send(self.query_socket, "queue_request", content=content)
361 idents,msg = self.session.recv(self.query_socket, 0)
362 if self.debug:
363 pprint(msg)
364 return msg['content']
386 tic = time.time()
387 if msg_ids is None:
388 theids = self.outstanding
389 else:
390 if isinstance(msg_ids, (int, str)):
391 msg_ids = [msg_ids]
392 theids = set()
393 for msg_id in msg_ids:
394 if isinstance(msg_id, int):
395 msg_id = self.history[msg_id]
396 theids.add(msg_id)
397 self.spin()
398 while theids.intersection(self.outstanding):
399 if timeout >= 0 and ( time.time()-tic ) > timeout:
400 break
401 time.sleep(1e-3)
402 self.spin()
403 return len(theids.intersection(self.outstanding)) == 0
404
405 #--------------------------------------------------------------------------
406 # Control methods
407 #--------------------------------------------------------------------------
365 408
366 409 @spinfirst
367 410 @defaultblock
368 411 def clear(self, targets=None, block=None):
369 """clear the namespace in target(s)"""
412 """Clear the namespace in target(s)."""
370 413 targets = self._build_targets(targets)[0]
371 414 for t in targets:
372 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
415 self.session.send(self._control_socket, 'clear_request', content={},ident=t)
373 416 error = False
374 417 if self.block:
375 418 for i in range(len(targets)):
376 idents,msg = self.session.recv(self.control_socket,0)
419 idents,msg = self.session.recv(self._control_socket,0)
377 420 if self.debug:
378 421 pprint(msg)
379 422 if msg['content']['status'] != 'ok':
380 423 error = msg['content']
381 424 if error:
382 425 return error
383 426
384 427
385 428 @spinfirst
386 429 @defaultblock
387 430 def abort(self, msg_ids = None, targets=None, block=None):
388 """abort the Queues of target(s)"""
431 """Abort the execution queues of target(s)."""
389 432 targets = self._build_targets(targets)[0]
390 433 if isinstance(msg_ids, basestring):
391 434 msg_ids = [msg_ids]
392 435 content = dict(msg_ids=msg_ids)
393 436 for t in targets:
394 self.session.send(self.control_socket, 'abort_request',
437 self.session.send(self._control_socket, 'abort_request',
395 438 content=content, ident=t)
396 439 error = False
397 440 if self.block:
398 441 for i in range(len(targets)):
399 idents,msg = self.session.recv(self.control_socket,0)
442 idents,msg = self.session.recv(self._control_socket,0)
400 443 if self.debug:
401 444 pprint(msg)
402 445 if msg['content']['status'] != 'ok':
403 446 error = msg['content']
404 447 if error:
405 448 return error
406 449
407 450 @spinfirst
408 451 @defaultblock
409 452 def kill(self, targets=None, block=None):
410 453 """Terminates one or more engine processes."""
411 454 targets = self._build_targets(targets)[0]
412 455 for t in targets:
413 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
456 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
414 457 error = False
415 458 if self.block:
416 459 for i in range(len(targets)):
417 idents,msg = self.session.recv(self.control_socket,0)
460 idents,msg = self.session.recv(self._control_socket,0)
418 461 if self.debug:
419 462 pprint(msg)
420 463 if msg['content']['status'] != 'ok':
421 464 error = msg['content']
422 465 if error:
423 466 return error
424 467
468 #--------------------------------------------------------------------------
469 # Execution methods
470 #--------------------------------------------------------------------------
471
425 472 @defaultblock
426 473 def execute(self, code, targets='all', block=None):
427 """executes `code` on `targets` in blocking or nonblocking manner.
474 """Executes `code` on `targets` in blocking or nonblocking manner.
428 475
429 476 Parameters
430 477 ----------
431 478 code : str
432 479 the code string to be executed
433 480 targets : int/str/list of ints/strs
434 481 the engines on which to execute
435 482 default : all
436 483 block : bool
437 whether or not to wait until done
484 whether or not to wait until done to return
485 default: self.block
438 486 """
439 487 # block = self.block if block is None else block
440 488 # saveblock = self.block
441 489 # self.block = block
442 490 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
443 491 # self.block = saveblock
444 492 return result
445 493
446 494 def run(self, code, block=None):
447 """runs `code` on an engine.
495 """Runs `code` on an engine.
448 496
449 497 Calls to this are load-balanced.
450 498
451 499 Parameters
452 500 ----------
453 501 code : str
454 502 the code string to be executed
455 503 block : bool
456 504 whether or not to wait until done
457 505
458 506 """
459 507 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
460 508 return result
461 509
462 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
510 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
463 511 after=None, follow=None):
464 """the underlying method for applying functions in a load balanced
465 manner."""
512 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
513
514 This is the central execution command for the client.
515
516 Parameters
517 ----------
518
519 f : function
520 The fuction to be called remotely
521 args : tuple/list
522 The positional arguments passed to `f`
523 kwargs : dict
524 The keyword arguments passed to `f`
525 bound : bool (default: True)
526 Whether to execute in the Engine(s) namespace, or in a clean
527 namespace not affecting the engine.
528 block : bool (default: self.block)
529 Whether to wait for the result, or return immediately.
530 False:
531 returns msg_id(s)
532 if multiple targets:
533 list of ids
534 True:
535 returns actual result(s) of f(*args, **kwargs)
536 if multiple targets:
537 dict of results, by engine ID
538 targets : int,list of ints, 'all', None
539 Specify the destination of the job.
540 if None:
541 Submit via Task queue for load-balancing.
542 if 'all':
543 Run on all active engines
544 if list:
545 Run on each specified engine
546 if int:
547 Run on single engine
548
549 after : Dependency or collection of msg_ids
550 Only for load-balanced execution (targets=None)
551 Specify a list of msg_ids as a time-based dependency.
552 This job will only be run *after* the dependencies
553 have been met.
554
555 follow : Dependency or collection of msg_ids
556 Only for load-balanced execution (targets=None)
557 Specify a list of msg_ids as a location-based dependency.
558 This job will only be run on an engine where this dependency
559 is met.
560
561 Returns
562 -------
563 if block is False:
564 if single target:
565 return msg_id
566 else:
567 return list of msg_ids
568 ? (should this be dict like block=True) ?
569 else:
570 if single target:
571 return result of f(*args, **kwargs)
572 else:
573 return dict of results, keyed by engine
574 """
575
576 # defaults:
466 577 block = block if block is not None else self.block
578 args = args if args is not None else []
579 kwargs = kwargs if kwargs is not None else {}
580
581 # enforce types of f,args,kwrags
582 if not callable(f):
583 raise TypeError("f must be callable, not %s"%type(f))
584 if not isinstance(args, (tuple, list)):
585 raise TypeError("args must be tuple or list, not %s"%type(args))
586 if not isinstance(kwargs, dict):
587 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
588
589 options = dict(bound=bound, block=block, after=after, follow=follow)
590
591 if targets is None:
592 return self._apply_balanced(f, args, kwargs, **options)
593 else:
594 return self._apply_direct(f, args, kwargs, targets=targets, **options)
595
596 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
597 after=None, follow=None):
598 """The underlying method for applying functions in a load balanced
599 manner, via the task queue."""
467 600 if isinstance(after, Dependency):
468 601 after = after.as_dict()
469 602 elif after is None:
470 603 after = []
471 604 if isinstance(follow, Dependency):
472 605 follow = follow.as_dict()
473 606 elif follow is None:
474 607 follow = []
475 608 subheader = dict(after=after, follow=follow)
476 609
477 610 bufs = ss.pack_apply_message(f,args,kwargs)
478 611 content = dict(bound=bound)
479 msg = self.session.send(self.task_socket, "apply_request",
612 msg = self.session.send(self._task_socket, "apply_request",
480 613 content=content, buffers=bufs, subheader=subheader)
481 614 msg_id = msg['msg_id']
482 615 self.outstanding.add(msg_id)
483 616 self.history.append(msg_id)
484 617 if block:
485 618 self.barrier(msg_id)
486 619 return self.results[msg_id]
487 620 else:
488 621 return msg_id
489 622
490 623 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
491 624 after=None, follow=None):
492 """Then underlying method for applying functions to specific engines."""
493
494 block = block if block is not None else self.block
625 """Then underlying method for applying functions to specific engines
626 via the MUX queue."""
495 627
496 628 queues,targets = self._build_targets(targets)
497 629 bufs = ss.pack_apply_message(f,args,kwargs)
498 630 if isinstance(after, Dependency):
499 631 after = after.as_dict()
500 632 elif after is None:
501 633 after = []
502 634 if isinstance(follow, Dependency):
503 635 follow = follow.as_dict()
504 636 elif follow is None:
505 637 follow = []
506 638 subheader = dict(after=after, follow=follow)
507 639 content = dict(bound=bound)
508 640 msg_ids = []
509 641 for queue in queues:
510 msg = self.session.send(self.queue_socket, "apply_request",
642 msg = self.session.send(self._mux_socket, "apply_request",
511 643 content=content, buffers=bufs,ident=queue, subheader=subheader)
512 644 msg_id = msg['msg_id']
513 645 self.outstanding.add(msg_id)
514 646 self.history.append(msg_id)
515 647 msg_ids.append(msg_id)
516 648 if block:
517 649 self.barrier(msg_ids)
518 650 else:
519 651 if len(msg_ids) == 1:
520 652 return msg_ids[0]
521 653 else:
522 654 return msg_ids
523 655 if len(msg_ids) == 1:
524 656 return self.results[msg_ids[0]]
525 657 else:
526 658 result = {}
527 659 for target,mid in zip(targets, msg_ids):
528 660 result[target] = self.results[mid]
529 661 return result
530 662
531 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
532 after=None, follow=None):
533 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
534
535 if self.block is False:
536 returns msg_id or list of msg_ids
537 else:
538 returns actual result of f(*args, **kwargs)
539 """
540 # enforce types of f,args,kwrags
541 args = args if args is not None else []
542 kwargs = kwargs if kwargs is not None else {}
543 if not callable(f):
544 raise TypeError("f must be callable, not %s"%type(f))
545 if not isinstance(args, (tuple, list)):
546 raise TypeError("args must be tuple or list, not %s"%type(args))
547 if not isinstance(kwargs, dict):
548 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
549
550 options = dict(bound=bound, block=block, after=after, follow=follow)
551
552 if targets is None:
553 return self._apply_balanced(f, args, kwargs, **options)
554 else:
555 return self._apply_direct(f, args, kwargs, targets=targets, **options)
663 #--------------------------------------------------------------------------
664 # Data movement
665 #--------------------------------------------------------------------------
556 666
667 @defaultblock
557 668 def push(self, ns, targets=None, block=None):
558 """push the contents of `ns` into the namespace on `target`"""
669 """Push the contents of `ns` into the namespace on `target`"""
559 670 if not isinstance(ns, dict):
560 671 raise TypeError("Must be a dict, not %s"%type(ns))
561 672 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
562 673 return result
563 674
564 @spinfirst
675 @defaultblock
565 676 def pull(self, keys, targets=None, block=True):
566 """pull objects from `target`'s namespace by `keys`"""
567
677 """Pull objects from `target`'s namespace by `keys`"""
678 if isinstance(keys, str):
679 pass
680 elif isistance(keys, (list,tuple,set)):
681 for key in keys:
682 if not isinstance(key, str):
683 raise TypeError
568 684 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
569 685 return result
570 686
571 def barrier(self, msg_ids=None, timeout=-1):
572 """waits on one or more `msg_ids`, for up to `timeout` seconds.
687 #--------------------------------------------------------------------------
688 # Query methods
689 #--------------------------------------------------------------------------
690
691 @spinfirst
692 def get_results(self, msg_ids, status_only=False):
693 """Returns the result of the execute or task request with `msg_ids`.
573 694
574 695 Parameters
575 696 ----------
576 msg_ids : int, str, or list of ints and/or strs
577 ints are indices to self.history
578 strs are msg_ids
579 default: wait on all outstanding messages
580 timeout : float
581 a time in seconds, after which to give up.
582 default is -1, which means no timeout
583
584 Returns
585 -------
586 True : when all msg_ids are done
587 False : timeout reached, msg_ids still outstanding
697 msg_ids : list of ints or msg_ids
698 if int:
699 Passed as index to self.history for convenience.
700 status_only : bool (default: False)
701 if False:
702 return the actual results
588 703 """
589 tic = time.time()
590 if msg_ids is None:
591 theids = self.outstanding
592 else:
593 if isinstance(msg_ids, (int, str)):
594 msg_ids = [msg_ids]
595 theids = set()
596 for msg_id in msg_ids:
597 if isinstance(msg_id, int):
598 msg_id = self.history[msg_id]
599 theids.add(msg_id)
600 self.spin()
601 while theids.intersection(self.outstanding):
602 if timeout >= 0 and ( time.time()-tic ) > timeout:
603 break
604 time.sleep(1e-3)
605 self.spin()
606 return len(theids.intersection(self.outstanding)) == 0
607
608 @spinfirst
609 def get_results(self, msg_ids,status_only=False):
610 """returns the result of the execute or task request with `msg_id`"""
611 704 if not isinstance(msg_ids, (list,tuple)):
612 705 msg_ids = [msg_ids]
613 706 theids = []
614 707 for msg_id in msg_ids:
615 708 if isinstance(msg_id, int):
616 709 msg_id = self.history[msg_id]
710 if not isinstance(msg_id, str):
711 raise TypeError("msg_ids must be str, not %r"%msg_id)
617 712 theids.append(msg_id)
618 713
714 completed = []
715 local_results = {}
716 for msg_id in list(theids):
717 if msg_id in self.results:
718 completed.append(msg_id)
719 local_results[msg_id] = self.results[msg_id]
720 theids.remove(msg_id)
721
722 if msg_ids: # some not locally cached
619 723 content = dict(msg_ids=theids, status_only=status_only)
620 msg = self.session.send(self.query_socket, "result_request", content=content)
621 zmq.select([self.query_socket], [], [])
622 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
724 msg = self.session.send(self._query_socket, "result_request", content=content)
725 zmq.select([self._query_socket], [], [])
726 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
623 727 if self.debug:
624 728 pprint(msg)
729 content = msg['content']
730 if content['status'] != 'ok':
731 raise ss.unwrap_exception(content)
732 else:
733 content = dict(completed=[],pending=[])
734 if not status_only:
735 # load cached results into result:
736 content['completed'].extend(completed)
737 content.update(local_results)
738 # update cache with results:
739 for msg_id in msg_ids:
740 if msg_id in content['completed']:
741 self.results[msg_id] = content[msg_id]
742 return content
743
744 @spinfirst
745 def queue_status(self, targets=None, verbose=False):
746 """Fetch the status of engine queues.
625 747
626 # while True:
627 # try:
628 # except zmq.ZMQError:
629 # time.sleep(1e-3)
630 # continue
631 # else:
632 # break
633 return msg['content']
748 Parameters
749 ----------
750 targets : int/str/list of ints/strs
751 the engines on which to execute
752 default : all
753 verbose : bool
754 whether to return lengths only, or lists of ids for each element
755 """
756 targets = self._build_targets(targets)[1]
757 content = dict(targets=targets, verbose=verbose)
758 self.session.send(self._query_socket, "queue_request", content=content)
759 idents,msg = self.session.recv(self._query_socket, 0)
760 if self.debug:
761 pprint(msg)
762 content = msg['content']
763 status = content.pop('status')
764 if status != 'ok':
765 raise ss.unwrap_exception(content)
766 return content
767
768 @spinfirst
769 def purge_results(self, msg_ids=[], targets=[]):
770 """Tell the controller to forget results.
771
772 Individual results can be purged by msg_id, or the entire
773 history of specific targets can
774
775 Parameters
776 ----------
777 targets : int/str/list of ints/strs
778 the targets
779 default : None
780 """
781 if not targets and not msg_ids:
782 raise ValueError
783 if targets:
784 targets = self._build_targets(targets)[1]
785 content = dict(targets=targets, msg_ids=msg_ids)
786 self.session.send(self._query_socket, "purge_request", content=content)
787 idents, msg = self.session.recv(self._query_socket, 0)
788 if self.debug:
789 pprint(msg)
790 content = msg['content']
791 if content['status'] != 'ok':
792 raise ss.unwrap_exception(content)
634 793
635 794 class AsynClient(Client):
636 """An Asynchronous client, using the Tornado Event Loop"""
795 """An Asynchronous client, using the Tornado Event Loop.
796 !!!unfinished!!!"""
637 797 io_loop = None
638 queue_stream = None
639 notifier_stream = None
798 _queue_stream = None
799 _notifier_stream = None
800 _task_stream = None
801 _control_stream = None
640 802
641 803 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
642 804 Client.__init__(self, addr, context, username, debug)
643 805 if io_loop is None:
644 806 io_loop = ioloop.IOLoop.instance()
645 807 self.io_loop = io_loop
646 808
647 self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop)
648 self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop)
649 self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop)
650 self.notification_stream = zmqstream.ZMQStream(self.notification_socket, io_loop)
809 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
810 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
811 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
812 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
651 813
652 814 def spin(self):
653 815 for stream in (self.queue_stream, self.notifier_stream,
654 816 self.task_stream, self.control_stream):
655 817 stream.flush()
656 818 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now