##// END OF EJS Templates
remove non-functional View.kill method...
MinRK -
Show More
@@ -1,1103 +1,1097 b''
1 1 """Views of remote engines.
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import imp
19 19 import sys
20 20 import warnings
21 21 from contextlib import contextmanager
22 22 from types import ModuleType
23 23
24 24 import zmq
25 25
26 26 from IPython.testing.skipdoctest import skip_doctest
27 27 from IPython.utils.traitlets import (
28 28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 29 )
30 30 from IPython.external.decorator import decorator
31 31
32 32 from IPython.parallel import util
33 33 from IPython.parallel.controller.dependency import Dependency, dependent
34 34
35 35 from . import map as Map
36 36 from .asyncresult import AsyncResult, AsyncMapResult
37 37 from .remotefunction import ParallelFunction, parallel, remote, getname
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Decorators
41 41 #-----------------------------------------------------------------------------
42 42
43 43 @decorator
44 44 def save_ids(f, self, *args, **kwargs):
45 45 """Keep our history and outstanding attributes up to date after a method call."""
46 46 n_previous = len(self.client.history)
47 47 try:
48 48 ret = f(self, *args, **kwargs)
49 49 finally:
50 50 nmsgs = len(self.client.history) - n_previous
51 51 msg_ids = self.client.history[-nmsgs:]
52 52 self.history.extend(msg_ids)
53 53 map(self.outstanding.add, msg_ids)
54 54 return ret
55 55
56 56 @decorator
57 57 def sync_results(f, self, *args, **kwargs):
58 58 """sync relevant results from self.client to our results attribute."""
59 59 ret = f(self, *args, **kwargs)
60 60 delta = self.outstanding.difference(self.client.outstanding)
61 61 completed = self.outstanding.intersection(delta)
62 62 self.outstanding = self.outstanding.difference(completed)
63 63 return ret
64 64
65 65 @decorator
66 66 def spin_after(f, self, *args, **kwargs):
67 67 """call spin after the method."""
68 68 ret = f(self, *args, **kwargs)
69 69 self.spin()
70 70 return ret
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Classes
74 74 #-----------------------------------------------------------------------------
75 75
76 76 @skip_doctest
77 77 class View(HasTraits):
78 78 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
79 79
80 80 Don't use this class, use subclasses.
81 81
82 82 Methods
83 83 -------
84 84
85 85 spin
86 86 flushes incoming results and registration state changes
87 87 control methods spin, and requesting `ids` also ensures up to date
88 88
89 89 wait
90 90 wait on one or more msg_ids
91 91
92 92 execution methods
93 93 apply
94 94 legacy: execute, run
95 95
96 96 data movement
97 97 push, pull, scatter, gather
98 98
99 99 query methods
100 100 get_result, queue_status, purge_results, result_status
101 101
102 102 control methods
103 103 abort, shutdown
104 104
105 105 """
106 106 # flags
107 107 block=Bool(False)
108 108 track=Bool(True)
109 109 targets = Any()
110 110
111 111 history=List()
112 112 outstanding = Set()
113 113 results = Dict()
114 114 client = Instance('IPython.parallel.Client')
115 115
116 116 _socket = Instance('zmq.Socket')
117 117 _flag_names = List(['targets', 'block', 'track'])
118 118 _targets = Any()
119 119 _idents = Any()
120 120
121 121 def __init__(self, client=None, socket=None, **flags):
122 122 super(View, self).__init__(client=client, _socket=socket)
123 123 self.results = client.results
124 124 self.block = client.block
125 125
126 126 self.set_flags(**flags)
127 127
128 128 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129 129
130 130 def __repr__(self):
131 131 strtargets = str(self.targets)
132 132 if len(strtargets) > 16:
133 133 strtargets = strtargets[:12]+'...]'
134 134 return "<%s %s>"%(self.__class__.__name__, strtargets)
135 135
136 136 def __len__(self):
137 137 if isinstance(self.targets, list):
138 138 return len(self.targets)
139 139 elif isinstance(self.targets, int):
140 140 return 1
141 141 else:
142 142 return len(self.client)
143 143
144 144 def set_flags(self, **kwargs):
145 145 """set my attribute flags by keyword.
146 146
147 147 Views determine behavior with a few attributes (`block`, `track`, etc.).
148 148 These attributes can be set all at once by name with this method.
149 149
150 150 Parameters
151 151 ----------
152 152
153 153 block : bool
154 154 whether to wait for results
155 155 track : bool
156 156 whether to create a MessageTracker to allow the user to
157 157 safely edit after arrays and buffers during non-copying
158 158 sends.
159 159 """
160 160 for name, value in kwargs.iteritems():
161 161 if name not in self._flag_names:
162 162 raise KeyError("Invalid name: %r"%name)
163 163 else:
164 164 setattr(self, name, value)
165 165
166 166 @contextmanager
167 167 def temp_flags(self, **kwargs):
168 168 """temporarily set flags, for use in `with` statements.
169 169
170 170 See set_flags for permanent setting of flags
171 171
172 172 Examples
173 173 --------
174 174
175 175 >>> view.track=False
176 176 ...
177 177 >>> with view.temp_flags(track=True):
178 178 ... ar = view.apply(dostuff, my_big_array)
179 179 ... ar.tracker.wait() # wait for send to finish
180 180 >>> view.track
181 181 False
182 182
183 183 """
184 184 # preflight: save flags, and set temporaries
185 185 saved_flags = {}
186 186 for f in self._flag_names:
187 187 saved_flags[f] = getattr(self, f)
188 188 self.set_flags(**kwargs)
189 189 # yield to the with-statement block
190 190 try:
191 191 yield
192 192 finally:
193 193 # postflight: restore saved flags
194 194 self.set_flags(**saved_flags)
195 195
196 196
197 197 #----------------------------------------------------------------
198 198 # apply
199 199 #----------------------------------------------------------------
200 200
201 201 @sync_results
202 202 @save_ids
203 203 def _really_apply(self, f, args, kwargs, block=None, **options):
204 204 """wrapper for client.send_apply_request"""
205 205 raise NotImplementedError("Implement in subclasses")
206 206
207 207 def apply(self, f, *args, **kwargs):
208 208 """calls f(*args, **kwargs) on remote engines, returning the result.
209 209
210 210 This method sets all apply flags via this View's attributes.
211 211
212 212 if self.block is False:
213 213 returns AsyncResult
214 214 else:
215 215 returns actual result of f(*args, **kwargs)
216 216 """
217 217 return self._really_apply(f, args, kwargs)
218 218
219 219 def apply_async(self, f, *args, **kwargs):
220 220 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
221 221
222 222 returns AsyncResult
223 223 """
224 224 return self._really_apply(f, args, kwargs, block=False)
225 225
226 226 @spin_after
227 227 def apply_sync(self, f, *args, **kwargs):
228 228 """calls f(*args, **kwargs) on remote engines in a blocking manner,
229 229 returning the result.
230 230
231 231 returns: actual result of f(*args, **kwargs)
232 232 """
233 233 return self._really_apply(f, args, kwargs, block=True)
234 234
235 235 #----------------------------------------------------------------
236 236 # wrappers for client and control methods
237 237 #----------------------------------------------------------------
238 238 @sync_results
239 239 def spin(self):
240 240 """spin the client, and sync"""
241 241 self.client.spin()
242 242
243 243 @sync_results
244 244 def wait(self, jobs=None, timeout=-1):
245 245 """waits on one or more `jobs`, for up to `timeout` seconds.
246 246
247 247 Parameters
248 248 ----------
249 249
250 250 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
251 251 ints are indices to self.history
252 252 strs are msg_ids
253 253 default: wait on all outstanding messages
254 254 timeout : float
255 255 a time in seconds, after which to give up.
256 256 default is -1, which means no timeout
257 257
258 258 Returns
259 259 -------
260 260
261 261 True : when all msg_ids are done
262 262 False : timeout reached, some msg_ids still outstanding
263 263 """
264 264 if jobs is None:
265 265 jobs = self.history
266 266 return self.client.wait(jobs, timeout)
267 267
268 268 def abort(self, jobs=None, targets=None, block=None):
269 269 """Abort jobs on my engines.
270 270
271 271 Parameters
272 272 ----------
273 273
274 274 jobs : None, str, list of strs, optional
275 275 if None: abort all jobs.
276 276 else: abort specific msg_id(s).
277 277 """
278 278 block = block if block is not None else self.block
279 279 targets = targets if targets is not None else self.targets
280 280 jobs = jobs if jobs is not None else list(self.outstanding)
281 281
282 282 return self.client.abort(jobs=jobs, targets=targets, block=block)
283 283
284 284 def queue_status(self, targets=None, verbose=False):
285 285 """Fetch the Queue status of my engines"""
286 286 targets = targets if targets is not None else self.targets
287 287 return self.client.queue_status(targets=targets, verbose=verbose)
288 288
289 289 def purge_results(self, jobs=[], targets=[]):
290 290 """Instruct the controller to forget specific results."""
291 291 if targets is None or targets == 'all':
292 292 targets = self.targets
293 293 return self.client.purge_results(jobs=jobs, targets=targets)
294 294
295 295 def shutdown(self, targets=None, restart=False, hub=False, block=None):
296 296 """Terminates one or more engine processes, optionally including the hub.
297 297 """
298 298 block = self.block if block is None else block
299 299 if targets is None or targets == 'all':
300 300 targets = self.targets
301 301 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
302 302
303 303 @spin_after
304 304 def get_result(self, indices_or_msg_ids=None):
305 305 """return one or more results, specified by history index or msg_id.
306 306
307 307 See client.get_result for details.
308 308
309 309 """
310 310
311 311 if indices_or_msg_ids is None:
312 312 indices_or_msg_ids = -1
313 313 if isinstance(indices_or_msg_ids, int):
314 314 indices_or_msg_ids = self.history[indices_or_msg_ids]
315 315 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
316 316 indices_or_msg_ids = list(indices_or_msg_ids)
317 317 for i,index in enumerate(indices_or_msg_ids):
318 318 if isinstance(index, int):
319 319 indices_or_msg_ids[i] = self.history[index]
320 320 return self.client.get_result(indices_or_msg_ids)
321 321
322 322 #-------------------------------------------------------------------
323 323 # Map
324 324 #-------------------------------------------------------------------
325 325
326 326 def map(self, f, *sequences, **kwargs):
327 327 """override in subclasses"""
328 328 raise NotImplementedError
329 329
330 330 def map_async(self, f, *sequences, **kwargs):
331 331 """Parallel version of builtin `map`, using this view's engines.
332 332
333 333 This is equivalent to map(...block=False)
334 334
335 335 See `self.map` for details.
336 336 """
337 337 if 'block' in kwargs:
338 338 raise TypeError("map_async doesn't take a `block` keyword argument.")
339 339 kwargs['block'] = False
340 340 return self.map(f,*sequences,**kwargs)
341 341
342 342 def map_sync(self, f, *sequences, **kwargs):
343 343 """Parallel version of builtin `map`, using this view's engines.
344 344
345 345 This is equivalent to map(...block=True)
346 346
347 347 See `self.map` for details.
348 348 """
349 349 if 'block' in kwargs:
350 350 raise TypeError("map_sync doesn't take a `block` keyword argument.")
351 351 kwargs['block'] = True
352 352 return self.map(f,*sequences,**kwargs)
353 353
354 354 def imap(self, f, *sequences, **kwargs):
355 355 """Parallel version of `itertools.imap`.
356 356
357 357 See `self.map` for details.
358 358
359 359 """
360 360
361 361 return iter(self.map_async(f,*sequences, **kwargs))
362 362
363 363 #-------------------------------------------------------------------
364 364 # Decorators
365 365 #-------------------------------------------------------------------
366 366
367 367 def remote(self, block=True, **flags):
368 368 """Decorator for making a RemoteFunction"""
369 369 block = self.block if block is None else block
370 370 return remote(self, block=block, **flags)
371 371
372 372 def parallel(self, dist='b', block=None, **flags):
373 373 """Decorator for making a ParallelFunction"""
374 374 block = self.block if block is None else block
375 375 return parallel(self, dist=dist, block=block, **flags)
376 376
377 377 @skip_doctest
378 378 class DirectView(View):
379 379 """Direct Multiplexer View of one or more engines.
380 380
381 381 These are created via indexed access to a client:
382 382
383 383 >>> dv_1 = client[1]
384 384 >>> dv_all = client[:]
385 385 >>> dv_even = client[::2]
386 386 >>> dv_some = client[1:3]
387 387
388 388 This object provides dictionary access to engine namespaces:
389 389
390 390 # push a=5:
391 391 >>> dv['a'] = 5
392 392 # pull 'foo':
393 393 >>> db['foo']
394 394
395 395 """
396 396
397 397 def __init__(self, client=None, socket=None, targets=None):
398 398 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
399 399
400 400 @property
401 401 def importer(self):
402 402 """sync_imports(local=True) as a property.
403 403
404 404 See sync_imports for details.
405 405
406 406 """
407 407 return self.sync_imports(True)
408 408
409 409 @contextmanager
410 410 def sync_imports(self, local=True, quiet=False):
411 411 """Context Manager for performing simultaneous local and remote imports.
412 412
413 413 'import x as y' will *not* work. The 'as y' part will simply be ignored.
414 414
415 415 If `local=True`, then the package will also be imported locally.
416 416
417 417 If `quiet=True`, no output will be produced when attempting remote
418 418 imports.
419 419
420 420 Note that remote-only (`local=False`) imports have not been implemented.
421 421
422 422 >>> with view.sync_imports():
423 423 ... from numpy import recarray
424 424 importing recarray from numpy on engine(s)
425 425
426 426 """
427 427 import __builtin__
428 428 local_import = __builtin__.__import__
429 429 modules = set()
430 430 results = []
431 431 @util.interactive
432 432 def remote_import(name, fromlist, level):
433 433 """the function to be passed to apply, that actually performs the import
434 434 on the engine, and loads up the user namespace.
435 435 """
436 436 import sys
437 437 user_ns = globals()
438 438 mod = __import__(name, fromlist=fromlist, level=level)
439 439 if fromlist:
440 440 for key in fromlist:
441 441 user_ns[key] = getattr(mod, key)
442 442 else:
443 443 user_ns[name] = sys.modules[name]
444 444
445 445 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
446 446 """the drop-in replacement for __import__, that optionally imports
447 447 locally as well.
448 448 """
449 449 # don't override nested imports
450 450 save_import = __builtin__.__import__
451 451 __builtin__.__import__ = local_import
452 452
453 453 if imp.lock_held():
454 454 # this is a side-effect import, don't do it remotely, or even
455 455 # ignore the local effects
456 456 return local_import(name, globals, locals, fromlist, level)
457 457
458 458 imp.acquire_lock()
459 459 if local:
460 460 mod = local_import(name, globals, locals, fromlist, level)
461 461 else:
462 462 raise NotImplementedError("remote-only imports not yet implemented")
463 463 imp.release_lock()
464 464
465 465 key = name+':'+','.join(fromlist or [])
466 466 if level == -1 and key not in modules:
467 467 modules.add(key)
468 468 if not quiet:
469 469 if fromlist:
470 470 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
471 471 else:
472 472 print "importing %s on engine(s)"%name
473 473 results.append(self.apply_async(remote_import, name, fromlist, level))
474 474 # restore override
475 475 __builtin__.__import__ = save_import
476 476
477 477 return mod
478 478
479 479 # override __import__
480 480 __builtin__.__import__ = view_import
481 481 try:
482 482 # enter the block
483 483 yield
484 484 except ImportError:
485 485 if local:
486 486 raise
487 487 else:
488 488 # ignore import errors if not doing local imports
489 489 pass
490 490 finally:
491 491 # always restore __import__
492 492 __builtin__.__import__ = local_import
493 493
494 494 for r in results:
495 495 # raise possible remote ImportErrors here
496 496 r.get()
497 497
498 498
499 499 @sync_results
500 500 @save_ids
501 501 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
502 502 """calls f(*args, **kwargs) on remote engines, returning the result.
503 503
504 504 This method sets all of `apply`'s flags via this View's attributes.
505 505
506 506 Parameters
507 507 ----------
508 508
509 509 f : callable
510 510
511 511 args : list [default: empty]
512 512
513 513 kwargs : dict [default: empty]
514 514
515 515 targets : target list [default: self.targets]
516 516 where to run
517 517 block : bool [default: self.block]
518 518 whether to block
519 519 track : bool [default: self.track]
520 520 whether to ask zmq to track the message, for safe non-copying sends
521 521
522 522 Returns
523 523 -------
524 524
525 525 if self.block is False:
526 526 returns AsyncResult
527 527 else:
528 528 returns actual result of f(*args, **kwargs) on the engine(s)
529 529 This will be a list of self.targets is also a list (even length 1), or
530 530 the single result if self.targets is an integer engine id
531 531 """
532 532 args = [] if args is None else args
533 533 kwargs = {} if kwargs is None else kwargs
534 534 block = self.block if block is None else block
535 535 track = self.track if track is None else track
536 536 targets = self.targets if targets is None else targets
537 537
538 538 _idents = self.client._build_targets(targets)[0]
539 539 msg_ids = []
540 540 trackers = []
541 541 for ident in _idents:
542 542 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
543 543 ident=ident)
544 544 if track:
545 545 trackers.append(msg['tracker'])
546 546 msg_ids.append(msg['header']['msg_id'])
547 547 tracker = None if track is False else zmq.MessageTracker(*trackers)
548 548 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
549 549 if block:
550 550 try:
551 551 return ar.get()
552 552 except KeyboardInterrupt:
553 553 pass
554 554 return ar
555 555
556 556
557 557 @spin_after
558 558 def map(self, f, *sequences, **kwargs):
559 559 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560 560
561 561 Parallel version of builtin `map`, using this View's `targets`.
562 562
563 563 There will be one task per target, so work will be chunked
564 564 if the sequences are longer than `targets`.
565 565
566 566 Results can be iterated as they are ready, but will become available in chunks.
567 567
568 568 Parameters
569 569 ----------
570 570
571 571 f : callable
572 572 function to be mapped
573 573 *sequences: one or more sequences of matching length
574 574 the sequences to be distributed and passed to `f`
575 575 block : bool
576 576 whether to wait for the result or not [default self.block]
577 577
578 578 Returns
579 579 -------
580 580
581 581 if block=False:
582 582 AsyncMapResult
583 583 An object like AsyncResult, but which reassembles the sequence of results
584 584 into a single list. AsyncMapResults can be iterated through before all
585 585 results are complete.
586 586 else:
587 587 list
588 588 the result of map(f,*sequences)
589 589 """
590 590
591 591 block = kwargs.pop('block', self.block)
592 592 for k in kwargs.keys():
593 593 if k not in ['block', 'track']:
594 594 raise TypeError("invalid keyword arg, %r"%k)
595 595
596 596 assert len(sequences) > 0, "must have some sequences to map onto!"
597 597 pf = ParallelFunction(self, f, block=block, **kwargs)
598 598 return pf.map(*sequences)
599 599
600 600 @sync_results
601 601 @save_ids
602 602 def execute(self, code, silent=True, targets=None, block=None):
603 603 """Executes `code` on `targets` in blocking or nonblocking manner.
604 604
605 605 ``execute`` is always `bound` (affects engine namespace)
606 606
607 607 Parameters
608 608 ----------
609 609
610 610 code : str
611 611 the code string to be executed
612 612 block : bool
613 613 whether or not to wait until done to return
614 614 default: self.block
615 615 """
616 616 block = self.block if block is None else block
617 617 targets = self.targets if targets is None else targets
618 618
619 619 _idents = self.client._build_targets(targets)[0]
620 620 msg_ids = []
621 621 trackers = []
622 622 for ident in _idents:
623 623 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
624 624 msg_ids.append(msg['header']['msg_id'])
625 625 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
626 626 if block:
627 627 try:
628 628 ar.get()
629 629 except KeyboardInterrupt:
630 630 pass
631 631 return ar
632 632
633 633 def run(self, filename, targets=None, block=None):
634 634 """Execute contents of `filename` on my engine(s).
635 635
636 636 This simply reads the contents of the file and calls `execute`.
637 637
638 638 Parameters
639 639 ----------
640 640
641 641 filename : str
642 642 The path to the file
643 643 targets : int/str/list of ints/strs
644 644 the engines on which to execute
645 645 default : all
646 646 block : bool
647 647 whether or not to wait until done
648 648 default: self.block
649 649
650 650 """
651 651 with open(filename, 'r') as f:
652 652 # add newline in case of trailing indented whitespace
653 653 # which will cause SyntaxError
654 654 code = f.read()+'\n'
655 655 return self.execute(code, block=block, targets=targets)
656 656
657 657 def update(self, ns):
658 658 """update remote namespace with dict `ns`
659 659
660 660 See `push` for details.
661 661 """
662 662 return self.push(ns, block=self.block, track=self.track)
663 663
664 664 def push(self, ns, targets=None, block=None, track=None):
665 665 """update remote namespace with dict `ns`
666 666
667 667 Parameters
668 668 ----------
669 669
670 670 ns : dict
671 671 dict of keys with which to update engine namespace(s)
672 672 block : bool [default : self.block]
673 673 whether to wait to be notified of engine receipt
674 674
675 675 """
676 676
677 677 block = block if block is not None else self.block
678 678 track = track if track is not None else self.track
679 679 targets = targets if targets is not None else self.targets
680 680 # applier = self.apply_sync if block else self.apply_async
681 681 if not isinstance(ns, dict):
682 682 raise TypeError("Must be a dict, not %s"%type(ns))
683 683 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
684 684
685 685 def get(self, key_s):
686 686 """get object(s) by `key_s` from remote namespace
687 687
688 688 see `pull` for details.
689 689 """
690 690 # block = block if block is not None else self.block
691 691 return self.pull(key_s, block=True)
692 692
693 693 def pull(self, names, targets=None, block=None):
694 694 """get object(s) by `name` from remote namespace
695 695
696 696 will return one object if it is a key.
697 697 can also take a list of keys, in which case it will return a list of objects.
698 698 """
699 699 block = block if block is not None else self.block
700 700 targets = targets if targets is not None else self.targets
701 701 applier = self.apply_sync if block else self.apply_async
702 702 if isinstance(names, basestring):
703 703 pass
704 704 elif isinstance(names, (list,tuple,set)):
705 705 for key in names:
706 706 if not isinstance(key, basestring):
707 707 raise TypeError("keys must be str, not type %r"%type(key))
708 708 else:
709 709 raise TypeError("names must be strs, not %r"%names)
710 710 return self._really_apply(util._pull, (names,), block=block, targets=targets)
711 711
712 712 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
713 713 """
714 714 Partition a Python sequence and send the partitions to a set of engines.
715 715 """
716 716 block = block if block is not None else self.block
717 717 track = track if track is not None else self.track
718 718 targets = targets if targets is not None else self.targets
719 719
720 720 # construct integer ID list:
721 721 targets = self.client._build_targets(targets)[1]
722 722
723 723 mapObject = Map.dists[dist]()
724 724 nparts = len(targets)
725 725 msg_ids = []
726 726 trackers = []
727 727 for index, engineid in enumerate(targets):
728 728 partition = mapObject.getPartition(seq, index, nparts)
729 729 if flatten and len(partition) == 1:
730 730 ns = {key: partition[0]}
731 731 else:
732 732 ns = {key: partition}
733 733 r = self.push(ns, block=False, track=track, targets=engineid)
734 734 msg_ids.extend(r.msg_ids)
735 735 if track:
736 736 trackers.append(r._tracker)
737 737
738 738 if track:
739 739 tracker = zmq.MessageTracker(*trackers)
740 740 else:
741 741 tracker = None
742 742
743 743 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
744 744 if block:
745 745 r.wait()
746 746 else:
747 747 return r
748 748
749 749 @sync_results
750 750 @save_ids
751 751 def gather(self, key, dist='b', targets=None, block=None):
752 752 """
753 753 Gather a partitioned sequence on a set of engines as a single local seq.
754 754 """
755 755 block = block if block is not None else self.block
756 756 targets = targets if targets is not None else self.targets
757 757 mapObject = Map.dists[dist]()
758 758 msg_ids = []
759 759
760 760 # construct integer ID list:
761 761 targets = self.client._build_targets(targets)[1]
762 762
763 763 for index, engineid in enumerate(targets):
764 764 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
765 765
766 766 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
767 767
768 768 if block:
769 769 try:
770 770 return r.get()
771 771 except KeyboardInterrupt:
772 772 pass
773 773 return r
774 774
775 775 def __getitem__(self, key):
776 776 return self.get(key)
777 777
778 778 def __setitem__(self,key, value):
779 779 self.update({key:value})
780 780
781 781 def clear(self, targets=None, block=False):
782 782 """Clear the remote namespaces on my engines."""
783 783 block = block if block is not None else self.block
784 784 targets = targets if targets is not None else self.targets
785 785 return self.client.clear(targets=targets, block=block)
786 786
787 def kill(self, targets=None, block=True):
788 """Kill my engines."""
789 block = block if block is not None else self.block
790 targets = targets if targets is not None else self.targets
791 return self.client.kill(targets=targets, block=block)
792
793 787 #----------------------------------------
794 788 # activate for %px, %autopx, etc. magics
795 789 #----------------------------------------
796 790
797 791 def activate(self, suffix=''):
798 792 """Activate IPython magics associated with this View
799 793
800 794 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
801 795
802 796 Parameters
803 797 ----------
804 798
805 799 suffix: str [default: '']
806 800 The suffix, if any, for the magics. This allows you to have
807 801 multiple views associated with parallel magics at the same time.
808 802
809 803 e.g. ``rc[::2].activate(suffix='_even')`` will give you
810 804 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
811 805 on the even engines.
812 806 """
813 807
814 808 from IPython.parallel.client.magics import ParallelMagics
815 809
816 810 try:
817 811 # This is injected into __builtins__.
818 812 ip = get_ipython()
819 813 except NameError:
820 814 print "The IPython parallel magics (%px, etc.) only work within IPython."
821 815 return
822 816
823 817 M = ParallelMagics(ip, self, suffix)
824 818 ip.magics_manager.register(M)
825 819
826 820
827 821 @skip_doctest
828 822 class LoadBalancedView(View):
829 823 """An load-balancing View that only executes via the Task scheduler.
830 824
831 825 Load-balanced views can be created with the client's `view` method:
832 826
833 827 >>> v = client.load_balanced_view()
834 828
835 829 or targets can be specified, to restrict the potential destinations:
836 830
837 831 >>> v = client.client.load_balanced_view([1,3])
838 832
839 833 which would restrict loadbalancing to between engines 1 and 3.
840 834
841 835 """
842 836
843 837 follow=Any()
844 838 after=Any()
845 839 timeout=CFloat()
846 840 retries = Integer(0)
847 841
848 842 _task_scheme = Any()
849 843 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
850 844
851 845 def __init__(self, client=None, socket=None, **flags):
852 846 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
853 847 self._task_scheme=client._task_scheme
854 848
855 849 def _validate_dependency(self, dep):
856 850 """validate a dependency.
857 851
858 852 For use in `set_flags`.
859 853 """
860 854 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
861 855 return True
862 856 elif isinstance(dep, (list,set, tuple)):
863 857 for d in dep:
864 858 if not isinstance(d, (basestring, AsyncResult)):
865 859 return False
866 860 elif isinstance(dep, dict):
867 861 if set(dep.keys()) != set(Dependency().as_dict().keys()):
868 862 return False
869 863 if not isinstance(dep['msg_ids'], list):
870 864 return False
871 865 for d in dep['msg_ids']:
872 866 if not isinstance(d, basestring):
873 867 return False
874 868 else:
875 869 return False
876 870
877 871 return True
878 872
879 873 def _render_dependency(self, dep):
880 874 """helper for building jsonable dependencies from various input forms."""
881 875 if isinstance(dep, Dependency):
882 876 return dep.as_dict()
883 877 elif isinstance(dep, AsyncResult):
884 878 return dep.msg_ids
885 879 elif dep is None:
886 880 return []
887 881 else:
888 882 # pass to Dependency constructor
889 883 return list(Dependency(dep))
890 884
891 885 def set_flags(self, **kwargs):
892 886 """set my attribute flags by keyword.
893 887
894 888 A View is a wrapper for the Client's apply method, but with attributes
895 889 that specify keyword arguments, those attributes can be set by keyword
896 890 argument with this method.
897 891
898 892 Parameters
899 893 ----------
900 894
901 895 block : bool
902 896 whether to wait for results
903 897 track : bool
904 898 whether to create a MessageTracker to allow the user to
905 899 safely edit after arrays and buffers during non-copying
906 900 sends.
907 901
908 902 after : Dependency or collection of msg_ids
909 903 Only for load-balanced execution (targets=None)
910 904 Specify a list of msg_ids as a time-based dependency.
911 905 This job will only be run *after* the dependencies
912 906 have been met.
913 907
914 908 follow : Dependency or collection of msg_ids
915 909 Only for load-balanced execution (targets=None)
916 910 Specify a list of msg_ids as a location-based dependency.
917 911 This job will only be run on an engine where this dependency
918 912 is met.
919 913
920 914 timeout : float/int or None
921 915 Only for load-balanced execution (targets=None)
922 916 Specify an amount of time (in seconds) for the scheduler to
923 917 wait for dependencies to be met before failing with a
924 918 DependencyTimeout.
925 919
926 920 retries : int
927 921 Number of times a task will be retried on failure.
928 922 """
929 923
930 924 super(LoadBalancedView, self).set_flags(**kwargs)
931 925 for name in ('follow', 'after'):
932 926 if name in kwargs:
933 927 value = kwargs[name]
934 928 if self._validate_dependency(value):
935 929 setattr(self, name, value)
936 930 else:
937 931 raise ValueError("Invalid dependency: %r"%value)
938 932 if 'timeout' in kwargs:
939 933 t = kwargs['timeout']
940 934 if not isinstance(t, (int, long, float, type(None))):
941 935 raise TypeError("Invalid type for timeout: %r"%type(t))
942 936 if t is not None:
943 937 if t < 0:
944 938 raise ValueError("Invalid timeout: %s"%t)
945 939 self.timeout = t
946 940
947 941 @sync_results
948 942 @save_ids
949 943 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
950 944 after=None, follow=None, timeout=None,
951 945 targets=None, retries=None):
952 946 """calls f(*args, **kwargs) on a remote engine, returning the result.
953 947
954 948 This method temporarily sets all of `apply`'s flags for a single call.
955 949
956 950 Parameters
957 951 ----------
958 952
959 953 f : callable
960 954
961 955 args : list [default: empty]
962 956
963 957 kwargs : dict [default: empty]
964 958
965 959 block : bool [default: self.block]
966 960 whether to block
967 961 track : bool [default: self.track]
968 962 whether to ask zmq to track the message, for safe non-copying sends
969 963
970 964 !!!!!! TODO: THE REST HERE !!!!
971 965
972 966 Returns
973 967 -------
974 968
975 969 if self.block is False:
976 970 returns AsyncResult
977 971 else:
978 972 returns actual result of f(*args, **kwargs) on the engine(s)
979 973 This will be a list of self.targets is also a list (even length 1), or
980 974 the single result if self.targets is an integer engine id
981 975 """
982 976
983 977 # validate whether we can run
984 978 if self._socket.closed:
985 979 msg = "Task farming is disabled"
986 980 if self._task_scheme == 'pure':
987 981 msg += " because the pure ZMQ scheduler cannot handle"
988 982 msg += " disappearing engines."
989 983 raise RuntimeError(msg)
990 984
991 985 if self._task_scheme == 'pure':
992 986 # pure zmq scheme doesn't support extra features
993 987 msg = "Pure ZMQ scheduler doesn't support the following flags:"
994 988 "follow, after, retries, targets, timeout"
995 989 if (follow or after or retries or targets or timeout):
996 990 # hard fail on Scheduler flags
997 991 raise RuntimeError(msg)
998 992 if isinstance(f, dependent):
999 993 # soft warn on functional dependencies
1000 994 warnings.warn(msg, RuntimeWarning)
1001 995
1002 996 # build args
1003 997 args = [] if args is None else args
1004 998 kwargs = {} if kwargs is None else kwargs
1005 999 block = self.block if block is None else block
1006 1000 track = self.track if track is None else track
1007 1001 after = self.after if after is None else after
1008 1002 retries = self.retries if retries is None else retries
1009 1003 follow = self.follow if follow is None else follow
1010 1004 timeout = self.timeout if timeout is None else timeout
1011 1005 targets = self.targets if targets is None else targets
1012 1006
1013 1007 if not isinstance(retries, int):
1014 1008 raise TypeError('retries must be int, not %r'%type(retries))
1015 1009
1016 1010 if targets is None:
1017 1011 idents = []
1018 1012 else:
1019 1013 idents = self.client._build_targets(targets)[0]
1020 1014 # ensure *not* bytes
1021 1015 idents = [ ident.decode() for ident in idents ]
1022 1016
1023 1017 after = self._render_dependency(after)
1024 1018 follow = self._render_dependency(follow)
1025 1019 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026 1020
1027 1021 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028 1022 metadata=metadata)
1029 1023 tracker = None if track is False else msg['tracker']
1030 1024
1031 1025 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1032 1026
1033 1027 if block:
1034 1028 try:
1035 1029 return ar.get()
1036 1030 except KeyboardInterrupt:
1037 1031 pass
1038 1032 return ar
1039 1033
1040 1034 @spin_after
1041 1035 @save_ids
1042 1036 def map(self, f, *sequences, **kwargs):
1043 1037 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1044 1038
1045 1039 Parallel version of builtin `map`, load-balanced by this View.
1046 1040
1047 1041 `block`, and `chunksize` can be specified by keyword only.
1048 1042
1049 1043 Each `chunksize` elements will be a separate task, and will be
1050 1044 load-balanced. This lets individual elements be available for iteration
1051 1045 as soon as they arrive.
1052 1046
1053 1047 Parameters
1054 1048 ----------
1055 1049
1056 1050 f : callable
1057 1051 function to be mapped
1058 1052 *sequences: one or more sequences of matching length
1059 1053 the sequences to be distributed and passed to `f`
1060 1054 block : bool [default self.block]
1061 1055 whether to wait for the result or not
1062 1056 track : bool
1063 1057 whether to create a MessageTracker to allow the user to
1064 1058 safely edit after arrays and buffers during non-copying
1065 1059 sends.
1066 1060 chunksize : int [default 1]
1067 1061 how many elements should be in each task.
1068 1062 ordered : bool [default True]
1069 1063 Whether the results should be gathered as they arrive, or enforce
1070 1064 the order of submission.
1071 1065
1072 1066 Only applies when iterating through AsyncMapResult as results arrive.
1073 1067 Has no effect when block=True.
1074 1068
1075 1069 Returns
1076 1070 -------
1077 1071
1078 1072 if block=False:
1079 1073 AsyncMapResult
1080 1074 An object like AsyncResult, but which reassembles the sequence of results
1081 1075 into a single list. AsyncMapResults can be iterated through before all
1082 1076 results are complete.
1083 1077 else:
1084 1078 the result of map(f,*sequences)
1085 1079
1086 1080 """
1087 1081
1088 1082 # default
1089 1083 block = kwargs.get('block', self.block)
1090 1084 chunksize = kwargs.get('chunksize', 1)
1091 1085 ordered = kwargs.get('ordered', True)
1092 1086
1093 1087 keyset = set(kwargs.keys())
1094 1088 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1095 1089 if extra_keys:
1096 1090 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1097 1091
1098 1092 assert len(sequences) > 0, "must have some sequences to map onto!"
1099 1093
1100 1094 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1101 1095 return pf.map(*sequences)
1102 1096
1103 1097 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now