##// END OF EJS Templates
A new version and API for map is now working. We also now have an...
Brian E Granger -
Show More
@@ -0,0 +1,42 b''
1 # encoding: utf-8
2
3 """A parallelized version of Python's builtin map."""
4
5 __docformat__ = "restructuredtext en"
6
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
13
14 #-------------------------------------------------------------------------------
15 # Imports
16 #-------------------------------------------------------------------------------
17
18 from types import FunctionType
19 from zope.interface import Interface, implements
20
class IMapper(Interface):
    """Interface for callables that perform a parallel map."""

    def __call__(func, *sequences):
        """Do map in parallel."""
25
class Mapper(object):
    """A callable that maps a function over sequences via a multiengine.

    The ``dist``, ``targets`` and ``block`` options are fixed when the
    mapper is created and are forwarded unchanged on every call.
    """

    implements(IMapper)

    def __init__(self, multiengine, dist='b', targets='all', block=True):
        """Remember the multiengine and the options passed to its ``_map``.

        :Parameters:
            multiengine
                Object providing ``_map(func, sequences, dist=...,
                targets=..., block=...)``; it does the actual work.
            dist
                Forwarded as the ``dist`` keyword of ``_map``.
            targets
                Forwarded as the ``targets`` keyword of ``_map``.
            block
                Forwarded as the ``block`` keyword of ``_map``.
        """
        self.multiengine = multiengine
        self.dist = dist
        self.targets = targets
        self.block = block

    def __call__(self, func, *sequences):
        """Shorthand for ``self.map(func, *sequences)``."""
        return self.map(func, *sequences)

    def map(self, func, *sequences):
        """Apply ``func`` across ``sequences`` using the multiengine.

        :Parameters:
            func : str or function
                The callable to map; anything else fails the assertion.
            sequences
                Positional sequences, handed to ``_map`` as a single tuple.

        :Returns: whatever ``self.multiengine._map`` returns.
        """
        assert isinstance(func, (str, FunctionType)), "func must be a function or str"
        return self.multiengine._map(
            func,
            sequences,
            dist=self.dist,
            targets=self.targets,
            block=self.block,
        )
@@ -115,7 +115,7 b' class RoundRobinMap(Map):'
115 # result.append(concat[i:totalLength:maxPartitionLength])
115 # result.append(concat[i:totalLength:maxPartitionLength])
116 return self.concatenate(listOfPartitions)
116 return self.concatenate(listOfPartitions)
117
117
118 styles = {'basic':Map}
118 dists = {'b':Map}
119
119
120
120
121
121
@@ -653,67 +653,45 b' components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMul'
653 class IMultiEngineCoordinator(Interface):
653 class IMultiEngineCoordinator(Interface):
654 """Methods that work on multiple engines explicitly."""
654 """Methods that work on multiple engines explicitly."""
655
655
656 def scatter(key, seq, style='basic', flatten=False, targets='all'):
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets.
657 """Partition and distribute a sequence to targets."""
658
658
659 :Parameters:
659 def gather(key, dist='b', targets='all'):
660 key : str
660 """Gather object key from targets."""
661 The variable name to call the scattered sequence.
662 seq : list, tuple, array
663 The sequence to scatter. The type should be preserved.
664 style : string
665 A specification of how the sequence is partitioned. Currently
666 only 'basic' is implemented.
667 flatten : boolean
668 Should single element sequences be converted to scalars.
669 """
670
671 def gather(key, style='basic', targets='all'):
672 """Gather object key from targets.
673
661
674 :Parameters:
662 def _map(func, seq, dist='b', targets='all'):
675 key : string
663 """A parallelized version of Python's builtin map."""
676 The name of a sequence on the targets to gather.
677 style : string
678 A specification of how the sequence is partitioned. Currently
679 only 'basic' is implemented.
680 """
681
664
682 def map(func, seq, style='basic', targets='all'):
665 def map(func, *sequences):
683 """A parallelized version of Python's builtin map.
666 """Do a basic map with default for dist and targets."""
684
667
685 This function implements the following pattern:
668 def mapper(dist='b', targets='all'):
686
669 """Create a mapper with dist and targets."""
687 1. The sequence seq is scattered to the given targets.
670
688 2. map(functionSource, seq) is called on each engine.
671 def parallel(dist='b', targets='all'):
689 3. The resulting sequences are gathered back to the local machine.
672 """A decorator that build a parallel function."""
690
691 :Parameters:
692 targets : int, list or 'all'
693 The engine ids the action will apply to. Call `get_ids` to see
694 a list of currently available engines.
695 func : str, function
696 An actual function object or a Python string that names a
697 callable defined on the engines.
698 seq : list, tuple or numpy array
699 The local sequence to be scattered.
700 style : str
701 Only 'basic' is supported for now.
702
703 :Returns: A list of len(seq) with functionSource called on each element
704 of seq.
705
706 Example
707 =======
708
709 >>> rc.mapAll('lambda x: x*x', range(10000))
710 [0,2,4,9,25,36,...]
711 """
712
673
713
674
714 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
675 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
715 """Methods that work on multiple engines explicitly."""
676 """Methods that work on multiple engines explicitly."""
716 pass
677
678 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
679 """Partition and distribute a sequence to targets."""
680
681 def gather(key, dist='b', targets='all', block=True):
682 """Gather object key from targets"""
683
684 def _map(func, sequences, dist='b', targets='all', block=True):
685 """Perform an actual map."""
686
687 def map(func, *sequences):
688 """Do a basic map with default for dist and targets."""
689
690 def mapper(dist='b', targets='all', block=True):
691 """Create a mapper with dist, targets and block."""
692
693 def parallel(dist='b', targets='all', block=True):
694 """A decorator that build a parallel function."""
717
695
718
696
719 #-------------------------------------------------------------------------------
697 #-------------------------------------------------------------------------------
@@ -722,46 +700,31 b' class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):'
722
700
723 class IMultiEngineExtras(Interface):
701 class IMultiEngineExtras(Interface):
724
702
725 def zip_pull(targets, *keys):
703 def zip_pull(targets, keys):
726 """Pull, but return results in a different format from `pull`.
704 """
705 Pull, but return results in a different format from `pull`.
727
706
728 This method basically returns zip(pull(targets, *keys)), with a few
707 This method basically returns zip(pull(targets, *keys)), with a few
729 edge cases handled differently. Users of chainsaw will find this format
708 edge cases handled differently. Users of chainsaw will find this format
730 familiar.
709 familiar.
731
732 :Parameters:
733 targets : int, list or 'all'
734 The engine ids the action will apply to. Call `get_ids` to see
735 a list of currently available engines.
736 keys: list or tuple of str
737 A list of variable names as string of the Python objects to be pulled
738 back to the client.
739
740 :Returns: A list of pulled Python objects for each target.
741 """
710 """
742
711
743 def run(targets, fname):
712 def run(targets, fname):
744 """Run a .py file on targets.
713 """Run a .py file on targets."""
745
746 :Parameters:
747 targets : int, list or 'all'
748 The engine ids the action will apply to. Call `get_ids` to see
749 a list of currently available engines.
750 fname : str
751 The filename of a .py file on the local system to be sent to and run
752 on the engines.
753 block : boolean
754 Should I block or not. If block=True, wait for the action to
755 complete and return the result. If block=False, return a
756 `PendingResult` object that can be used to later get the
757 result. If block is not specified, the block attribute
758 will be used instead.
759 """
760
714
761
715
762 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
716 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
763 pass
717 def zip_pull(targets, keys, block=True):
764
718 """
719 Pull, but return results in a different format from `pull`.
720
721 This method basically returns zip(pull(targets, *keys)), with a few
722 edge cases handled differently. Users of chainsaw will find this format
723 familiar.
724 """
725
726 def run(targets, fname, block=True):
727 """Run a .py file on targets."""
765
728
766 #-------------------------------------------------------------------------------
729 #-------------------------------------------------------------------------------
767 # The full MultiEngine interface
730 # The full MultiEngine interface
@@ -31,6 +31,7 b' from IPython.ColorANSI import TermColors'
31 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel import error
32 from IPython.kernel import error
33 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import Mapper
34 from IPython.kernel import map as Map
35 from IPython.kernel import map as Map
35 from IPython.kernel import multiengine as me
36 from IPython.kernel import multiengine as me
36 from IPython.kernel.multiengine import (IFullMultiEngine,
37 from IPython.kernel.multiengine import (IFullMultiEngine,
@@ -779,29 +780,40 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
779 # IMultiEngineCoordinator
780 # IMultiEngineCoordinator
780 #---------------------------------------------------------------------------
781 #---------------------------------------------------------------------------
781
782
782 def scatter(self, key, seq, style='basic', flatten=False, targets=None, block=None):
783 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
783 """
784 """
784 Partition a Python sequence and send the partitions to a set of engines.
785 Partition a Python sequence and send the partitions to a set of engines.
785 """
786 """
786 targets, block = self._findTargetsAndBlock(targets, block)
787 targets, block = self._findTargetsAndBlock(targets, block)
787 return self._blockFromThread(self.smultiengine.scatter, key, seq,
788 return self._blockFromThread(self.smultiengine.scatter, key, seq,
788 style, flatten, targets=targets, block=block)
789 dist, flatten, targets=targets, block=block)
789
790
790 def gather(self, key, style='basic', targets=None, block=None):
791 def gather(self, key, dist='b', targets=None, block=None):
791 """
792 """
792 Gather a partitioned sequence on a set of engines as a single local seq.
793 Gather a partitioned sequence on a set of engines as a single local seq.
793 """
794 """
794 targets, block = self._findTargetsAndBlock(targets, block)
795 targets, block = self._findTargetsAndBlock(targets, block)
795 return self._blockFromThread(self.smultiengine.gather, key, style,
796 return self._blockFromThread(self.smultiengine.gather, key, dist,
796 targets=targets, block=block)
797 targets=targets, block=block)
797
798
798 def map(self, func, seq, style='basic', targets=None, block=None):
799 def _map(self, func, seq, dist='b', targets=None, block=None):
799 """
800 """
800 A parallelized version of Python's builtin map
801 A parallelized version of Python's builtin map
801 """
802 """
802 targets, block = self._findTargetsAndBlock(targets, block)
803 targets, block = self._findTargetsAndBlock(targets, block)
803 return self._blockFromThread(self.smultiengine.map, func, seq,
804 return self._blockFromThread(self.smultiengine._map, func, seq,
804 style, targets=targets, block=block)
805 dist, targets=targets, block=block)
806
807 def map(self, func, *sequences):
808 return self.mapper()(func, *sequences)
809
810 def mapper(self, dist='b', targets='all', block=None):
811 return Mapper(self, dist, targets, block)
812
813 def parallel(self, dist='b', targets=None, block=None):
814 targets, block = self._findTargetsAndBlock(targets, block)
815 pf = ParallelFunction(self, dist=dist, targets=targets, block=block)
816 return pf
805
817
806 #---------------------------------------------------------------------------
818 #---------------------------------------------------------------------------
807 # IMultiEngineExtras
819 # IMultiEngineExtras
@@ -29,6 +29,8 b' from foolscap import Referenceable'
29 from IPython.kernel import error
29 from IPython.kernel import error
30 from IPython.kernel.util import printer
30 from IPython.kernel.util import printer
31 from IPython.kernel import map as Map
31 from IPython.kernel import map as Map
32 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import Mapper
32 from IPython.kernel.twistedutil import gatherBoth
34 from IPython.kernel.twistedutil import gatherBoth
33 from IPython.kernel.multiengine import (MultiEngine,
35 from IPython.kernel.multiengine import (MultiEngine,
34 IMultiEngine,
36 IMultiEngine,
@@ -475,7 +477,7 b' class FCFullSynchronousMultiEngineClient(object):'
475 d.addCallback(create_targets)
477 d.addCallback(create_targets)
476 return d
478 return d
477
479
478 def scatter(self, key, seq, style='basic', flatten=False, targets='all', block=True):
480 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
479
481
480 # Note: scatter and gather handle pending deferreds locally through self.pdm.
482 # Note: scatter and gather handle pending deferreds locally through self.pdm.
481 # This enables us to collect a bunch of deferred ids and make a secondary
483 # This enables us to collect a bunch of deferred ids and make a secondary
@@ -483,7 +485,7 b' class FCFullSynchronousMultiEngineClient(object):'
483 # difficult to get right though.
485 # difficult to get right though.
484 def do_scatter(engines):
486 def do_scatter(engines):
485 nEngines = len(engines)
487 nEngines = len(engines)
486 mapClass = Map.styles[style]
488 mapClass = Map.dists[dist]
487 mapObject = mapClass()
489 mapObject = mapClass()
488 d_list = []
490 d_list = []
489 # Loop through and push to each engine in non-blocking mode.
491 # Loop through and push to each engine in non-blocking mode.
@@ -541,7 +543,7 b' class FCFullSynchronousMultiEngineClient(object):'
541 d.addCallback(do_scatter)
543 d.addCallback(do_scatter)
542 return d
544 return d
543
545
544 def gather(self, key, style='basic', targets='all', block=True):
546 def gather(self, key, dist='b', targets='all', block=True):
545
547
546 # Note: scatter and gather handle pending deferreds locally through self.pdm.
548 # Note: scatter and gather handle pending deferreds locally through self.pdm.
547 # This enables us to collect a bunch of deferred ids and make a secondary
549 # This enables us to collect a bunch of deferred ids and make a secondary
@@ -549,7 +551,7 b' class FCFullSynchronousMultiEngineClient(object):'
549 # difficult to get right though.
551 # difficult to get right though.
550 def do_gather(engines):
552 def do_gather(engines):
551 nEngines = len(engines)
553 nEngines = len(engines)
552 mapClass = Map.styles[style]
554 mapClass = Map.dists[dist]
553 mapObject = mapClass()
555 mapObject = mapClass()
554 d_list = []
556 d_list = []
555 # Loop through and push to each engine in non-blocking mode.
557 # Loop through and push to each engine in non-blocking mode.
@@ -604,14 +606,16 b' class FCFullSynchronousMultiEngineClient(object):'
604 d.addCallback(do_gather)
606 d.addCallback(do_gather)
605 return d
607 return d
606
608
607 def map(self, func, seq, style='basic', targets='all', block=True):
609 def _map(self, func, sequences, dist='b', targets='all', block=True):
608 """
610 """
609 Call a callable on elements of a sequence.
611 Call a callable on elements of a sequence.
610
611 map(f, range(10)) -> [f(0), f(1), f(2), ...]
612 map(f, zip(range))
613 """
612 """
614 d_list = []
613 if not isinstance(sequences, (list, tuple)):
614 raise TypeError('sequences must be a list or tuple')
615 max_len = max(len(s) for s in sequences)
616 for s in sequences:
617 if len(s)!=max_len:
618 raise ValueError('all sequences must have equal length')
615 if isinstance(func, FunctionType):
619 if isinstance(func, FunctionType):
616 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
620 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
617 d.addCallback(lambda did: self.get_pending_deferred(did, True))
621 d.addCallback(lambda did: self.get_pending_deferred(did, True))
@@ -623,12 +627,22 b' class FCFullSynchronousMultiEngineClient(object):'
623 else:
627 else:
624 raise TypeError("func must be a function or str")
628 raise TypeError("func must be a function or str")
625
629
626 d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets))
630 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
627 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
631 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
628 d.addCallback(lambda did: self.get_pending_deferred(did, True))
632 d.addCallback(lambda did: self.get_pending_deferred(did, True))
629 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets, block=block))
633 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
630 return d
634 return d
631
635
636 def map(self, func, *sequences):
637 return self.mapper()(func, *sequences)
638
639 def mapper(self, dist='b', targets='all', block=True):
640 return Mapper(self, dist, targets, block)
641
642 def parallel(self, dist='b', targets='all', block=True):
643 pf = ParallelFunction(self, dist=dist, targets=targets, block=True)
644 return pf
645
632 #---------------------------------------------------------------------------
646 #---------------------------------------------------------------------------
633 # ISynchronousMultiEngineExtras related methods
647 # ISynchronousMultiEngineExtras related methods
634 #---------------------------------------------------------------------------
648 #---------------------------------------------------------------------------
@@ -16,17 +16,32 b' __docformat__ = "restructuredtext en"'
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from types import FunctionType
18 from types import FunctionType
19 from zope.interface import Interface, implements
19
20
20 class ParallelFunction:
21
21 """A function that operates in parallel on sequences."""
22 class ParallelFunction(object):
22 def __init__(self, func, multiengine, targets, block):
23 """
23 """Create a `ParallelFunction`.
24 A decorator for building parallel functions.
25 """
26
27 def __init__(self, multiengine, dist='b', targets='all', block=True):
28 """
29 Create a `ParallelFunction decorator`.
24 """
30 """
25 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
26 self.func = func
27 self.multiengine = multiengine
31 self.multiengine = multiengine
32 self.dist = dist
28 self.targets = targets
33 self.targets = targets
29 self.block = block
34 self.block = block
30
35
31 def __call__(self, sequence):
36 def __call__(self, func):
32 return self.multiengine.map(self.func, sequence, targets=self.targets, block=self.block) No newline at end of file
37 """
38 Decorate the function to make it run in parallel.
39 """
40 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
41 self.func = func
42 def call_function(*sequences):
43 return self.multiengine._map(self.func, sequences, dist=self.dist,
44 targets=self.targets, block=self.block)
45 return call_function
46
47 No newline at end of file
@@ -52,6 +52,10 b' Bug fixes'
52 Backwards incompatible changes
52 Backwards incompatible changes
53 ------------------------------
53 ------------------------------
54
54
55 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
56 and `map`.
57 * Renamed the values that the renamed `dist` keyword argument can have from
58 `'basic'` to `'b'`.
55 * IPython has a larger set of dependencies if you want all of its capabilities.
59 * IPython has a larger set of dependencies if you want all of its capabilities.
56 See the `setup.py` script for details.
60 See the `setup.py` script for details.
57 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
61 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
General Comments 0
You need to be logged in to leave comments. Login now