##// END OF EJS Templates
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
MinRK -
Show More
@@ -1,112 +1,112
1 1 """AsyncResult objects for the client"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import error
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Classes
17 17 #-----------------------------------------------------------------------------
18 18
19 19 class AsyncResult(object):
20 20 """Class for representing results of non-blocking calls.
21 21
22 22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 23 """
24 24 def __init__(self, client, msg_ids):
25 25 self._client = client
26 self._msg_ids = msg_ids
26 self.msg_ids = msg_ids
27 27 self._ready = False
28 28 self._success = None
29 29
30 30 def __repr__(self):
31 31 if self._ready:
32 32 return "<%s: finished>"%(self.__class__.__name__)
33 33 else:
34 return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
34 return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
35 35
36 36
37 37 def _reconstruct_result(self, res):
38 38 """
39 39 Override me in subclasses for turning a list of results
40 40 into the expected form.
41 41 """
42 42 if len(res) == 1:
43 43 return res[0]
44 44 else:
45 45 return res
46 46
47 47 def get(self, timeout=-1):
48 48 """Return the result when it arrives.
49 49
50 50 If `timeout` is not ``None`` and the result does not arrive within
51 51 `timeout` seconds then ``TimeoutError`` is raised. If the
52 52 remote call raised an exception then that exception will be reraised
53 53 by get().
54 54 """
55 55 if not self.ready():
56 56 self.wait(timeout)
57 57
58 58 if self._ready:
59 59 if self._success:
60 60 return self._result
61 61 else:
62 62 raise self._exception
63 63 else:
64 64 raise error.TimeoutError("Result not ready.")
65 65
66 66 def ready(self):
67 67 """Return whether the call has completed."""
68 68 if not self._ready:
69 69 self.wait(0)
70 70 return self._ready
71 71
72 72 def wait(self, timeout=-1):
73 73 """Wait until the result is available or until `timeout` seconds pass.
74 74 """
75 75 if self._ready:
76 76 return
77 self._ready = self._client.barrier(self._msg_ids, timeout)
77 self._ready = self._client.barrier(self.msg_ids, timeout)
78 78 if self._ready:
79 79 try:
80 results = map(self._client.results.get, self._msg_ids)
80 results = map(self._client.results.get, self.msg_ids)
81 81 results = error.collect_exceptions(results, 'get')
82 82 self._result = self._reconstruct_result(results)
83 83 except Exception, e:
84 84 self._exception = e
85 85 self._success = False
86 86 else:
87 87 self._success = True
88 88
89 89
90 90 def successful(self):
91 91 """Return whether the call completed without raising an exception.
92 92
93 93 Will raise ``AssertionError`` if the result is not ready.
94 94 """
95 95 assert self._ready
96 96 return self._success
97 97
98 98 class AsyncMapResult(AsyncResult):
99 99 """Class for representing results of non-blocking gathers.
100 100
101 101 This will properly reconstruct the gather.
102 102 """
103 103
104 104 def __init__(self, client, msg_ids, mapObject):
105 105 self._mapObject = mapObject
106 106 AsyncResult.__init__(self, client, msg_ids)
107 107
108 108 def _reconstruct_result(self, res):
109 109 """Perform the gather on the actual results."""
110 110 return self._mapObject.joinPartitions(res)
111 111
112 112
@@ -1,1053 +1,1053
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import time
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 21 from IPython.external.decorator import decorator
22 22 from IPython.zmq import tunnel
23 23
24 24 import streamsession as ss
25 25 # from remotenamespace import RemoteNamespace
26 26 from view import DirectView, LoadBalancedView
27 27 from dependency import Dependency, depend, require
28 28 import error
29 29 import map as Map
30 30 from asyncresult import AsyncResult, AsyncMapResult
31 31 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32 32
33 33 #--------------------------------------------------------------------------
34 34 # helpers for implementing old MEC API via client.apply
35 35 #--------------------------------------------------------------------------
36 36
37 37 def _push(ns):
38 38 """helper method for implementing `client.push` via `client.apply`"""
39 39 globals().update(ns)
40 40
41 41 def _pull(keys):
42 42 """helper method for implementing `client.pull` via `client.apply`"""
43 43 g = globals()
44 44 if isinstance(keys, (list,tuple, set)):
45 45 for key in keys:
46 46 if not g.has_key(key):
47 47 raise NameError("name '%s' is not defined"%key)
48 48 return map(g.get, keys)
49 49 else:
50 50 if not g.has_key(keys):
51 51 raise NameError("name '%s' is not defined"%keys)
52 52 return g.get(keys)
53 53
54 54 def _clear():
55 55 """helper method for implementing `client.clear` via `client.apply`"""
56 56 globals().clear()
57 57
58 58 def _execute(code):
59 59 """helper method for implementing `client.execute` via `client.apply`"""
60 60 exec code in globals()
61 61
62 62
63 63 #--------------------------------------------------------------------------
64 64 # Decorators for Client methods
65 65 #--------------------------------------------------------------------------
66 66
67 67 @decorator
68 68 def spinfirst(f, self, *args, **kwargs):
69 69 """Call spin() to sync state prior to calling the method."""
70 70 self.spin()
71 71 return f(self, *args, **kwargs)
72 72
73 73 @decorator
74 74 def defaultblock(f, self, *args, **kwargs):
75 75 """Default to self.block; preserve self.block."""
76 76 block = kwargs.get('block',None)
77 77 block = self.block if block is None else block
78 78 saveblock = self.block
79 79 self.block = block
80 80 try:
81 81 ret = f(self, *args, **kwargs)
82 82 finally:
83 83 self.block = saveblock
84 84 return ret
85 85
86 86
87 87 class AbortedTask(object):
88 88 """A basic wrapper object describing an aborted task."""
89 89 def __init__(self, msg_id):
90 90 self.msg_id = msg_id
91 91
92 92 class ResultDict(dict):
93 93 """A subclass of dict that raises errors if it has them."""
94 94 def __getitem__(self, key):
95 95 res = dict.__getitem__(self, key)
96 96 if isinstance(res, error.KernelError):
97 97 raise res
98 98 return res
99 99
100 100 class Client(object):
101 101 """A semi-synchronous client to the IPython ZMQ controller
102 102
103 103 Parameters
104 104 ----------
105 105
106 106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
107 107 The address of the controller's registration socket.
108 108 [Default: 'tcp://127.0.0.1:10101']
109 109 context : zmq.Context
110 110 Pass an existing zmq.Context instance, otherwise the client will create its own
111 111 username : bytes
112 112 set username to be passed to the Session object
113 113 debug : bool
114 114 flag for lots of message printing for debug purposes
115 115
116 116 #-------------- ssh related args ----------------
117 117 # These are args for configuring the ssh tunnel to be used
118 118 # credentials are used to forward connections over ssh to the Controller
119 119 # Note that the ip given in `addr` needs to be relative to sshserver
120 120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
121 121 # and set sshserver as the same machine the Controller is on. However,
122 122 # the only requirement is that sshserver is able to see the Controller
123 123 # (i.e. is within the same trusted network).
124 124
125 125 sshserver : str
126 126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
127 127 If keyfile or password is specified, and this is not, it will default to
128 128 the ip given in addr.
129 129 sshkey : str; path to public ssh key file
130 130 This specifies a key to be used in ssh login, default None.
131 131 Regular default ssh keys will be used without specifying this argument.
132 132 password : str;
133 133 Your ssh password to sshserver. Note that if this is left None,
134 134 you will be prompted for it if passwordless key based login is unavailable.
135 135
136 136 #------- exec authentication args -------
137 137 # If even localhost is untrusted, you can have some protection against
138 138 # unauthorized execution by using a key. Messages are still sent
139 139 # as cleartext, so if someone can snoop your loopback traffic this will
140 140 # not help anything.
141 141
142 142 exec_key : str
143 143 an authentication key or file containing a key
144 144 default: None
145 145
146 146
147 147 Attributes
148 148 ----------
149 149 ids : set of int engine IDs
150 150 requesting the ids attribute always synchronizes
151 151 the registration state. To request ids without synchronization,
152 152 use semi-private _ids attributes.
153 153
154 154 history : list of msg_ids
155 155 a list of msg_ids, keeping track of all the execution
156 156 messages you have submitted in order.
157 157
158 158 outstanding : set of msg_ids
159 159 a set of msg_ids that have been submitted, but whose
160 160 results have not yet been received.
161 161
162 162 results : dict
163 163 a dict of all our results, keyed by msg_id
164 164
165 165 block : bool
166 166 determines default behavior when block not specified
167 167 in execution methods
168 168
169 169 Methods
170 170 -------
171 171 spin : flushes incoming results and registration state changes
172 172 control methods spin, and requesting `ids` also ensures up to date
173 173
174 174 barrier : wait on one or more msg_ids
175 175
176 176 execution methods: apply/apply_bound/apply_to/apply_bound
177 177 legacy: execute, run
178 178
179 179 query methods: queue_status, get_result, purge
180 180
181 181 control methods: abort, kill
182 182
183 183 """
184 184
185 185
186 186 _connected=False
187 187 _ssh=False
188 188 _engines=None
189 189 _addr='tcp://127.0.0.1:10101'
190 190 _registration_socket=None
191 191 _query_socket=None
192 192 _control_socket=None
193 193 _notification_socket=None
194 194 _mux_socket=None
195 195 _task_socket=None
196 196 block = False
197 197 outstanding=None
198 198 results = None
199 199 history = None
200 200 debug = False
201 201 targets = None
202 202
203 203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
204 204 sshserver=None, sshkey=None, password=None, paramiko=None,
205 205 exec_key=None,):
206 206 if context is None:
207 207 context = zmq.Context()
208 208 self.context = context
209 209 self.targets = 'all'
210 210 self._addr = addr
211 211 self._ssh = bool(sshserver or sshkey or password)
212 212 if self._ssh and sshserver is None:
213 213 # default to the same
214 214 sshserver = addr.split('://')[1].split(':')[0]
215 215 if self._ssh and password is None:
216 216 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
217 217 password=False
218 218 else:
219 219 password = getpass("SSH Password for %s: "%sshserver)
220 220 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
221 221
222 222 if exec_key is not None and os.path.isfile(exec_key):
223 223 arg = 'keyfile'
224 224 else:
225 225 arg = 'key'
226 226 key_arg = {arg:exec_key}
227 227 if username is None:
228 228 self.session = ss.StreamSession(**key_arg)
229 229 else:
230 230 self.session = ss.StreamSession(username, **key_arg)
231 231 self._registration_socket = self.context.socket(zmq.XREQ)
232 232 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
233 233 if self._ssh:
234 234 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
235 235 else:
236 236 self._registration_socket.connect(addr)
237 237 self._engines = {}
238 238 self._ids = set()
239 239 self.outstanding=set()
240 240 self.results = {}
241 241 self.history = []
242 242 self.debug = debug
243 243 self.session.debug = debug
244 244
245 245 self._notification_handlers = {'registration_notification' : self._register_engine,
246 246 'unregistration_notification' : self._unregister_engine,
247 247 }
248 248 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
249 249 'apply_reply' : self._handle_apply_reply}
250 250 self._connect(sshserver, ssh_kwargs)
251 251
252 252
253 253 @property
254 254 def ids(self):
255 255 """Always up to date ids property."""
256 256 self._flush_notifications()
257 257 return self._ids
258 258
259 259 def _update_engines(self, engines):
260 260 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
261 261 for k,v in engines.iteritems():
262 262 eid = int(k)
263 263 self._engines[eid] = bytes(v) # force not unicode
264 264 self._ids.add(eid)
265 265
266 266 def _build_targets(self, targets):
267 267 """Turn valid target IDs or 'all' into two lists:
268 268 (int_ids, uuids).
269 269 """
270 270 if targets is None:
271 271 targets = self._ids
272 272 elif isinstance(targets, str):
273 273 if targets.lower() == 'all':
274 274 targets = self._ids
275 275 else:
276 276 raise TypeError("%r not valid str target, must be 'all'"%(targets))
277 277 elif isinstance(targets, int):
278 278 targets = [targets]
279 279 return [self._engines[t] for t in targets], list(targets)
280 280
281 281 def _connect(self, sshserver, ssh_kwargs):
282 282 """setup all our socket connections to the controller. This is called from
283 283 __init__."""
284 284 if self._connected:
285 285 return
286 286 self._connected=True
287 287
288 288 def connect_socket(s, addr):
289 289 if self._ssh:
290 290 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
291 291 else:
292 292 return s.connect(addr)
293 293
294 294 self.session.send(self._registration_socket, 'connection_request')
295 295 idents,msg = self.session.recv(self._registration_socket,mode=0)
296 296 if self.debug:
297 297 pprint(msg)
298 298 msg = ss.Message(msg)
299 299 content = msg.content
300 300 if content.status == 'ok':
301 301 if content.queue:
302 302 self._mux_socket = self.context.socket(zmq.PAIR)
303 303 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
304 304 connect_socket(self._mux_socket, content.queue)
305 305 if content.task:
306 306 self._task_socket = self.context.socket(zmq.PAIR)
307 307 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
308 308 connect_socket(self._task_socket, content.task)
309 309 if content.notification:
310 310 self._notification_socket = self.context.socket(zmq.SUB)
311 311 connect_socket(self._notification_socket, content.notification)
312 312 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
313 313 if content.query:
314 314 self._query_socket = self.context.socket(zmq.PAIR)
315 315 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
316 316 connect_socket(self._query_socket, content.query)
317 317 if content.control:
318 318 self._control_socket = self.context.socket(zmq.PAIR)
319 319 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
320 320 connect_socket(self._control_socket, content.control)
321 321 self._update_engines(dict(content.engines))
322 322
323 323 else:
324 324 self._connected = False
325 325 raise Exception("Failed to connect!")
326 326
327 327 #--------------------------------------------------------------------------
328 328 # handlers and callbacks for incoming messages
329 329 #--------------------------------------------------------------------------
330 330
331 331 def _register_engine(self, msg):
332 332 """Register a new engine, and update our connection info."""
333 333 content = msg['content']
334 334 eid = content['id']
335 335 d = {eid : content['queue']}
336 336 self._update_engines(d)
337 337 self._ids.add(int(eid))
338 338
339 339 def _unregister_engine(self, msg):
340 340 """Unregister an engine that has died."""
341 341 content = msg['content']
342 342 eid = int(content['id'])
343 343 if eid in self._ids:
344 344 self._ids.remove(eid)
345 345 self._engines.pop(eid)
346 346
347 347 def _handle_execute_reply(self, msg):
348 348 """Save the reply to an execute_request into our results."""
349 349 parent = msg['parent_header']
350 350 msg_id = parent['msg_id']
351 351 if msg_id not in self.outstanding:
352 352 print("got unknown result: %s"%msg_id)
353 353 else:
354 354 self.outstanding.remove(msg_id)
355 355 self.results[msg_id] = ss.unwrap_exception(msg['content'])
356 356
357 357 def _handle_apply_reply(self, msg):
358 358 """Save the reply to an apply_request into our results."""
359 359 parent = msg['parent_header']
360 360 msg_id = parent['msg_id']
361 361 if msg_id not in self.outstanding:
362 362 print ("got unknown result: %s"%msg_id)
363 363 else:
364 364 self.outstanding.remove(msg_id)
365 365 content = msg['content']
366 366 if content['status'] == 'ok':
367 367 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
368 368 elif content['status'] == 'aborted':
369 369 self.results[msg_id] = error.AbortedTask(msg_id)
370 370 elif content['status'] == 'resubmitted':
371 371 # TODO: handle resubmission
372 372 pass
373 373 else:
374 374 e = ss.unwrap_exception(content)
375 375 e_uuid = e.engine_info['engineid']
376 376 for k,v in self._engines.iteritems():
377 377 if v == e_uuid:
378 378 e.engine_info['engineid'] = k
379 379 break
380 380 self.results[msg_id] = e
381 381
382 382 def _flush_notifications(self):
383 383 """Flush notifications of engine registrations waiting
384 384 in ZMQ queue."""
385 385 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
386 386 while msg is not None:
387 387 if self.debug:
388 388 pprint(msg)
389 389 msg = msg[-1]
390 390 msg_type = msg['msg_type']
391 391 handler = self._notification_handlers.get(msg_type, None)
392 392 if handler is None:
393 393 raise Exception("Unhandled message type: %s"%msg.msg_type)
394 394 else:
395 395 handler(msg)
396 396 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
397 397
398 398 def _flush_results(self, sock):
399 399 """Flush task or queue results waiting in ZMQ queue."""
400 400 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
401 401 while msg is not None:
402 402 if self.debug:
403 403 pprint(msg)
404 404 msg = msg[-1]
405 405 msg_type = msg['msg_type']
406 406 handler = self._queue_handlers.get(msg_type, None)
407 407 if handler is None:
408 408 raise Exception("Unhandled message type: %s"%msg.msg_type)
409 409 else:
410 410 handler(msg)
411 411 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
412 412
413 413 def _flush_control(self, sock):
414 414 """Flush replies from the control channel waiting
415 415 in the ZMQ queue.
416 416
417 417 Currently: ignore them."""
418 418 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
419 419 while msg is not None:
420 420 if self.debug:
421 421 pprint(msg)
422 422 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
423 423
424 424 #--------------------------------------------------------------------------
425 425 # getitem
426 426 #--------------------------------------------------------------------------
427 427
428 428 def __getitem__(self, key):
429 429 """Dict access returns DirectView multiplexer objects or,
430 430 if key is None, a LoadBalancedView."""
431 431 if key is None:
432 432 return LoadBalancedView(self)
433 433 if isinstance(key, int):
434 434 if key not in self.ids:
435 435 raise IndexError("No such engine: %i"%key)
436 436 return DirectView(self, key)
437 437
438 438 if isinstance(key, slice):
439 439 indices = range(len(self.ids))[key]
440 440 ids = sorted(self._ids)
441 441 key = [ ids[i] for i in indices ]
442 442 # newkeys = sorted(self._ids)[thekeys[k]]
443 443
444 444 if isinstance(key, (tuple, list, xrange)):
445 445 _,targets = self._build_targets(list(key))
446 446 return DirectView(self, targets)
447 447 else:
448 448 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
449 449
450 450 #--------------------------------------------------------------------------
451 451 # Begin public methods
452 452 #--------------------------------------------------------------------------
453 453
454 454 @property
455 455 def remote(self):
456 456 """property for convenient RemoteFunction generation.
457 457
458 458 >>> @client.remote
459 459 ... def f():
460 460 import os
461 461 print (os.getpid())
462 462 """
463 463 return remote(self, block=self.block)
464 464
465 465 def spin(self):
466 466 """Flush any registration notifications and execution results
467 467 waiting in the ZMQ queue.
468 468 """
469 469 if self._notification_socket:
470 470 self._flush_notifications()
471 471 if self._mux_socket:
472 472 self._flush_results(self._mux_socket)
473 473 if self._task_socket:
474 474 self._flush_results(self._task_socket)
475 475 if self._control_socket:
476 476 self._flush_control(self._control_socket)
477 477
478 478 def barrier(self, msg_ids=None, timeout=-1):
479 479 """waits on one or more `msg_ids`, for up to `timeout` seconds.
480 480
481 481 Parameters
482 482 ----------
483 483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
484 484 ints are indices to self.history
485 485 strs are msg_ids
486 486 default: wait on all outstanding messages
487 487 timeout : float
488 488 a time in seconds, after which to give up.
489 489 default is -1, which means no timeout
490 490
491 491 Returns
492 492 -------
493 493 True : when all msg_ids are done
494 494 False : timeout reached, some msg_ids still outstanding
495 495 """
496 496 tic = time.time()
497 497 if msg_ids is None:
498 498 theids = self.outstanding
499 499 else:
500 500 if isinstance(msg_ids, (int, str, AsyncResult)):
501 501 msg_ids = [msg_ids]
502 502 theids = set()
503 503 for msg_id in msg_ids:
504 504 if isinstance(msg_id, int):
505 505 msg_id = self.history[msg_id]
506 506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
507 map(theids.add, msg_id.msg_ids)
508 508 continue
509 509 theids.add(msg_id)
510 510 if not theids.intersection(self.outstanding):
511 511 return True
512 512 self.spin()
513 513 while theids.intersection(self.outstanding):
514 514 if timeout >= 0 and ( time.time()-tic ) > timeout:
515 515 break
516 516 time.sleep(1e-3)
517 517 self.spin()
518 518 return len(theids.intersection(self.outstanding)) == 0
519 519
520 520 #--------------------------------------------------------------------------
521 521 # Control methods
522 522 #--------------------------------------------------------------------------
523 523
524 524 @spinfirst
525 525 @defaultblock
526 526 def clear(self, targets=None, block=None):
527 527 """Clear the namespace in target(s)."""
528 528 targets = self._build_targets(targets)[0]
529 529 for t in targets:
530 530 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
531 531 error = False
532 532 if self.block:
533 533 for i in range(len(targets)):
534 534 idents,msg = self.session.recv(self._control_socket,0)
535 535 if self.debug:
536 536 pprint(msg)
537 537 if msg['content']['status'] != 'ok':
538 538 error = ss.unwrap_exception(msg['content'])
539 539 if error:
540 540 return error
541 541
542 542
543 543 @spinfirst
544 544 @defaultblock
545 545 def abort(self, msg_ids = None, targets=None, block=None):
546 546 """Abort the execution queues of target(s)."""
547 547 targets = self._build_targets(targets)[0]
548 548 if isinstance(msg_ids, basestring):
549 549 msg_ids = [msg_ids]
550 550 content = dict(msg_ids=msg_ids)
551 551 for t in targets:
552 552 self.session.send(self._control_socket, 'abort_request',
553 553 content=content, ident=t)
554 554 error = False
555 555 if self.block:
556 556 for i in range(len(targets)):
557 557 idents,msg = self.session.recv(self._control_socket,0)
558 558 if self.debug:
559 559 pprint(msg)
560 560 if msg['content']['status'] != 'ok':
561 561 error = ss.unwrap_exception(msg['content'])
562 562 if error:
563 563 return error
564 564
565 565 @spinfirst
566 566 @defaultblock
567 567 def shutdown(self, targets=None, restart=False, controller=False, block=None):
568 568 """Terminates one or more engine processes, optionally including the controller."""
569 569 if controller:
570 570 targets = 'all'
571 571 targets = self._build_targets(targets)[0]
572 572 for t in targets:
573 573 self.session.send(self._control_socket, 'shutdown_request',
574 574 content={'restart':restart},ident=t)
575 575 error = False
576 576 if block or controller:
577 577 for i in range(len(targets)):
578 578 idents,msg = self.session.recv(self._control_socket,0)
579 579 if self.debug:
580 580 pprint(msg)
581 581 if msg['content']['status'] != 'ok':
582 582 error = ss.unwrap_exception(msg['content'])
583 583
584 584 if controller:
585 585 time.sleep(0.25)
586 586 self.session.send(self._query_socket, 'shutdown_request')
587 587 idents,msg = self.session.recv(self._query_socket, 0)
588 588 if self.debug:
589 589 pprint(msg)
590 590 if msg['content']['status'] != 'ok':
591 591 error = ss.unwrap_exception(msg['content'])
592 592
593 593 if error:
594 594 raise error
595 595
596 596 #--------------------------------------------------------------------------
597 597 # Execution methods
598 598 #--------------------------------------------------------------------------
599 599
600 600 @defaultblock
601 601 def execute(self, code, targets='all', block=None):
602 602 """Executes `code` on `targets` in blocking or nonblocking manner.
603 603
604 604 ``execute`` is always `bound` (affects engine namespace)
605 605
606 606 Parameters
607 607 ----------
608 608 code : str
609 609 the code string to be executed
610 610 targets : int/str/list of ints/strs
611 611 the engines on which to execute
612 612 default : all
613 613 block : bool
614 614 whether or not to wait until done to return
615 615 default: self.block
616 616 """
617 617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
618 618 return result
619 619
620 620 def run(self, code, block=None):
621 621 """Runs `code` on an engine.
622 622
623 623 Calls to this are load-balanced.
624 624
625 625 ``run`` is never `bound` (no effect on engine namespace)
626 626
627 627 Parameters
628 628 ----------
629 629 code : str
630 630 the code string to be executed
631 631 block : bool
632 632 whether or not to wait until done
633 633
634 634 """
635 635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
636 636 return result
637 637
638 638 def _maybe_raise(self, result):
639 639 """wrapper for maybe raising an exception if apply failed."""
640 640 if isinstance(result, error.RemoteError):
641 641 raise result
642 642
643 643 return result
644 644
645 645 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
646 646 after=None, follow=None):
647 647 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
648 648
649 649 This is the central execution command for the client.
650 650
651 651 Parameters
652 652 ----------
653 653
654 654 f : function
655 655 The fuction to be called remotely
656 656 args : tuple/list
657 657 The positional arguments passed to `f`
658 658 kwargs : dict
659 659 The keyword arguments passed to `f`
660 660 bound : bool (default: True)
661 661 Whether to execute in the Engine(s) namespace, or in a clean
662 662 namespace not affecting the engine.
663 663 block : bool (default: self.block)
664 664 Whether to wait for the result, or return immediately.
665 665 False:
666 666 returns msg_id(s)
667 667 if multiple targets:
668 668 list of ids
669 669 True:
670 670 returns actual result(s) of f(*args, **kwargs)
671 671 if multiple targets:
672 672 dict of results, by engine ID
673 673 targets : int,list of ints, 'all', None
674 674 Specify the destination of the job.
675 675 if None:
676 676 Submit via Task queue for load-balancing.
677 677 if 'all':
678 678 Run on all active engines
679 679 if list:
680 680 Run on each specified engine
681 681 if int:
682 682 Run on single engine
683 683
684 684 after : Dependency or collection of msg_ids
685 685 Only for load-balanced execution (targets=None)
686 686 Specify a list of msg_ids as a time-based dependency.
687 687 This job will only be run *after* the dependencies
688 688 have been met.
689 689
690 690 follow : Dependency or collection of msg_ids
691 691 Only for load-balanced execution (targets=None)
692 692 Specify a list of msg_ids as a location-based dependency.
693 693 This job will only be run on an engine where this dependency
694 694 is met.
695 695
696 696 Returns
697 697 -------
698 698 if block is False:
699 699 if single target:
700 700 return msg_id
701 701 else:
702 702 return list of msg_ids
703 703 ? (should this be dict like block=True) ?
704 704 else:
705 705 if single target:
706 706 return result of f(*args, **kwargs)
707 707 else:
708 708 return dict of results, keyed by engine
709 709 """
710 710
711 711 # defaults:
712 712 block = block if block is not None else self.block
713 713 args = args if args is not None else []
714 714 kwargs = kwargs if kwargs is not None else {}
715 715
716 716 # enforce types of f,args,kwrags
717 717 if not callable(f):
718 718 raise TypeError("f must be callable, not %s"%type(f))
719 719 if not isinstance(args, (tuple, list)):
720 720 raise TypeError("args must be tuple or list, not %s"%type(args))
721 721 if not isinstance(kwargs, dict):
722 722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
723 723
724 724 options = dict(bound=bound, block=block, after=after, follow=follow)
725 725
726 726 if targets is None:
727 727 return self._apply_balanced(f, args, kwargs, **options)
728 728 else:
729 729 return self._apply_direct(f, args, kwargs, targets=targets, **options)
730 730
731 731 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
732 732 after=None, follow=None):
733 733 """The underlying method for applying functions in a load balanced
734 734 manner, via the task queue."""
735 735 if isinstance(after, Dependency):
736 736 after = after.as_dict()
737 737 elif after is None:
738 738 after = []
739 739 if isinstance(follow, Dependency):
740 740 follow = follow.as_dict()
741 741 elif follow is None:
742 742 follow = []
743 743 subheader = dict(after=after, follow=follow)
744 744
745 745 bufs = ss.pack_apply_message(f,args,kwargs)
746 746 content = dict(bound=bound)
747 747 msg = self.session.send(self._task_socket, "apply_request",
748 748 content=content, buffers=bufs, subheader=subheader)
749 749 msg_id = msg['msg_id']
750 750 self.outstanding.add(msg_id)
751 751 self.history.append(msg_id)
752 752 if block:
753 753 self.barrier(msg_id)
754 754 return self._maybe_raise(self.results[msg_id])
755 755 else:
756 756 return AsyncResult(self, [msg_id])
757 757
758 758 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
759 759 after=None, follow=None):
760 760 """Then underlying method for applying functions to specific engines
761 761 via the MUX queue."""
762 762
763 763 queues,targets = self._build_targets(targets)
764 764 bufs = ss.pack_apply_message(f,args,kwargs)
765 765 if isinstance(after, Dependency):
766 766 after = after.as_dict()
767 767 elif after is None:
768 768 after = []
769 769 if isinstance(follow, Dependency):
770 770 follow = follow.as_dict()
771 771 elif follow is None:
772 772 follow = []
773 773 subheader = dict(after=after, follow=follow)
774 774 content = dict(bound=bound)
775 775 msg_ids = []
776 776 for queue in queues:
777 777 msg = self.session.send(self._mux_socket, "apply_request",
778 778 content=content, buffers=bufs,ident=queue, subheader=subheader)
779 779 msg_id = msg['msg_id']
780 780 self.outstanding.add(msg_id)
781 781 self.history.append(msg_id)
782 782 msg_ids.append(msg_id)
783 783 if block:
784 784 self.barrier(msg_ids)
785 785 else:
786 786 return AsyncResult(self, msg_ids)
787 787 if len(msg_ids) == 1:
788 788 return self._maybe_raise(self.results[msg_ids[0]])
789 789 else:
790 790 result = {}
791 791 for target,mid in zip(targets, msg_ids):
792 792 result[target] = self.results[mid]
793 793 return error.collect_exceptions(result, f.__name__)
794 794
795 795 #--------------------------------------------------------------------------
796 796 # Map and decorators
797 797 #--------------------------------------------------------------------------
798 798
799 799 def map(self, f, *sequences):
800 800 """Parallel version of builtin `map`, using all our engines."""
801 801 pf = ParallelFunction(self, f, block=self.block,
802 802 bound=True, targets='all')
803 803 return pf.map(*sequences)
804 804
805 805 def parallel(self, bound=True, targets='all', block=True):
806 806 """Decorator for making a ParallelFunction."""
807 807 return parallel(self, bound=bound, targets=targets, block=block)
808 808
809 809 def remote(self, bound=True, targets='all', block=True):
810 810 """Decorator for making a RemoteFunction."""
811 811 return remote(self, bound=bound, targets=targets, block=block)
812 812
813 813 #--------------------------------------------------------------------------
814 814 # Data movement
815 815 #--------------------------------------------------------------------------
816 816
817 817 @defaultblock
818 818 def push(self, ns, targets='all', block=None):
819 819 """Push the contents of `ns` into the namespace on `target`"""
820 820 if not isinstance(ns, dict):
821 821 raise TypeError("Must be a dict, not %s"%type(ns))
822 822 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
823 823 return result
824 824
825 825 @defaultblock
826 826 def pull(self, keys, targets='all', block=None):
827 827 """Pull objects from `target`'s namespace by `keys`"""
828 828 if isinstance(keys, str):
829 829 pass
830 830 elif isinstance(keys, (list,tuple,set)):
831 831 for key in keys:
832 832 if not isinstance(key, str):
833 833 raise TypeError
834 834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
835 835 return result
836 836
837 837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
838 838 """
839 839 Partition a Python sequence and send the partitions to a set of engines.
840 840 """
841 841 block = block if block is not None else self.block
842 842 targets = self._build_targets(targets)[-1]
843 843 mapObject = Map.dists[dist]()
844 844 nparts = len(targets)
845 845 msg_ids = []
846 846 for index, engineid in enumerate(targets):
847 847 partition = mapObject.getPartition(seq, index, nparts)
848 848 if flatten and len(partition) == 1:
849 849 r = self.push({key: partition[0]}, targets=engineid, block=False)
850 850 else:
851 851 r = self.push({key: partition}, targets=engineid, block=False)
852 msg_ids.extend(r._msg_ids)
852 msg_ids.extend(r.msg_ids)
853 853 r = AsyncResult(self, msg_ids)
854 854 if block:
855 855 return r.get()
856 856 else:
857 857 return r
858 858
859 859 def gather(self, key, dist='b', targets='all', block=None):
860 860 """
861 861 Gather a partitioned sequence on a set of engines as a single local seq.
862 862 """
863 863 block = block if block is not None else self.block
864 864
865 865 targets = self._build_targets(targets)[-1]
866 866 mapObject = Map.dists[dist]()
867 867 msg_ids = []
868 868 for index, engineid in enumerate(targets):
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
869 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
870 870
871 871 r = AsyncMapResult(self, msg_ids, mapObject)
872 872 if block:
873 873 return r.get()
874 874 else:
875 875 return r
876 876
877 877 #--------------------------------------------------------------------------
878 878 # Query methods
879 879 #--------------------------------------------------------------------------
880 880
881 881 @spinfirst
882 882 def get_results(self, msg_ids, status_only=False):
883 883 """Returns the result of the execute or task request with `msg_ids`.
884 884
885 885 Parameters
886 886 ----------
887 887 msg_ids : list of ints or msg_ids
888 888 if int:
889 889 Passed as index to self.history for convenience.
890 890 status_only : bool (default: False)
891 891 if False:
892 892 return the actual results
893 893 """
894 894 if not isinstance(msg_ids, (list,tuple)):
895 895 msg_ids = [msg_ids]
896 896 theids = []
897 897 for msg_id in msg_ids:
898 898 if isinstance(msg_id, int):
899 899 msg_id = self.history[msg_id]
900 900 if not isinstance(msg_id, str):
901 901 raise TypeError("msg_ids must be str, not %r"%msg_id)
902 902 theids.append(msg_id)
903 903
904 904 completed = []
905 905 local_results = {}
906 906 for msg_id in list(theids):
907 907 if msg_id in self.results:
908 908 completed.append(msg_id)
909 909 local_results[msg_id] = self.results[msg_id]
910 910 theids.remove(msg_id)
911 911
912 912 if theids: # some not locally cached
913 913 content = dict(msg_ids=theids, status_only=status_only)
914 914 msg = self.session.send(self._query_socket, "result_request", content=content)
915 915 zmq.select([self._query_socket], [], [])
916 916 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
917 917 if self.debug:
918 918 pprint(msg)
919 919 content = msg['content']
920 920 if content['status'] != 'ok':
921 921 raise ss.unwrap_exception(content)
922 922 else:
923 923 content = dict(completed=[],pending=[])
924 924 if not status_only:
925 925 # load cached results into result:
926 926 content['completed'].extend(completed)
927 927 content.update(local_results)
928 928 # update cache with results:
929 929 for msg_id in msg_ids:
930 930 if msg_id in content['completed']:
931 931 self.results[msg_id] = content[msg_id]
932 932 return content
933 933
934 934 @spinfirst
935 935 def queue_status(self, targets=None, verbose=False):
936 936 """Fetch the status of engine queues.
937 937
938 938 Parameters
939 939 ----------
940 940 targets : int/str/list of ints/strs
941 941 the engines on which to execute
942 942 default : all
943 943 verbose : bool
944 944 Whether to return lengths only, or lists of ids for each element
945 945 """
946 946 targets = self._build_targets(targets)[1]
947 947 content = dict(targets=targets, verbose=verbose)
948 948 self.session.send(self._query_socket, "queue_request", content=content)
949 949 idents,msg = self.session.recv(self._query_socket, 0)
950 950 if self.debug:
951 951 pprint(msg)
952 952 content = msg['content']
953 953 status = content.pop('status')
954 954 if status != 'ok':
955 955 raise ss.unwrap_exception(content)
956 956 return content
957 957
958 958 @spinfirst
959 959 def purge_results(self, msg_ids=[], targets=[]):
960 960 """Tell the controller to forget results.
961 961
962 962 Individual results can be purged by msg_id, or the entire
963 963 history of specific targets can be purged.
964 964
965 965 Parameters
966 966 ----------
967 967 msg_ids : str or list of strs
968 968 the msg_ids whose results should be forgotten.
969 969 targets : int/str/list of ints/strs
970 970 The targets, by uuid or int_id, whose entire history is to be purged.
971 971 Use `targets='all'` to scrub everything from the controller's memory.
972 972
973 973 default : None
974 974 """
975 975 if not targets and not msg_ids:
976 976 raise ValueError
977 977 if targets:
978 978 targets = self._build_targets(targets)[1]
979 979 content = dict(targets=targets, msg_ids=msg_ids)
980 980 self.session.send(self._query_socket, "purge_request", content=content)
981 981 idents, msg = self.session.recv(self._query_socket, 0)
982 982 if self.debug:
983 983 pprint(msg)
984 984 content = msg['content']
985 985 if content['status'] != 'ok':
986 986 raise ss.unwrap_exception(content)
987 987
988 988 #----------------------------------------
989 989 # activate for %px,%autopx magics
990 990 #----------------------------------------
991 991 def activate(self):
992 992 """Make this `View` active for parallel magic commands.
993 993
994 994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 995 In a given IPython session there is a single active one. While
996 996 there can be many `Views` created and used by the user,
997 997 there is only one active one. The active `View` is used whenever
998 998 the magic commands %px and %autopx are used.
999 999
1000 1000 The activate() method is called on a given `View` to make it
1001 1001 active. Once this has been done, the magic commands can be used.
1002 1002 """
1003 1003
1004 1004 try:
1005 1005 # This is injected into __builtins__.
1006 1006 ip = get_ipython()
1007 1007 except NameError:
1008 1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 1009 else:
1010 1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 1011 if pmagic is not None:
1012 1012 pmagic.active_multiengine_client = self
1013 1013 else:
1014 1014 print "You must first load the parallelmagic extension " \
1015 1015 "by doing '%load_ext parallelmagic'"
1016 1016
1017 1017 class AsynClient(Client):
1018 1018 """An Asynchronous client, using the Tornado Event Loop.
1019 1019 !!!unfinished!!!"""
1020 1020 io_loop = None
1021 1021 _queue_stream = None
1022 1022 _notifier_stream = None
1023 1023 _task_stream = None
1024 1024 _control_stream = None
1025 1025
1026 1026 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1027 1027 Client.__init__(self, addr, context, username, debug)
1028 1028 if io_loop is None:
1029 1029 io_loop = ioloop.IOLoop.instance()
1030 1030 self.io_loop = io_loop
1031 1031
1032 1032 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1033 1033 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1034 1034 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1035 1035 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1036 1036
1037 1037 def spin(self):
1038 1038 for stream in (self.queue_stream, self.notifier_stream,
1039 1039 self.task_stream, self.control_stream):
1040 1040 stream.flush()
1041 1041
1042 1042 __all__ = [ 'Client',
1043 1043 'depend',
1044 1044 'require',
1045 1045 'remote',
1046 1046 'parallel',
1047 1047 'RemoteFunction',
1048 1048 'ParallelFunction',
1049 1049 'DirectView',
1050 1050 'LoadBalancedView',
1051 1051 'AsyncResult',
1052 1052 'AsyncMapResult'
1053 1053 ]
@@ -1,347 +1,355
1 1 """Views of remote engines"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from IPython.external.decorator import decorator
14 14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Decorators
18 18 #-----------------------------------------------------------------------------
19 19
20 20 @decorator
21 21 def myblock(f, self, *args, **kwargs):
22 22 """override client.block with self.block during a call"""
23 23 block = self.client.block
24 24 self.client.block = self.block
25 25 try:
26 26 ret = f(self, *args, **kwargs)
27 27 finally:
28 28 self.client.block = block
29 29 return ret
30 30
31 31 @decorator
32 32 def save_ids(f, self, *args, **kwargs):
33 33 """Keep our history and outstanding attributes up to date after a method call."""
34 34 n_previous = len(self.client.history)
35 35 ret = f(self, *args, **kwargs)
36 36 nmsgs = len(self.client.history) - n_previous
37 37 msg_ids = self.client.history[-nmsgs:]
38 38 self.history.extend(msg_ids)
39 39 map(self.outstanding.add, msg_ids)
40 40 return ret
41 41
42 42 @decorator
43 43 def sync_results(f, self, *args, **kwargs):
44 44 """sync relevant results from self.client to our results attribute."""
45 45 ret = f(self, *args, **kwargs)
46 46 delta = self.outstanding.difference(self.client.outstanding)
47 47 completed = self.outstanding.intersection(delta)
48 48 self.outstanding = self.outstanding.difference(completed)
49 49 for msg_id in completed:
50 50 self.results[msg_id] = self.client.results[msg_id]
51 51 return ret
52 52
53 53 @decorator
54 54 def spin_after(f, self, *args, **kwargs):
55 55 """call spin after the method."""
56 56 ret = f(self, *args, **kwargs)
57 57 self.spin()
58 58 return ret
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Classes
62 62 #-----------------------------------------------------------------------------
63 63
64 64 class View(object):
65 65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66 66
67 67 Don't use this class, use subclasses.
68 68 """
69 69 _targets = None
70 70 block=None
71 71 bound=None
72 72 history=None
73 73
74 74 def __init__(self, client, targets=None):
75 75 self.client = client
76 76 self._targets = targets
77 77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
78 78 self.block = client.block
79 79 self.bound=False
80 80 self.history = []
81 81 self.outstanding = set()
82 82 self.results = {}
83 83
84 84 def __repr__(self):
85 85 strtargets = str(self._targets)
86 86 if len(strtargets) > 16:
87 87 strtargets = strtargets[:12]+'...]'
88 88 return "<%s %s>"%(self.__class__.__name__, strtargets)
89 89
90 90 @property
91 91 def targets(self):
92 92 return self._targets
93 93
94 94 @targets.setter
95 95 def targets(self, value):
96 96 self._targets = value
97 97 # raise AttributeError("Cannot set my targets argument after construction!")
98 98
99 99 @sync_results
100 100 def spin(self):
101 101 """spin the client, and sync"""
102 102 self.client.spin()
103 103
104 104 @sync_results
105 105 @save_ids
106 106 def apply(self, f, *args, **kwargs):
107 107 """calls f(*args, **kwargs) on remote engines, returning the result.
108 108
109 109 This method does not involve the engine's namespace.
110 110
111 111 if self.block is False:
112 112 returns msg_id
113 113 else:
114 114 returns actual result of f(*args, **kwargs)
115 115 """
116 116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
117 117
118 118 @save_ids
119 119 def apply_async(self, f, *args, **kwargs):
120 120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
121 121
122 122 This method does not involve the engine's namespace.
123 123
124 124 returns msg_id
125 125 """
126 126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
127 127
128 128 @spin_after
129 129 @save_ids
130 130 def apply_sync(self, f, *args, **kwargs):
131 131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
132 132 returning the result.
133 133
134 134 This method does not involve the engine's namespace.
135 135
136 136 returns: actual result of f(*args, **kwargs)
137 137 """
138 138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
139 139
140 140 @sync_results
141 141 @save_ids
142 142 def apply_bound(self, f, *args, **kwargs):
143 143 """calls f(*args, **kwargs) bound to engine namespace(s).
144 144
145 145 if self.block is False:
146 146 returns msg_id
147 147 else:
148 148 returns actual result of f(*args, **kwargs)
149 149
150 150 This method has access to the targets' globals
151 151
152 152 """
153 153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
154 154
155 155 @sync_results
156 156 @save_ids
157 157 def apply_async_bound(self, f, *args, **kwargs):
158 158 """calls f(*args, **kwargs) bound to engine namespace(s)
159 159 in a nonblocking manner.
160 160
161 161 returns: msg_id
162 162
163 163 This method has access to the targets' globals
164 164
165 165 """
166 166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
167 167
168 168 @spin_after
169 169 @save_ids
170 170 def apply_sync_bound(self, f, *args, **kwargs):
171 171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
172 172
173 173 returns: actual result of f(*args, **kwargs)
174 174
175 175 This method has access to the targets' globals
176 176
177 177 """
178 178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
179 179
180 180 @spin_after
181 181 @save_ids
182 182 def map(self, f, *sequences):
183 183 """Parallel version of builtin `map`, using this view's engines."""
184 184 if isinstance(self.targets, int):
185 185 targets = [self.targets]
186 186 pf = ParallelFunction(self.client, f, block=self.block,
187 187 bound=True, targets=targets)
188 188 return pf.map(*sequences)
189 189
190 190 def parallel(self, bound=True, block=True):
191 191 """Decorator for making a ParallelFunction"""
192 192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193 193
194 194 def abort(self, msg_ids=None, block=None):
195 195 """Abort jobs on my engines.
196 196
197 197 Parameters
198 198 ----------
199 199
200 200 msg_ids : None, str, list of strs, optional
201 201 if None: abort all jobs.
202 202 else: abort specific msg_id(s).
203 203 """
204 204 block = block if block is not None else self.block
205 205 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
206 206
207 207 def queue_status(self, verbose=False):
208 208 """Fetch the Queue status of my engines"""
209 209 return self.client.queue_status(targets=self.targets, verbose=verbose)
210 210
211 211 def purge_results(self, msg_ids=[], targets=[]):
212 212 """Instruct the controller to forget specific results."""
213 213 if targets is None or targets == 'all':
214 214 targets = self.targets
215 215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216 216
217 217
218 218
219 219 class DirectView(View):
220 220 """Direct Multiplexer View of one or more engines.
221 221
222 222 These are created via indexed access to a client:
223 223
224 224 >>> dv_1 = client[1]
225 225 >>> dv_all = client[:]
226 226 >>> dv_even = client[::2]
227 227 >>> dv_some = client[1:3]
228 228
229 229 This object provides dictionary access
230 230
231 231 """
232 232
233 233 @sync_results
234 234 @save_ids
235 235 def execute(self, code, block=True):
236 236 """execute some code on my targets."""
237 237 return self.client.execute(code, block=self.block, targets=self.targets)
238 238
239 239 def update(self, ns):
240 240 """update remote namespace with dict `ns`"""
241 241 return self.client.push(ns, targets=self.targets, block=self.block)
242 242
243 243 push = update
244 244
245 245 def get(self, key_s):
246 246 """get object(s) by `key_s` from remote namespace
247 247 will return one object if it is a key.
248 248 It also takes a list of keys, and will return a list of objects."""
249 249 # block = block if block is not None else self.block
250 250 return self.client.pull(key_s, block=True, targets=self.targets)
251 251
252 252 @sync_results
253 253 @save_ids
254 254 def pull(self, key_s, block=True):
255 255 """get object(s) by `key_s` from remote namespace
256 256 will return one object if it is a key.
257 257 It also takes a list of keys, and will return a list of objects."""
258 258 block = block if block is not None else self.block
259 259 return self.client.pull(key_s, block=block, targets=self.targets)
260 260
261 261 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
262 262 """
263 263 Partition a Python sequence and send the partitions to a set of engines.
264 264 """
265 265 block = block if block is not None else self.block
266 266 if targets is None:
267 267 targets = self.targets
268 268
269 269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
270 270 targets=targets, block=block)
271 271
272 272 @sync_results
273 273 @save_ids
274 274 def gather(self, key, dist='b', targets=None, block=True):
275 275 """
276 276 Gather a partitioned sequence on a set of engines as a single local seq.
277 277 """
278 278 block = block if block is not None else self.block
279 279 if targets is None:
280 280 targets = self.targets
281 281
282 282 return self.client.gather(key, dist=dist, targets=targets, block=block)
283 283
284 284 def __getitem__(self, key):
285 285 return self.get(key)
286 286
287 287 def __setitem__(self,key, value):
288 288 self.update({key:value})
289 289
290 290 def clear(self, block=False):
291 291 """Clear the remote namespaces on my engines."""
292 292 block = block if block is not None else self.block
293 293 return self.client.clear(targets=self.targets, block=block)
294 294
295 295 def kill(self, block=True):
296 296 """Kill my engines."""
297 297 block = block if block is not None else self.block
298 298 return self.client.kill(targets=self.targets, block=block)
299 299
300 300 #----------------------------------------
301 301 # activate for %px,%autopx magics
302 302 #----------------------------------------
303 303 def activate(self):
304 304 """Make this `View` active for parallel magic commands.
305 305
306 306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 307 In a given IPython session there is a single active one. While
308 308 there can be many `Views` created and used by the user,
309 309 there is only one active one. The active `View` is used whenever
310 310 the magic commands %px and %autopx are used.
311 311
312 312 The activate() method is called on a given `View` to make it
313 313 active. Once this has been done, the magic commands can be used.
314 314 """
315 315
316 316 try:
317 317 # This is injected into __builtins__.
318 318 ip = get_ipython()
319 319 except NameError:
320 320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 321 else:
322 322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 323 if pmagic is not None:
324 324 pmagic.active_multiengine_client = self
325 325 else:
326 326 print "You must first load the parallelmagic extension " \
327 327 "by doing '%load_ext parallelmagic'"
328 328
329 329
330 330 class LoadBalancedView(View):
331 331 """An engine-agnostic View that only executes via the Task queue.
332 332
333 333 Typically created via:
334 334
335 335 >>> lbv = client[None]
336 336 <LoadBalancedView tcp://127.0.0.1:12345>
337 337
338 338 but can also be created with:
339 339
340 340 >>> lbc = LoadBalancedView(client)
341 341
342 342 TODO: allow subset of engines across which to balance.
343 343 """
344 344 def __repr__(self):
345 345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
346 346
347 @property
348 def targets(self):
349 return None
350
351 @targets.setter
352 def targets(self, value):
353 raise AttributeError("Cannot set targets for LoadbalancedView!")
354
347 355 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now