tweaks related to docs + add activate() for magics
MinRK -
@@ -1,1019 +1,1053 b''
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 from __future__ import print_function
14
15 13 import os
16 14 import time
17 15 from getpass import getpass
18 16 from pprint import pprint
19 17
20 18 import zmq
21 19 from zmq.eventloop import ioloop, zmqstream
22 20
23 21 from IPython.external.decorator import decorator
24 22 from IPython.zmq import tunnel
25 23
26 24 import streamsession as ss
27 25 # from remotenamespace import RemoteNamespace
28 26 from view import DirectView, LoadBalancedView
29 27 from dependency import Dependency, depend, require
30 28 import error
31 29 import map as Map
32 30 from asyncresult import AsyncResult, AsyncMapResult
33 31 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
34 32
35 33 #--------------------------------------------------------------------------
36 34 # helpers for implementing old MEC API via client.apply
37 35 #--------------------------------------------------------------------------
38 36
39 37 def _push(ns):
40 38 """helper method for implementing `client.push` via `client.apply`"""
41 39 globals().update(ns)
42 40
43 41 def _pull(keys):
44 42 """helper method for implementing `client.pull` via `client.apply`"""
45 43 g = globals()
46 44 if isinstance(keys, (list,tuple, set)):
47 45 for key in keys:
48 46 if not g.has_key(key):
49 47 raise NameError("name '%s' is not defined"%key)
50 48 return map(g.get, keys)
51 49 else:
52 50 if not g.has_key(keys):
53 51 raise NameError("name '%s' is not defined"%keys)
54 52 return g.get(keys)
55 53
56 54 def _clear():
57 55 """helper method for implementing `client.clear` via `client.apply`"""
58 56 globals().clear()
59 57
60 def execute(code):
58 def _execute(code):
61 59 """helper method for implementing `client.execute` via `client.apply`"""
62 60 exec code in globals()
63 61
64 62
65 63 #--------------------------------------------------------------------------
66 64 # Decorators for Client methods
67 65 #--------------------------------------------------------------------------
68 66
69 67 @decorator
70 68 def spinfirst(f, self, *args, **kwargs):
71 69 """Call spin() to sync state prior to calling the method."""
72 70 self.spin()
73 71 return f(self, *args, **kwargs)
74 72
75 73 @decorator
76 74 def defaultblock(f, self, *args, **kwargs):
77 75 """Default to self.block; preserve self.block."""
78 76 block = kwargs.get('block',None)
79 77 block = self.block if block is None else block
80 78 saveblock = self.block
81 79 self.block = block
82 ret = f(self, *args, **kwargs)
83 self.block = saveblock
80 try:
81 ret = f(self, *args, **kwargs)
82 finally:
83 self.block = saveblock
84 84 return ret
85 85
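# Editorial sketch of the @defaultblock pattern above (not part of this commit;
# assumes `c` is a connected Client instance): block=None falls back to c.block,
# and c.block is restored even if the wrapped call raises.
#
# >>> c.block = True
# >>> c.clear()               # blocks, because block=None defaults to c.block
# >>> c.clear(block=False)    # overrides the default for this call only
# >>> c.block                 # unchanged afterwards
# True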
86 86
87 87 class AbortedTask(object):
88 88 """A basic wrapper object describing an aborted task."""
89 89 def __init__(self, msg_id):
90 90 self.msg_id = msg_id
91 91
92 92 class ResultDict(dict):
93 93 """A subclass of dict that raises errors if it has them."""
94 94 def __getitem__(self, key):
95 95 res = dict.__getitem__(self, key)
96 96 if isinstance(res, error.KernelError):
97 97 raise res
98 98 return res
99 99
100 100 class Client(object):
101 101 """A semi-synchronous client to the IPython ZMQ controller
102 102
103 103 Parameters
104 104 ----------
105 105
106 106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
107 107 The address of the controller's registration socket.
108 108 [Default: 'tcp://127.0.0.1:10101']
109 109 context : zmq.Context
110 110 Pass an existing zmq.Context instance, otherwise the client will create its own
111 111 username : bytes
112 112 set username to be passed to the Session object
113 113 debug : bool
114 114 flag for lots of message printing for debug purposes
115 115
116 116 #-------------- ssh related args ----------------
117 117 # These are args for configuring the ssh tunnel to be used
118 118 # credentials are used to forward connections over ssh to the Controller
119 119 # Note that the ip given in `addr` needs to be relative to sshserver
120 120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
121 121 # and set sshserver as the same machine the Controller is on. However,
122 122 # the only requirement is that sshserver is able to see the Controller
123 123 # (i.e. is within the same trusted network).
124 124
125 125 sshserver : str
126 126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
127 127 If keyfile or password is specified, and this is not, it will default to
128 128 the ip given in addr.
129 129 sshkey : str; path to public ssh key file
130 130 This specifies a key to be used in ssh login, default None.
131 131 Regular default ssh keys will be used without specifying this argument.
132 132 password : str;
133 133 Your ssh password to sshserver. Note that if this is left None,
134 134 you will be prompted for it if passwordless key based login is unavailable.
135 135
136 136 #------- exec authentication args -------
137 137 # If even localhost is untrusted, you can have some protection against
138 138 # unauthorized execution by using a key. Messages are still sent
139 139 # as cleartext, so if someone can snoop your loopback traffic this will
140 140 # not help anything.
141 141
142 142 exec_key : str
143 143 an authentication key or file containing a key
144 144 default: None
145 145
146 146
147 147 Attributes
148 148 ----------
149 149 ids : set of int engine IDs
150 150 requesting the ids attribute always synchronizes
151 151 the registration state. To request ids without synchronization,
152 152 use the semi-private _ids attribute.
153 153
154 154 history : list of msg_ids
155 155 a list of msg_ids, keeping track of all the execution
156 156 messages you have submitted in order.
157 157
158 158 outstanding : set of msg_ids
159 159 a set of msg_ids that have been submitted, but whose
160 160 results have not yet been received.
161 161
162 162 results : dict
163 163 a dict of all our results, keyed by msg_id
164 164
165 165 block : bool
166 166 determines default behavior when block not specified
167 167 in execution methods
168 168
169 169 Methods
170 170 -------
171 171 spin : flushes incoming results and registration state changes
172 172 control methods spin, and requesting `ids` also ensures up to date
173 173
174 174 barrier : wait on one or more msg_ids
175 175
176 176 execution methods: apply/apply_bound/apply_to
177 177 legacy: execute, run
178 178
179 179 query methods: queue_status, get_result, purge
180 180
181 181 control methods: abort, kill
182 182
183 183 """
184 184
185 185
186 186 _connected=False
187 187 _ssh=False
188 188 _engines=None
189 189 _addr='tcp://127.0.0.1:10101'
190 190 _registration_socket=None
191 191 _query_socket=None
192 192 _control_socket=None
193 193 _notification_socket=None
194 194 _mux_socket=None
195 195 _task_socket=None
196 196 block = False
197 197 outstanding=None
198 198 results = None
199 199 history = None
200 200 debug = False
201 targets = None
201 202
202 203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 204 sshserver=None, sshkey=None, password=None, paramiko=None,
204 205 exec_key=None,):
205 206 if context is None:
206 207 context = zmq.Context()
207 208 self.context = context
209 self.targets = 'all'
208 210 self._addr = addr
209 211 self._ssh = bool(sshserver or sshkey or password)
210 212 if self._ssh and sshserver is None:
211 213 # default to the same
212 214 sshserver = addr.split('://')[1].split(':')[0]
213 215 if self._ssh and password is None:
214 216 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
215 217 password=False
216 218 else:
217 219 password = getpass("SSH Password for %s: "%sshserver)
218 220 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
219 221
220 222 if exec_key is not None and os.path.isfile(exec_key):
221 223 arg = 'keyfile'
222 224 else:
223 225 arg = 'key'
224 226 key_arg = {arg:exec_key}
225 227 if username is None:
226 228 self.session = ss.StreamSession(**key_arg)
227 229 else:
228 230 self.session = ss.StreamSession(username, **key_arg)
229 231 self._registration_socket = self.context.socket(zmq.XREQ)
230 232 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
231 233 if self._ssh:
232 234 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
233 235 else:
234 236 self._registration_socket.connect(addr)
235 237 self._engines = {}
236 238 self._ids = set()
237 239 self.outstanding=set()
238 240 self.results = {}
239 241 self.history = []
240 242 self.debug = debug
241 243 self.session.debug = debug
242 244
243 245 self._notification_handlers = {'registration_notification' : self._register_engine,
244 246 'unregistration_notification' : self._unregister_engine,
245 247 }
246 248 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
247 249 'apply_reply' : self._handle_apply_reply}
248 250 self._connect(sshserver, ssh_kwargs)
249 251
250 252
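# Editorial usage sketch for the constructor above (not part of the diff; the
# module path and a running controller are assumptions):
#
# >>> from IPython.zmq.parallel.client import Client
# >>> c = Client()                                 # default tcp://127.0.0.1:10101
# >>> c = Client('tcp://127.0.0.1:10101',
# ...            sshserver='user@server.tld')      # tunnel; prompts for a password
# ...                                              # unless key-based login works
# >>> c.ids                                        # currently registered engines
# set([0, 1, 2, 3])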
251 253 @property
252 254 def ids(self):
253 255 """Always up to date ids property."""
254 256 self._flush_notifications()
255 257 return self._ids
256 258
257 259 def _update_engines(self, engines):
258 260 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
259 261 for k,v in engines.iteritems():
260 262 eid = int(k)
261 263 self._engines[eid] = bytes(v) # force not unicode
262 264 self._ids.add(eid)
263 265
264 266 def _build_targets(self, targets):
265 267 """Turn valid target IDs or 'all' into two lists:
266 268 (int_ids, uuids).
267 269 """
268 270 if targets is None:
269 271 targets = self._ids
270 272 elif isinstance(targets, str):
271 273 if targets.lower() == 'all':
272 274 targets = self._ids
273 275 else:
274 276 raise TypeError("%r not valid str target, must be 'all'"%(targets))
275 277 elif isinstance(targets, int):
276 278 targets = [targets]
277 279 return [self._engines[t] for t in targets], list(targets)
278 280
279 281 def _connect(self, sshserver, ssh_kwargs):
280 282 """setup all our socket connections to the controller. This is called from
281 283 __init__."""
282 284 if self._connected:
283 285 return
284 286 self._connected=True
285 287
286 288 def connect_socket(s, addr):
287 289 if self._ssh:
288 290 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
289 291 else:
290 292 return s.connect(addr)
291 293
292 294 self.session.send(self._registration_socket, 'connection_request')
293 295 idents,msg = self.session.recv(self._registration_socket,mode=0)
294 296 if self.debug:
295 297 pprint(msg)
296 298 msg = ss.Message(msg)
297 299 content = msg.content
298 300 if content.status == 'ok':
299 301 if content.queue:
300 302 self._mux_socket = self.context.socket(zmq.PAIR)
301 303 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 304 connect_socket(self._mux_socket, content.queue)
303 305 if content.task:
304 306 self._task_socket = self.context.socket(zmq.PAIR)
305 307 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
306 308 connect_socket(self._task_socket, content.task)
307 309 if content.notification:
308 310 self._notification_socket = self.context.socket(zmq.SUB)
309 311 connect_socket(self._notification_socket, content.notification)
310 312 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
311 313 if content.query:
312 314 self._query_socket = self.context.socket(zmq.PAIR)
313 315 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
314 316 connect_socket(self._query_socket, content.query)
315 317 if content.control:
316 318 self._control_socket = self.context.socket(zmq.PAIR)
317 319 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
318 320 connect_socket(self._control_socket, content.control)
319 321 self._update_engines(dict(content.engines))
320 322
321 323 else:
322 324 self._connected = False
323 325 raise Exception("Failed to connect!")
324 326
325 327 #--------------------------------------------------------------------------
326 328 # handlers and callbacks for incoming messages
327 329 #--------------------------------------------------------------------------
328 330
329 331 def _register_engine(self, msg):
330 332 """Register a new engine, and update our connection info."""
331 333 content = msg['content']
332 334 eid = content['id']
333 335 d = {eid : content['queue']}
334 336 self._update_engines(d)
335 337 self._ids.add(int(eid))
336 338
337 339 def _unregister_engine(self, msg):
338 340 """Unregister an engine that has died."""
339 341 content = msg['content']
340 342 eid = int(content['id'])
341 343 if eid in self._ids:
342 344 self._ids.remove(eid)
343 345 self._engines.pop(eid)
344 346
345 347 def _handle_execute_reply(self, msg):
346 348 """Save the reply to an execute_request into our results."""
347 349 parent = msg['parent_header']
348 350 msg_id = parent['msg_id']
349 351 if msg_id not in self.outstanding:
350 352 print("got unknown result: %s"%msg_id)
351 353 else:
352 354 self.outstanding.remove(msg_id)
353 355 self.results[msg_id] = ss.unwrap_exception(msg['content'])
354 356
355 357 def _handle_apply_reply(self, msg):
356 358 """Save the reply to an apply_request into our results."""
357 359 parent = msg['parent_header']
358 360 msg_id = parent['msg_id']
359 361 if msg_id not in self.outstanding:
360 362 print ("got unknown result: %s"%msg_id)
361 363 else:
362 364 self.outstanding.remove(msg_id)
363 365 content = msg['content']
364 366 if content['status'] == 'ok':
365 367 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
366 368 elif content['status'] == 'aborted':
367 369 self.results[msg_id] = error.AbortedTask(msg_id)
368 370 elif content['status'] == 'resubmitted':
369 371 # TODO: handle resubmission
370 372 pass
371 373 else:
372 374 e = ss.unwrap_exception(content)
373 375 e_uuid = e.engine_info['engineid']
374 376 for k,v in self._engines.iteritems():
375 377 if v == e_uuid:
376 378 e.engine_info['engineid'] = k
377 379 break
378 380 self.results[msg_id] = e
379 381
380 382 def _flush_notifications(self):
381 383 """Flush notifications of engine registrations waiting
382 384 in ZMQ queue."""
383 385 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
384 386 while msg is not None:
385 387 if self.debug:
386 388 pprint(msg)
387 389 msg = msg[-1]
388 390 msg_type = msg['msg_type']
389 391 handler = self._notification_handlers.get(msg_type, None)
390 392 if handler is None:
391 393 raise Exception("Unhandled message type: %s"%msg_type)
392 394 else:
393 395 handler(msg)
394 396 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
395 397
396 398 def _flush_results(self, sock):
397 399 """Flush task or queue results waiting in ZMQ queue."""
398 400 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
399 401 while msg is not None:
400 402 if self.debug:
401 403 pprint(msg)
402 404 msg = msg[-1]
403 405 msg_type = msg['msg_type']
404 406 handler = self._queue_handlers.get(msg_type, None)
405 407 if handler is None:
406 408 raise Exception("Unhandled message type: %s"%msg_type)
407 409 else:
408 410 handler(msg)
409 411 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
410 412
411 413 def _flush_control(self, sock):
412 414 """Flush replies from the control channel waiting
413 415 in the ZMQ queue.
414 416
415 417 Currently: ignore them."""
416 418 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
417 419 while msg is not None:
418 420 if self.debug:
419 421 pprint(msg)
420 422 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
421 423
422 424 #--------------------------------------------------------------------------
423 425 # getitem
424 426 #--------------------------------------------------------------------------
425 427
426 428 def __getitem__(self, key):
427 429 """Dict access returns DirectView multiplexer objects or,
428 430 if key is None, a LoadBalancedView."""
429 431 if key is None:
430 432 return LoadBalancedView(self)
431 433 if isinstance(key, int):
432 434 if key not in self.ids:
433 435 raise IndexError("No such engine: %i"%key)
434 436 return DirectView(self, key)
435 437
436 438 if isinstance(key, slice):
437 439 indices = range(len(self.ids))[key]
438 440 ids = sorted(self._ids)
439 441 key = [ ids[i] for i in indices ]
440 442 # newkeys = sorted(self._ids)[thekeys[k]]
441 443
442 444 if isinstance(key, (tuple, list, xrange)):
443 445 _,targets = self._build_targets(list(key))
444 446 return DirectView(self, targets)
445 447 else:
446 448 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
447 449
448 450 #--------------------------------------------------------------------------
449 451 # Begin public methods
450 452 #--------------------------------------------------------------------------
451 453
452 454 @property
453 455 def remote(self):
454 456 """property for convenient RemoteFunction generation.
455 457
456 458 >>> @client.remote
457 459 ... def f():
458 460 import os
459 461 print (os.getpid())
460 462 """
461 463 return remote(self, block=self.block)
462 464
463 465 def spin(self):
464 466 """Flush any registration notifications and execution results
465 467 waiting in the ZMQ queue.
466 468 """
467 469 if self._notification_socket:
468 470 self._flush_notifications()
469 471 if self._mux_socket:
470 472 self._flush_results(self._mux_socket)
471 473 if self._task_socket:
472 474 self._flush_results(self._task_socket)
473 475 if self._control_socket:
474 476 self._flush_control(self._control_socket)
475 477
476 478 def barrier(self, msg_ids=None, timeout=-1):
477 479 """waits on one or more `msg_ids`, for up to `timeout` seconds.
478 480
479 481 Parameters
480 482 ----------
481 msg_ids : int, str, or list of ints and/or strs
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
482 484 ints are indices to self.history
483 485 strs are msg_ids
484 486 default: wait on all outstanding messages
485 487 timeout : float
486 488 a time in seconds, after which to give up.
487 489 default is -1, which means no timeout
488 490
489 491 Returns
490 492 -------
491 493 True : when all msg_ids are done
492 494 False : timeout reached, some msg_ids still outstanding
493 495 """
494 496 tic = time.time()
495 497 if msg_ids is None:
496 498 theids = self.outstanding
497 499 else:
498 if isinstance(msg_ids, (int, str)):
500 if isinstance(msg_ids, (int, str, AsyncResult)):
499 501 msg_ids = [msg_ids]
500 502 theids = set()
501 503 for msg_id in msg_ids:
502 504 if isinstance(msg_id, int):
503 505 msg_id = self.history[msg_id]
506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
508 continue
504 509 theids.add(msg_id)
510 if not theids.intersection(self.outstanding):
511 return True
505 512 self.spin()
506 513 while theids.intersection(self.outstanding):
507 514 if timeout >= 0 and ( time.time()-tic ) > timeout:
508 515 break
509 516 time.sleep(1e-3)
510 517 self.spin()
511 518 return len(theids.intersection(self.outstanding)) == 0
512 519
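# Editorial sketch of barrier() (assumes `c` is a connected Client):
#
# >>> import time
# >>> ar = c.apply(time.sleep, (1,), block=False)   # submit via the task queue
# >>> c.barrier(ar)                                 # wait on an AsyncResult
# True
# >>> c.barrier(c.history[-1])                      # or on a specific msg_id
# True
# >>> c.barrier(timeout=5)                          # all outstanding, up to 5s
# True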
513 520 #--------------------------------------------------------------------------
514 521 # Control methods
515 522 #--------------------------------------------------------------------------
516 523
517 524 @spinfirst
518 525 @defaultblock
519 526 def clear(self, targets=None, block=None):
520 527 """Clear the namespace in target(s)."""
521 528 targets = self._build_targets(targets)[0]
522 529 for t in targets:
523 530 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
524 531 error = False
525 532 if self.block:
526 533 for i in range(len(targets)):
527 534 idents,msg = self.session.recv(self._control_socket,0)
528 535 if self.debug:
529 536 pprint(msg)
530 537 if msg['content']['status'] != 'ok':
531 538 error = ss.unwrap_exception(msg['content'])
532 539 if error:
533 540 return error
534 541
535 542
536 543 @spinfirst
537 544 @defaultblock
538 545 def abort(self, msg_ids = None, targets=None, block=None):
539 546 """Abort the execution queues of target(s)."""
540 547 targets = self._build_targets(targets)[0]
541 548 if isinstance(msg_ids, basestring):
542 549 msg_ids = [msg_ids]
543 550 content = dict(msg_ids=msg_ids)
544 551 for t in targets:
545 552 self.session.send(self._control_socket, 'abort_request',
546 553 content=content, ident=t)
547 554 error = False
548 555 if self.block:
549 556 for i in range(len(targets)):
550 557 idents,msg = self.session.recv(self._control_socket,0)
551 558 if self.debug:
552 559 pprint(msg)
553 560 if msg['content']['status'] != 'ok':
554 561 error = ss.unwrap_exception(msg['content'])
555 562 if error:
556 563 return error
557 564
558 565 @spinfirst
559 566 @defaultblock
560 567 def shutdown(self, targets=None, restart=False, controller=False, block=None):
561 568 """Terminates one or more engine processes, optionally including the controller."""
562 569 if controller:
563 570 targets = 'all'
564 571 targets = self._build_targets(targets)[0]
565 572 for t in targets:
566 573 self.session.send(self._control_socket, 'shutdown_request',
567 574 content={'restart':restart},ident=t)
568 575 error = False
569 576 if block or controller:
570 577 for i in range(len(targets)):
571 578 idents,msg = self.session.recv(self._control_socket,0)
572 579 if self.debug:
573 580 pprint(msg)
574 581 if msg['content']['status'] != 'ok':
575 582 error = ss.unwrap_exception(msg['content'])
576 583
577 584 if controller:
578 585 time.sleep(0.25)
579 586 self.session.send(self._query_socket, 'shutdown_request')
580 587 idents,msg = self.session.recv(self._query_socket, 0)
581 588 if self.debug:
582 589 pprint(msg)
583 590 if msg['content']['status'] != 'ok':
584 591 error = ss.unwrap_exception(msg['content'])
585 592
586 593 if error:
587 594 raise error
588 595
589 596 #--------------------------------------------------------------------------
590 597 # Execution methods
591 598 #--------------------------------------------------------------------------
592 599
593 600 @defaultblock
594 601 def execute(self, code, targets='all', block=None):
595 602 """Executes `code` on `targets` in blocking or nonblocking manner.
596 603
597 604 ``execute`` is always `bound` (affects engine namespace)
598 605
599 606 Parameters
600 607 ----------
601 608 code : str
602 609 the code string to be executed
603 610 targets : int/str/list of ints/strs
604 611 the engines on which to execute
605 612 default : all
606 613 block : bool
607 614 whether or not to wait until done to return
608 615 default: self.block
609 616 """
610 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
611 618 return result
612 619
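# Editorial sketch of execute() (assumes `c` is a connected Client):
#
# >>> c.execute('a = 10', targets='all', block=True)    # bound: defines `a` on engines
# >>> c.execute('a += 1', targets=[0, 1], block=False)  # returns without waiting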
613 620 def run(self, code, block=None):
614 621 """Runs `code` on an engine.
615 622
616 623 Calls to this are load-balanced.
617 624
618 625 ``run`` is never `bound` (no effect on engine namespace)
619 626
620 627 Parameters
621 628 ----------
622 629 code : str
623 630 the code string to be executed
624 631 block : bool
625 632 whether or not to wait until done
626 633
627 634 """
628 635 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
629 636 return result
630 637
631 638 def _maybe_raise(self, result):
632 639 """wrapper for maybe raising an exception if apply failed."""
633 640 if isinstance(result, error.RemoteError):
634 641 raise result
635 642
636 643 return result
637 644
638 645 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
639 646 after=None, follow=None):
640 647 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
641 648
642 649 This is the central execution command for the client.
643 650
644 651 Parameters
645 652 ----------
646 653
647 654 f : function
648 655 The function to be called remotely
649 656 args : tuple/list
650 657 The positional arguments passed to `f`
651 658 kwargs : dict
652 659 The keyword arguments passed to `f`
653 660 bound : bool (default: True)
654 661 Whether to execute in the Engine(s) namespace, or in a clean
655 662 namespace not affecting the engine.
656 663 block : bool (default: self.block)
657 664 Whether to wait for the result, or return immediately.
658 665 False:
659 666 returns msg_id(s)
660 667 if multiple targets:
661 668 list of ids
662 669 True:
663 670 returns actual result(s) of f(*args, **kwargs)
664 671 if multiple targets:
665 672 dict of results, by engine ID
666 673 targets : int,list of ints, 'all', None
667 674 Specify the destination of the job.
668 675 if None:
669 676 Submit via Task queue for load-balancing.
670 677 if 'all':
671 678 Run on all active engines
672 679 if list:
673 680 Run on each specified engine
674 681 if int:
675 682 Run on single engine
676 683
677 684 after : Dependency or collection of msg_ids
678 685 Only for load-balanced execution (targets=None)
679 686 Specify a list of msg_ids as a time-based dependency.
680 687 This job will only be run *after* the dependencies
681 688 have been met.
682 689
683 690 follow : Dependency or collection of msg_ids
684 691 Only for load-balanced execution (targets=None)
685 692 Specify a list of msg_ids as a location-based dependency.
686 693 This job will only be run on an engine where this dependency
687 694 is met.
688 695
689 696 Returns
690 697 -------
691 698 if block is False:
692 699 if single target:
693 700 return msg_id
694 701 else:
695 702 return list of msg_ids
696 703 ? (should this be dict like block=True) ?
697 704 else:
698 705 if single target:
699 706 return result of f(*args, **kwargs)
700 707 else:
701 708 return dict of results, keyed by engine
702 709 """
703 710
704 711 # defaults:
705 712 block = block if block is not None else self.block
706 713 args = args if args is not None else []
707 714 kwargs = kwargs if kwargs is not None else {}
708 715
709 716 # enforce types of f,args,kwargs
710 717 if not callable(f):
711 718 raise TypeError("f must be callable, not %s"%type(f))
712 719 if not isinstance(args, (tuple, list)):
713 720 raise TypeError("args must be tuple or list, not %s"%type(args))
714 721 if not isinstance(kwargs, dict):
715 722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
716 723
717 724 options = dict(bound=bound, block=block, after=after, follow=follow)
718 725
719 726 if targets is None:
720 727 return self._apply_balanced(f, args, kwargs, **options)
721 728 else:
722 729 return self._apply_direct(f, args, kwargs, targets=targets, **options)
723 730
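# Editorial sketch of the apply() call forms documented above (assumes `c` is a
# connected Client; returned values are illustrative):
#
# >>> import os
# >>> c.apply(os.getpid, targets='all', block=True)       # dict keyed by engine id
# {0: 1234, 1: 1235}
# >>> ar = c.apply(os.getpid, targets=None, block=False)  # load-balanced, async
# >>> ar.get()
# 1236
# >>> c.apply(os.getpid, targets=None, after=c.history[-1:])  # time-based dependency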
724 731 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
725 732 after=None, follow=None):
726 733 """The underlying method for applying functions in a load balanced
727 734 manner, via the task queue."""
728 735 if isinstance(after, Dependency):
729 736 after = after.as_dict()
730 737 elif after is None:
731 738 after = []
732 739 if isinstance(follow, Dependency):
733 740 follow = follow.as_dict()
734 741 elif follow is None:
735 742 follow = []
736 743 subheader = dict(after=after, follow=follow)
737 744
738 745 bufs = ss.pack_apply_message(f,args,kwargs)
739 746 content = dict(bound=bound)
740 747 msg = self.session.send(self._task_socket, "apply_request",
741 748 content=content, buffers=bufs, subheader=subheader)
742 749 msg_id = msg['msg_id']
743 750 self.outstanding.add(msg_id)
744 751 self.history.append(msg_id)
745 752 if block:
746 753 self.barrier(msg_id)
747 754 return self._maybe_raise(self.results[msg_id])
748 755 else:
749 756 return AsyncResult(self, [msg_id])
750 757
751 758 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
752 759 after=None, follow=None):
753 760 """The underlying method for applying functions to specific engines
754 761 via the MUX queue."""
755 762
756 763 queues,targets = self._build_targets(targets)
757 764 bufs = ss.pack_apply_message(f,args,kwargs)
758 765 if isinstance(after, Dependency):
759 766 after = after.as_dict()
760 767 elif after is None:
761 768 after = []
762 769 if isinstance(follow, Dependency):
763 770 follow = follow.as_dict()
764 771 elif follow is None:
765 772 follow = []
766 773 subheader = dict(after=after, follow=follow)
767 774 content = dict(bound=bound)
768 775 msg_ids = []
769 776 for queue in queues:
770 777 msg = self.session.send(self._mux_socket, "apply_request",
771 778 content=content, buffers=bufs,ident=queue, subheader=subheader)
772 779 msg_id = msg['msg_id']
773 780 self.outstanding.add(msg_id)
774 781 self.history.append(msg_id)
775 782 msg_ids.append(msg_id)
776 783 if block:
777 784 self.barrier(msg_ids)
778 785 else:
779 786 return AsyncResult(self, msg_ids)
780 787 if len(msg_ids) == 1:
781 788 return self._maybe_raise(self.results[msg_ids[0]])
782 789 else:
783 790 result = {}
784 791 for target,mid in zip(targets, msg_ids):
785 792 result[target] = self.results[mid]
786 793 return error.collect_exceptions(result, f.__name__)
787 794
788 795 #--------------------------------------------------------------------------
789 796 # Map and decorators
790 797 #--------------------------------------------------------------------------
791 798
792 799 def map(self, f, *sequences):
793 800 """Parallel version of builtin `map`, using all our engines."""
794 801 pf = ParallelFunction(self, f, block=self.block,
795 802 bound=True, targets='all')
796 803 return pf.map(*sequences)
797 804
798 805 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
806 """Decorator for making a ParallelFunction."""
800 807 return parallel(self, bound=bound, targets=targets, block=block)
801 808
802 809 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
810 """Decorator for making a RemoteFunction."""
804 811 return remote(self, bound=bound, targets=targets, block=block)
805 812
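# Editorial sketch of map() and the decorator helpers above (assumes `c` is a
# connected Client with c.block=True; results illustrative):
#
# >>> def square(x):
# ...     return x*x
# >>> c.map(square, range(8))                # parallel map across all engines
# [0, 1, 4, 9, 16, 25, 36, 49]
# >>> @c.parallel(block=True)
# ... def psquare(x):
# ...     return x*x
# >>> @c.remote(targets=0)
# ... def hostname():
# ...     import socket
# ...     return socket.gethostname()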
806 813 #--------------------------------------------------------------------------
807 814 # Data movement
808 815 #--------------------------------------------------------------------------
809 816
810 817 @defaultblock
811 818 def push(self, ns, targets='all', block=None):
812 819 """Push the contents of `ns` into the namespace on `target`"""
813 820 if not isinstance(ns, dict):
814 821 raise TypeError("Must be a dict, not %s"%type(ns))
815 822 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
816 823 return result
817 824
818 825 @defaultblock
819 def pull(self, keys, targets='all', block=True):
826 def pull(self, keys, targets='all', block=None):
820 827 """Pull objects from `target`'s namespace by `keys`"""
821 828 if isinstance(keys, str):
822 829 pass
823 830 elif isinstance(keys, (list,tuple,set)):
824 831 for key in keys:
825 832 if not isinstance(key, str):
826 833 raise TypeError
827 834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
828 835 return result
829 836
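# Editorial example of the data-movement calls above (assumes `c` is a connected
# Client):
#
# >>> c.push(dict(a=1, b='hello'), targets='all', block=True)   # send variables
# >>> c.pull('a', targets=0, block=True)                        # fetch one key
# 1
# >>> c.pull(('a', 'b'), targets=0, block=True)                 # or several
# [1, 'hello']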
830 @defaultblock
831 837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
832 838 """
833 839 Partition a Python sequence and send the partitions to a set of engines.
834 840 """
841 block = block if block is not None else self.block
835 842 targets = self._build_targets(targets)[-1]
836 843 mapObject = Map.dists[dist]()
837 844 nparts = len(targets)
838 845 msg_ids = []
839 846 for index, engineid in enumerate(targets):
840 847 partition = mapObject.getPartition(seq, index, nparts)
841 848 if flatten and len(partition) == 1:
842 mid = self.push({key: partition[0]}, targets=engineid, block=False)
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
843 850 else:
844 mid = self.push({key: partition}, targets=engineid, block=False)
845 msg_ids.append(mid)
851 r = self.push({key: partition}, targets=engineid, block=False)
852 msg_ids.extend(r._msg_ids)
846 853 r = AsyncResult(self, msg_ids)
847 854 if block:
848 r.wait()
849 return
855 return r.get()
850 856 else:
851 857 return r
852 858
853 @defaultblock
854 def gather(self, key, dist='b', targets='all', block=True):
859 def gather(self, key, dist='b', targets='all', block=None):
855 860 """
856 861 Gather a partitioned sequence on a set of engines as a single local seq.
857 862 """
863 block = block if block is not None else self.block
858 864
859 865 targets = self._build_targets(targets)[-1]
860 866 mapObject = Map.dists[dist]()
861 867 msg_ids = []
862 868 for index, engineid in enumerate(targets):
863 msg_ids.append(self.pull(key, targets=engineid,block=False))
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
864 870
865 871 r = AsyncMapResult(self, msg_ids, mapObject)
866 872 if block:
867 r.wait()
868 return r.result
873 return r.get()
869 874 else:
870 875 return r
871 876
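# Editorial round-trip sketch for scatter()/gather() above (assumes `c` is a
# connected Client; 'b' is the default block-partitioning scheme):
#
# >>> c.scatter('x', range(8), block=True)   # partition range(8) across engines
# >>> c.gather('x', block=True)              # reassemble the pieces locally
# [0, 1, 2, 3, 4, 5, 6, 7]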
872 877 #--------------------------------------------------------------------------
873 878 # Query methods
874 879 #--------------------------------------------------------------------------
875 880
876 881 @spinfirst
877 882 def get_results(self, msg_ids, status_only=False):
878 883 """Returns the result of the execute or task request with `msg_ids`.
879 884
880 885 Parameters
881 886 ----------
882 887 msg_ids : list of ints or msg_ids
883 888 if int:
884 889 Passed as index to self.history for convenience.
885 890 status_only : bool (default: False)
886 891 if False:
887 892 return the actual results
888 893 """
889 894 if not isinstance(msg_ids, (list,tuple)):
890 895 msg_ids = [msg_ids]
891 896 theids = []
892 897 for msg_id in msg_ids:
893 898 if isinstance(msg_id, int):
894 899 msg_id = self.history[msg_id]
895 900 if not isinstance(msg_id, str):
896 901 raise TypeError("msg_ids must be str, not %r"%msg_id)
897 902 theids.append(msg_id)
898 903
899 904 completed = []
900 905 local_results = {}
901 906 for msg_id in list(theids):
902 907 if msg_id in self.results:
903 908 completed.append(msg_id)
904 909 local_results[msg_id] = self.results[msg_id]
905 910 theids.remove(msg_id)
906 911
907 912 if theids: # some not locally cached
908 913 content = dict(msg_ids=theids, status_only=status_only)
909 914 msg = self.session.send(self._query_socket, "result_request", content=content)
910 915 zmq.select([self._query_socket], [], [])
911 916 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
912 917 if self.debug:
913 918 pprint(msg)
914 919 content = msg['content']
915 920 if content['status'] != 'ok':
916 921 raise ss.unwrap_exception(content)
917 922 else:
918 923 content = dict(completed=[],pending=[])
919 924 if not status_only:
920 925 # load cached results into result:
921 926 content['completed'].extend(completed)
922 927 content.update(local_results)
923 928 # update cache with results:
924 929 for msg_id in msg_ids:
925 930 if msg_id in content['completed']:
926 931 self.results[msg_id] = content[msg_id]
927 932 return content
928 933
929 934 @spinfirst
930 935 def queue_status(self, targets=None, verbose=False):
931 936 """Fetch the status of engine queues.
932 937
933 938 Parameters
934 939 ----------
935 940 targets : int/str/list of ints/strs
936 941 the engines on which to execute
937 942 default : all
938 943 verbose : bool
939 944 Whether to return lengths only, or lists of ids for each element
940 945 """
941 946 targets = self._build_targets(targets)[1]
942 947 content = dict(targets=targets, verbose=verbose)
943 948 self.session.send(self._query_socket, "queue_request", content=content)
944 949 idents,msg = self.session.recv(self._query_socket, 0)
945 950 if self.debug:
946 951 pprint(msg)
947 952 content = msg['content']
948 953 status = content.pop('status')
949 954 if status != 'ok':
950 955 raise ss.unwrap_exception(content)
951 956 return content
952 957
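# Editorial sketch of queue_status(); the exact shape of the reply content is an
# assumption, shown only to illustrate counts vs. verbose id lists:
#
# >>> c.queue_status()
# {0: {'queue': 0, 'completed': 10, 'tasks': 2}}
# >>> c.queue_status(targets=0, verbose=True)   # lists of msg_ids instead of counts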
953 958 @spinfirst
954 959 def purge_results(self, msg_ids=[], targets=[]):
955 960 """Tell the controller to forget results.
956 961
957 962 Individual results can be purged by msg_id, or the entire
958 963 history of specific targets can be purged.
959 964
960 965 Parameters
961 966 ----------
962 967 msg_ids : str or list of strs
963 968 the msg_ids whose results should be forgotten.
964 969 targets : int/str/list of ints/strs
965 970 The targets, by uuid or int_id, whose entire history is to be purged.
966 971 Use `targets='all'` to scrub everything from the controller's memory.
967 972
968 973 default : None
969 974 """
970 975 if not targets and not msg_ids:
971 976 raise ValueError
972 977 if targets:
973 978 targets = self._build_targets(targets)[1]
974 979 content = dict(targets=targets, msg_ids=msg_ids)
975 980 self.session.send(self._query_socket, "purge_request", content=content)
976 981 idents, msg = self.session.recv(self._query_socket, 0)
977 982 if self.debug:
978 983 pprint(msg)
979 984 content = msg['content']
980 985 if content['status'] != 'ok':
981 986 raise ss.unwrap_exception(content)
982 987
988 #----------------------------------------
989 # activate for %px,%autopx magics
990 #----------------------------------------
991 def activate(self):
992 """Make this `View` active for parallel magic commands.
993
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 In a given IPython session there is a single active one. While
996 there can be many `Views` created and used by the user,
997 there is only one active one. The active `View` is used whenever
998 the magic commands %px and %autopx are used.
999
1000 The activate() method is called on a given `View` to make it
1001 active. Once this has been done, the magic commands can be used.
1002 """
1003
1004 try:
1005 # This is injected into __builtins__.
1006 ip = get_ipython()
1007 except NameError:
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 else:
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 if pmagic is not None:
1012 pmagic.active_multiengine_client = self
1013 else:
1014 print "You must first load the parallelmagic extension " \
1015 "by doing '%load_ext parallelmagic'"
1016
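# Editorial sketch of the new activate() hook in an interactive IPython session
# (assumes the parallelmagic extension provides the %px/%autopx magics):
#
# In [1]: %load_ext parallelmagic
# In [2]: c = Client()
# In [3]: c.activate()          # make this client the target of %px/%autopx
# In [4]: %px import os; print os.getpid()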
983 1017 class AsynClient(Client):
984 1018 """An Asynchronous client, using the Tornado Event Loop.
985 1019 !!!unfinished!!!"""
986 1020 io_loop = None
987 1021 _queue_stream = None
988 1022 _notifier_stream = None
989 1023 _task_stream = None
990 1024 _control_stream = None
991 1025
992 1026 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
993 1027 Client.__init__(self, addr, context, username, debug)
994 1028 if io_loop is None:
995 1029 io_loop = ioloop.IOLoop.instance()
996 1030 self.io_loop = io_loop
997 1031
998 1032 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
999 1033 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1000 1034 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1001 1035 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1002 1036
1003 1037 def spin(self):
1004 1038 for stream in (self.queue_stream, self.notifier_stream,
1005 1039 self.task_stream, self.control_stream):
1006 1040 stream.flush()
1007 1041
1008 1042 __all__ = [ 'Client',
1009 1043 'depend',
1010 1044 'require',
1011 1045 'remote',
1012 1046 'parallel',
1013 1047 'RemoteFunction',
1014 1048 'ParallelFunction',
1015 1049 'DirectView',
1016 1050 'LoadBalancedView',
1017 1051 'AsyncResult',
1018 1052 'AsyncMapResult'
1019 1053 ]
@@ -1,298 +1,347 b''
1 1 """Views of remote engines"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Decorators
18 18 #-----------------------------------------------------------------------------
19 19
20 20 @decorator
21 21 def myblock(f, self, *args, **kwargs):
22 22 """override client.block with self.block during a call"""
23 23 block = self.client.block
24 24 self.client.block = self.block
25 ret = f(self, *args, **kwargs)
26 self.client.block = block
25 try:
26 ret = f(self, *args, **kwargs)
27 finally:
28 self.client.block = block
27 29 return ret
28 30
29 31 @decorator
30 32 def save_ids(f, self, *args, **kwargs):
31 33 """Keep our history and outstanding attributes up to date after a method call."""
32 34 n_previous = len(self.client.history)
33 35 ret = f(self, *args, **kwargs)
34 36 nmsgs = len(self.client.history) - n_previous
35 37 msg_ids = self.client.history[-nmsgs:]
36 38 self.history.extend(msg_ids)
37 39 map(self.outstanding.add, msg_ids)
38 40 return ret
39 41
40 42 @decorator
41 43 def sync_results(f, self, *args, **kwargs):
42 44 """sync relevant results from self.client to our results attribute."""
43 45 ret = f(self, *args, **kwargs)
44 46 delta = self.outstanding.difference(self.client.outstanding)
45 47 completed = self.outstanding.intersection(delta)
46 48 self.outstanding = self.outstanding.difference(completed)
47 49 for msg_id in completed:
48 50 self.results[msg_id] = self.client.results[msg_id]
49 51 return ret
50 52
51 53 @decorator
52 54 def spin_after(f, self, *args, **kwargs):
53 55 """call spin after the method."""
54 56 ret = f(self, *args, **kwargs)
55 57 self.spin()
56 58 return ret
57 59
58 60 #-----------------------------------------------------------------------------
59 61 # Classes
60 62 #-----------------------------------------------------------------------------
61 63
62 64 class View(object):
63 65 """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.
64 66
65 67 Don't use this class, use subclasses.
66 68 """
67 69 _targets = None
68 _ntargets = None
69 70 block=None
70 71 bound=None
71 72 history=None
72 73
73 74 def __init__(self, client, targets=None):
74 75 self.client = client
75 76 self._targets = targets
76 77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 78 self.block = client.block
78 self.bound=True
79 self.bound=False
79 80 self.history = []
80 81 self.outstanding = set()
81 82 self.results = {}
82 83
83 84 def __repr__(self):
84 85 strtargets = str(self._targets)
85 86 if len(strtargets) > 16:
86 87 strtargets = strtargets[:12]+'...]'
87 88 return "<%s %s>"%(self.__class__.__name__, strtargets)
88 89
89 90 @property
90 91 def targets(self):
91 92 return self._targets
92 93
93 94 @targets.setter
94 95 def targets(self, value):
95 raise AttributeError("Cannot set my targets argument after construction!")
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
96 98
97 99 @sync_results
98 100 def spin(self):
99 101 """spin the client, and sync"""
100 102 self.client.spin()
101 103
102 104 @sync_results
103 105 @save_ids
104 106 def apply(self, f, *args, **kwargs):
105 107 """calls f(*args, **kwargs) on remote engines, returning the result.
106 108
107 109 This method does not involve the engine's namespace.
108 110
109 111 if self.block is False:
110 112 returns msg_id
111 113 else:
112 114 returns actual result of f(*args, **kwargs)
113 115 """
114 116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
115 117
116 118 @save_ids
117 119 def apply_async(self, f, *args, **kwargs):
118 120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
119 121
120 122 This method does not involve the engine's namespace.
121 123
122 124 returns msg_id
123 125 """
124 126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
125 127
126 128 @spin_after
127 129 @save_ids
128 130 def apply_sync(self, f, *args, **kwargs):
129 131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
130 132 returning the result.
131 133
132 134 This method does not involve the engine's namespace.
133 135
134 136 returns: actual result of f(*args, **kwargs)
135 137 """
136 138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
137 139
138 140 @sync_results
139 141 @save_ids
140 142 def apply_bound(self, f, *args, **kwargs):
141 143 """calls f(*args, **kwargs) bound to engine namespace(s).
142 144
143 145 if self.block is False:
144 146 returns msg_id
145 147 else:
146 148 returns actual result of f(*args, **kwargs)
147 149
148 150 This method has access to the targets' globals
149 151
150 152 """
151 153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
152 154
153 155 @sync_results
154 156 @save_ids
155 157 def apply_async_bound(self, f, *args, **kwargs):
156 158 """calls f(*args, **kwargs) bound to engine namespace(s)
157 159 in a nonblocking manner.
158 160
159 161 returns: msg_id
160 162
161 163 This method has access to the targets' globals
162 164
163 165 """
164 166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
165 167
166 168 @spin_after
167 169 @save_ids
168 170 def apply_sync_bound(self, f, *args, **kwargs):
169 171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
170 172
171 173 returns: actual result of f(*args, **kwargs)
172 174
173 175 This method has access to the targets' globals
174 176
175 177 """
176 178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
177 179
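# Editorial summary of the apply_* variants above (assumes `dv` is a DirectView,
# e.g. client[0]; returned pid is illustrative):
#
# >>> import os
# >>> dv.apply_sync(os.getpid)          # unbound, blocking: returns the result
# 1234
# >>> ar = dv.apply_async(os.getpid)    # unbound, returns without waiting
# >>> dv.apply_sync_bound(os.getpid)    # bound: runs in the engine's namespace
# 1234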
178 180 @spin_after
179 181 @save_ids
180 182 def map(self, f, *sequences):
181 183 """Parallel version of builtin `map`, using this view's engines."""
182 184 if isinstance(self.targets, int):
183 185 targets = [self.targets]
184 186 pf = ParallelFunction(self.client, f, block=self.block,
185 187 bound=True, targets=targets)
186 188 return pf.map(*sequences)
187 189
190 def parallel(self, bound=True, block=True):
191 """Decorator for making a ParallelFunction"""
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193
188 194 def abort(self, msg_ids=None, block=None):
189 195 """Abort jobs on my engines.
190 196
191 197 Parameters
192 198 ----------
193 199
194 200 msg_ids : None, str, list of strs, optional
195 201 if None: abort all jobs.
196 202 else: abort specific msg_id(s).
197 203 """
198 204 block = block if block is not None else self.block
199 205 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
200 206
201 207 def queue_status(self, verbose=False):
202 208 """Fetch the Queue status of my engines"""
203 209 return self.client.queue_status(targets=self.targets, verbose=verbose)
204 210
205 def purge_results(self, msg_ids=[],targets=[]):
211 def purge_results(self, msg_ids=[], targets=[]):
206 212 """Instruct the controller to forget specific results."""
207 213 if targets is None or targets == 'all':
208 214 targets = self.targets
209 215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216
210 217
211 218
212 219 class DirectView(View):
213 220 """Direct Multiplexer View of one or more engines.
214 221
215 222 These are created via indexed access to a client:
216 223
217 224 >>> dv_1 = client[1]
218 225 >>> dv_all = client[:]
219 226 >>> dv_even = client[::2]
220 227 >>> dv_some = client[1:3]
221 228
229 This object also provides dictionary-style access to the engines' namespaces.
230
222 231 """
223 232
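# Editorial sketch of the dictionary-style access mentioned above (assumes `dv`
# is a DirectView; item syntax is backed by push/pull):
#
# >>> dv['a'] = 5          # same as dv.push({'a': 5})
# >>> dv['a']              # same as dv.get('a'), a blocking pull
# 5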
233 @sync_results
234 @save_ids
235 def execute(self, code, block=True):
236 """execute some code on my targets."""
237 return self.client.execute(code, block=self.block, targets=self.targets)
238
224 239 def update(self, ns):
225 240 """update remote namespace with dict `ns`"""
226 241 return self.client.push(ns, targets=self.targets, block=self.block)
227 242
228 243 push = update
229 244
230 245 def get(self, key_s):
231 246 """get object(s) by `key_s` from remote namespace
232 247 will return one object if it is a key.
233 248 It also takes a list of keys, and will return a list of objects."""
234 249 # block = block if block is not None else self.block
235 250 return self.client.pull(key_s, block=True, targets=self.targets)
236 251
252 @sync_results
253 @save_ids
237 254 def pull(self, key_s, block=True):
238 255 """get object(s) by `key_s` from remote namespace
239 256 will return one object if it is a key.
240 257 It also takes a list of keys, and will return a list of objects."""
241 258 block = block if block is not None else self.block
242 259 return self.client.pull(key_s, block=block, targets=self.targets)
243 260
244 261 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
245 262 """
246 263 Partition a Python sequence and send the partitions to a set of engines.
247 264 """
248 265 block = block if block is not None else self.block
249 266 if targets is None:
250 267 targets = self.targets
251 268
252 269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
253 270 targets=targets, block=block)
254 271
272 @sync_results
273 @save_ids
255 274 def gather(self, key, dist='b', targets=None, block=True):
256 275 """
257 276 Gather a partitioned sequence on a set of engines as a single local seq.
258 277 """
259 278 block = block if block is not None else self.block
260 279 if targets is None:
261 280 targets = self.targets
262 281
263 282 return self.client.gather(key, dist=dist, targets=targets, block=block)
264 283
265 284 def __getitem__(self, key):
266 285 return self.get(key)
267 286
268 287 def __setitem__(self,key, value):
269 288 self.update({key:value})
270 289
271 290 def clear(self, block=False):
272 291 """Clear the remote namespaces on my engines."""
273 292 block = block if block is not None else self.block
274 293 return self.client.clear(targets=self.targets, block=block)
275 294
276 295 def kill(self, block=True):
277 296 """Kill my engines."""
278 297 block = block if block is not None else self.block
279 298 return self.client.kill(targets=self.targets, block=block)
280 299
300 #----------------------------------------
301 # activate for %px,%autopx magics
302 #----------------------------------------
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
311
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
314 """
315
316 try:
317 # This is injected into __builtins__.
318 ip = get_ipython()
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
325 else:
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
328
329
281 330 class LoadBalancedView(View):
282 331 """An engine-agnostic View that only executes via the Task queue.
283 332
284 333 Typically created via:
285 334
286 335 >>> lbv = client[None]
287 336 <LoadBalancedView tcp://127.0.0.1:12345>
288 337
289 338 but can also be created with:
290 339
291 340 >>> lbc = LoadBalancedView(client)
292 341
293 342 TODO: allow subset of engines across which to balance.
294 343 """
295 344 def __repr__(self):
296 345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
297 346
298 347 No newline at end of file