##// END OF EJS Templates
Merge pull request #6042 from minrk/import-less-than-everything...
Thomas Kluyver -
r17055:c03e562b merge
parent child Browse files
Show More
@@ -21,7 +21,6 b' from zmq.eventloop import ioloop'
21
21
22 from IPython.config.configurable import LoggingConfigurable
22 from IPython.config.configurable import LoggingConfigurable
23 from IPython.utils.traitlets import Dict, Instance, CFloat
23 from IPython.utils.traitlets import Dict, Instance, CFloat
24 from IPython.parallel.apps.ipclusterapp import IPClusterStart
25 from IPython.core.profileapp import list_profiles_in
24 from IPython.core.profileapp import list_profiles_in
26 from IPython.core.profiledir import ProfileDir
25 from IPython.core.profiledir import ProfileDir
27 from IPython.utils import py3compat
26 from IPython.utils import py3compat
@@ -33,17 +32,6 b' from IPython.utils.path import get_ipython_dir'
33 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
34
33
35
34
36 class DummyIPClusterStart(IPClusterStart):
37 """Dummy subclass to skip init steps that conflict with global app.
38
39 Instantiating and initializing this class should result in fully configured
40 launchers, but no other side effects or state.
41 """
42
43 def init_signal(self):
44 pass
45 def reinit_logging(self):
46 pass
47
35
48
36
49 class ClusterManager(LoggingConfigurable):
37 class ClusterManager(LoggingConfigurable):
@@ -59,6 +47,20 b' class ClusterManager(LoggingConfigurable):'
59 return IOLoop.instance()
47 return IOLoop.instance()
60
48
61 def build_launchers(self, profile_dir):
49 def build_launchers(self, profile_dir):
50 from IPython.parallel.apps.ipclusterapp import IPClusterStart
51
52 class DummyIPClusterStart(IPClusterStart):
53 """Dummy subclass to skip init steps that conflict with global app.
54
55 Instantiating and initializing this class should result in fully configured
56 launchers, but no other side effects or state.
57 """
58
59 def init_signal(self):
60 pass
61 def reinit_logging(self):
62 pass
63
62 starter = DummyIPClusterStart(log=self.log)
64 starter = DummyIPClusterStart(log=self.log)
63 starter.initialize(['--profile-dir', profile_dir])
65 starter.initialize(['--profile-dir', profile_dir])
64 cl = starter.controller_launcher
66 cl = starter.controller_launcher
@@ -22,7 +22,6 b' from subprocess import Popen, PIPE'
22 import tempfile
22 import tempfile
23
23
24 import zmq
24 import zmq
25 from zmq.ssh import tunnel
26
25
27 # IPython imports
26 # IPython imports
28 from IPython.config import LoggingConfigurable
27 from IPython.config import LoggingConfigurable
@@ -342,6 +341,7 b' def tunnel_to_kernel(connection_info, sshserver, sshkey=None):'
342 (shell, iopub, stdin, hb) : ints
341 (shell, iopub, stdin, hb) : ints
343 The four ports on localhost that have been forwarded to the kernel.
342 The four ports on localhost that have been forwarded to the kernel.
344 """
343 """
344 from zmq.ssh import tunnel
345 if isinstance(connection_info, string_types):
345 if isinstance(connection_info, string_types):
346 # it's a path, unpack it
346 # it's a path, unpack it
347 with open(connection_info) as f:
347 with open(connection_info) as f:
@@ -18,7 +18,6 b' from pprint import pprint'
18 pjoin = os.path.join
18 pjoin = os.path.join
19
19
20 import zmq
20 import zmq
21 from zmq.ssh import tunnel
22
21
23 from IPython.config.configurable import MultipleInstanceError
22 from IPython.config.configurable import MultipleInstanceError
24 from IPython.core.application import BaseIPythonApplication
23 from IPython.core.application import BaseIPythonApplication
@@ -443,6 +442,7 b' class Client(HasTraits):'
443 # default to ssh via localhost
442 # default to ssh via localhost
444 sshserver = addr
443 sshserver = addr
445 if self._ssh and password is None:
444 if self._ssh and password is None:
445 from zmq.ssh import tunnel
446 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
446 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
447 password=False
447 password=False
448 else:
448 else:
@@ -467,6 +467,7 b' class Client(HasTraits):'
467 self._query_socket = self._context.socket(zmq.DEALER)
467 self._query_socket = self._context.socket(zmq.DEALER)
468
468
469 if self._ssh:
469 if self._ssh:
470 from zmq.ssh import tunnel
470 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
471 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
471 else:
472 else:
472 self._query_socket.connect(cfg['registration'])
473 self._query_socket.connect(cfg['registration'])
@@ -589,6 +590,7 b' class Client(HasTraits):'
589
590
590 def connect_socket(s, url):
591 def connect_socket(s, url):
591 if self._ssh:
592 if self._ssh:
593 from zmq.ssh import tunnel
592 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
594 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
593 else:
595 else:
594 return s.connect(url)
596 return s.connect(url)
@@ -4,56 +4,33 b''
4
4
5 Scattering consists of partitioning a sequence and sending the various
5 Scattering consists of partitioning a sequence and sending the various
6 pieces to individual nodes in a cluster.
6 pieces to individual nodes in a cluster.
7
8
9 Authors:
10
11 * Brian Granger
12 * MinRK
13
14 """
7 """
15
8
16 #-------------------------------------------------------------------------------
9 # Copyright (c) IPython Development Team.
17 # Copyright (C) 2008-2011 The IPython Development Team
10 # Distributed under the terms of the Modified BSD License.
18 #
19 # Distributed under the terms of the BSD License. The full license is in
20 # the file COPYING, distributed as part of this software.
21 #-------------------------------------------------------------------------------
22
23 #-------------------------------------------------------------------------------
24 # Imports
25 #-------------------------------------------------------------------------------
26
11
27 from __future__ import division
12 from __future__ import division
28
13
14 import sys
29 from itertools import islice
15 from itertools import islice
30
16
31 from IPython.utils.data import flatten as utils_flatten
17 from IPython.utils.data import flatten as utils_flatten
32
18
33 #-------------------------------------------------------------------------------
19
34 # Figure out which array packages are present and their array types
20 numpy = None
35 #-------------------------------------------------------------------------------
21
36
22 def is_array(obj):
37 arrayModules = []
23 """Is an object a numpy array?
38 try:
24
39 import Numeric
25 Avoids importing numpy until it is requested
40 except ImportError:
26 """
41 pass
27 global numpy
42 else:
28 if 'numpy' not in sys.modules:
43 arrayModules.append({'module':Numeric, 'type':Numeric.arraytype})
29 return False
44 try:
30
45 import numpy
31 if numpy is None:
46 except ImportError:
32 import numpy
47 pass
33 return isinstance(obj, numpy.ndarray)
48 else:
49 arrayModules.append({'module':numpy, 'type':numpy.ndarray})
50 try:
51 import numarray
52 except ImportError:
53 pass
54 else:
55 arrayModules.append({'module':numarray,
56 'type':numarray.numarraycore.NumArray})
57
34
58 class Map(object):
35 class Map(object):
59 """A class for partitioning a sequence using a map."""
36 """A class for partitioning a sequence using a map."""
@@ -90,14 +67,12 b' class Map(object):'
90
67
91 def joinPartitions(self, listOfPartitions):
68 def joinPartitions(self, listOfPartitions):
92 return self.concatenate(listOfPartitions)
69 return self.concatenate(listOfPartitions)
93
70
94 def concatenate(self, listOfPartitions):
71 def concatenate(self, listOfPartitions):
95 testObject = listOfPartitions[0]
72 testObject = listOfPartitions[0]
96 # First see if we have a known array type
73 # First see if we have a known array type
97 for m in arrayModules:
74 if is_array(testObject):
98 #print m
75 return numpy.concatenate(listOfPartitions)
99 if isinstance(testObject, m['type']):
100 return m['module'].concatenate(listOfPartitions)
101 # Next try for Python sequence types
76 # Next try for Python sequence types
102 if isinstance(testObject, (list, tuple)):
77 if isinstance(testObject, (list, tuple)):
103 return utils_flatten(listOfPartitions)
78 return utils_flatten(listOfPartitions)
@@ -117,19 +92,17 b' class RoundRobinMap(Map):'
117 def joinPartitions(self, listOfPartitions):
92 def joinPartitions(self, listOfPartitions):
118 testObject = listOfPartitions[0]
93 testObject = listOfPartitions[0]
119 # First see if we have a known array type
94 # First see if we have a known array type
120 for m in arrayModules:
95 if is_array(testObject):
121 #print m
96 return self.flatten_array(listOfPartitions)
122 if isinstance(testObject, m['type']):
123 return self.flatten_array(m['type'], listOfPartitions)
124 if isinstance(testObject, (list, tuple)):
97 if isinstance(testObject, (list, tuple)):
125 return self.flatten_list(listOfPartitions)
98 return self.flatten_list(listOfPartitions)
126 return listOfPartitions
99 return listOfPartitions
127
100
128 def flatten_array(self, klass, listOfPartitions):
101 def flatten_array(self, listOfPartitions):
129 test = listOfPartitions[0]
102 test = listOfPartitions[0]
130 shape = list(test.shape)
103 shape = list(test.shape)
131 shape[0] = sum([ p.shape[0] for p in listOfPartitions])
104 shape[0] = sum([ p.shape[0] for p in listOfPartitions])
132 A = klass(shape)
105 A = numpy.ndarray(shape)
133 N = shape[0]
106 N = shape[0]
134 q = len(listOfPartitions)
107 q = len(listOfPartitions)
135 for p,part in enumerate(listOfPartitions):
108 for p,part in enumerate(listOfPartitions):
@@ -141,23 +114,13 b' class RoundRobinMap(Map):'
141 for i in range(len(listOfPartitions[0])):
114 for i in range(len(listOfPartitions[0])):
142 flat.extend([ part[i] for part in listOfPartitions if len(part) > i ])
115 flat.extend([ part[i] for part in listOfPartitions if len(part) > i ])
143 return flat
116 return flat
144 #lengths = [len(x) for x in listOfPartitions]
145 #maxPartitionLength = len(listOfPartitions[0])
146 #numberOfPartitions = len(listOfPartitions)
147 #concat = self.concatenate(listOfPartitions)
148 #totalLength = len(concat)
149 #result = []
150 #for i in range(maxPartitionLength):
151 # result.append(concat[i:totalLength:maxPartitionLength])
152 # return self.concatenate(listOfPartitions)
153
117
154 def mappable(obj):
118 def mappable(obj):
155 """return whether an object is mappable or not."""
119 """return whether an object is mappable or not."""
156 if isinstance(obj, (tuple,list)):
120 if isinstance(obj, (tuple,list)):
157 return True
121 return True
158 for m in arrayModules:
122 if is_array(obj):
159 if isinstance(obj,m['type']):
123 return True
160 return True
161 return False
124 return False
162
125
163 dists = {'b':Map,'r':RoundRobinMap}
126 dists = {'b':Map,'r':RoundRobinMap}
@@ -14,11 +14,10 b' from getpass import getpass'
14
14
15 import zmq
15 import zmq
16 from zmq.eventloop import ioloop, zmqstream
16 from zmq.eventloop import ioloop, zmqstream
17 from zmq.ssh import tunnel
18
17
19 from IPython.utils.localinterfaces import localhost
18 from IPython.utils.localinterfaces import localhost
20 from IPython.utils.traitlets import (
19 from IPython.utils.traitlets import (
21 Instance, Dict, Integer, Type, Float, Integer, Unicode, CBytes, Bool
20 Instance, Dict, Integer, Type, Float, Unicode, CBytes, Bool
22 )
21 )
23 from IPython.utils.py3compat import cast_bytes
22 from IPython.utils.py3compat import cast_bytes
24
23
@@ -58,6 +57,11 b' class EngineFactory(RegistrationFactory):'
58 help="""The SSH private key file to use when tunneling connections to the Controller.""")
57 help="""The SSH private key file to use when tunneling connections to the Controller.""")
59 paramiko=Bool(sys.platform == 'win32', config=True,
58 paramiko=Bool(sys.platform == 'win32', config=True,
60 help="""Whether to use paramiko instead of openssh for tunnels.""")
59 help="""Whether to use paramiko instead of openssh for tunnels.""")
60
61 @property
62 def tunnel_mod(self):
63 from zmq.ssh import tunnel
64 return tunnel
61
65
62
66
63 # not configurable:
67 # not configurable:
@@ -97,7 +101,7 b' class EngineFactory(RegistrationFactory):'
97 self.sshserver = self.url.split('://')[1].split(':')[0]
101 self.sshserver = self.url.split('://')[1].split(':')[0]
98
102
99 if self.using_ssh:
103 if self.using_ssh:
100 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
104 if self.tunnel_mod.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
101 password=False
105 password=False
102 else:
106 else:
103 password = getpass("SSH Password for %s: "%self.sshserver)
107 password = getpass("SSH Password for %s: "%self.sshserver)
@@ -108,7 +112,7 b' class EngineFactory(RegistrationFactory):'
108 url = disambiguate_url(url, self.location)
112 url = disambiguate_url(url, self.location)
109 if self.using_ssh:
113 if self.using_ssh:
110 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
114 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
111 return tunnel.tunnel_connection(s, url, self.sshserver,
115 return self.tunnel_mod.tunnel_connection(s, url, self.sshserver,
112 keyfile=self.sshkey, paramiko=self.paramiko,
116 keyfile=self.sshkey, paramiko=self.paramiko,
113 password=password,
117 password=password,
114 )
118 )
@@ -120,7 +124,7 b' class EngineFactory(RegistrationFactory):'
120 url = disambiguate_url(url, self.location)
124 url = disambiguate_url(url, self.location)
121 if self.using_ssh:
125 if self.using_ssh:
122 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
126 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
123 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
127 url, tunnelobj = self.tunnel_mod.open_tunnel(url, self.sshserver,
124 keyfile=self.sshkey, paramiko=self.paramiko,
128 keyfile=self.sshkey, paramiko=self.paramiko,
125 password=password,
129 password=password,
126 )
130 )
General Comments 0
You need to be logged in to leave comments. Login now