##// END OF EJS Templates
Semi-working refactored ipcluster....
Brian Granger -
Show More
@@ -1,329 +1,330 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """A simple configuration system.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 """
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2008-2009 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Imports
19 19 #-----------------------------------------------------------------------------
20 20
21 21 import __builtin__
22 22 import os
23 23 import sys
24 24
25 25 from IPython.external import argparse
26 26 from IPython.utils.genutils import filefind
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Exceptions
30 30 #-----------------------------------------------------------------------------
31 31
32 32
33 33 class ConfigError(Exception):
34 34 pass
35 35
36 36
37 37 class ConfigLoaderError(ConfigError):
38 38 pass
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Config class for holding config information
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 class Config(dict):
47 47 """An attribute based dict that can do smart merges."""
48 48
49 49 def __init__(self, *args, **kwds):
50 50 dict.__init__(self, *args, **kwds)
51 51 # This sets self.__dict__ = self, but it has to be done this way
52 52 # because we are also overriding __setattr__.
53 53 dict.__setattr__(self, '__dict__', self)
54 54
55 55 def _merge(self, other):
56 56 to_update = {}
57 57 for k, v in other.items():
58 58 if not self.has_key(k):
59 59 to_update[k] = v
60 60 else: # I have this key
61 61 if isinstance(v, Config):
62 62 # Recursively merge common sub Configs
63 63 self[k]._merge(v)
64 64 else:
65 65 # Plain updates for non-Configs
66 66 to_update[k] = v
67 67
68 68 self.update(to_update)
69 69
70 70 def _is_section_key(self, key):
71 71 if key[0].upper()==key[0] and not key.startswith('_'):
72 72 return True
73 73 else:
74 74 return False
75 75
76 76 def has_key(self, key):
77 77 if self._is_section_key(key):
78 78 return True
79 79 else:
80 80 return dict.has_key(self, key)
81 81
82 82 def _has_section(self, key):
83 83 if self._is_section_key(key):
84 84 if dict.has_key(self, key):
85 85 return True
86 86 return False
87 87
88 88 def copy(self):
89 89 return type(self)(dict.copy(self))
90 90
91 91 def __copy__(self):
92 92 return self.copy()
93 93
94 94 def __deepcopy__(self, memo):
95 95 import copy
96 96 return type(self)(copy.deepcopy(self.items()))
97 97
98 98 def __getitem__(self, key):
99 99 # Because we use this for an exec namespace, we need to delegate
100 100 # the lookup of names in __builtin__ to itself. This means
101 101 # that you can't have section or attribute names that are
102 102 # builtins.
103 103 try:
104 104 return getattr(__builtin__, key)
105 105 except AttributeError:
106 106 pass
107 107 if self._is_section_key(key):
108 108 try:
109 109 return dict.__getitem__(self, key)
110 110 except KeyError:
111 111 c = Config()
112 112 dict.__setitem__(self, key, c)
113 113 return c
114 114 else:
115 115 return dict.__getitem__(self, key)
116 116
117 117 def __setitem__(self, key, value):
118 118 # Don't allow names in __builtin__ to be modified.
119 119 if hasattr(__builtin__, key):
120 120 raise ConfigError('Config variable names cannot have the same name '
121 121 'as a Python builtin: %s' % key)
122 122 if self._is_section_key(key):
123 123 if not isinstance(value, Config):
124 124 raise ValueError('values whose keys begin with an uppercase '
125 125 'char must be Config instances: %r, %r' % (key, value))
126 126 else:
127 127 dict.__setitem__(self, key, value)
128 128
129 129 def __getattr__(self, key):
130 130 try:
131 131 return self.__getitem__(key)
132 132 except KeyError, e:
133 133 raise AttributeError(e)
134 134
135 135 def __setattr__(self, key, value):
136 136 try:
137 137 self.__setitem__(key, value)
138 138 except KeyError, e:
139 139 raise AttributeError(e)
140 140
141 141 def __delattr__(self, key):
142 142 try:
143 143 dict.__delitem__(self, key)
144 144 except KeyError, e:
145 145 raise AttributeError(e)
146 146
147 147
148 148 #-----------------------------------------------------------------------------
149 149 # Config loading classes
150 150 #-----------------------------------------------------------------------------
151 151
152 152
153 153 class ConfigLoader(object):
154 154 """A object for loading configurations from just about anywhere.
155 155
156 156 The resulting configuration is packaged as a :class:`Struct`.
157 157
158 158 Notes
159 159 -----
160 160 A :class:`ConfigLoader` does one thing: load a config from a source
161 161 (file, command line arguments) and returns the data as a :class:`Struct`.
162 162 There are lots of things that :class:`ConfigLoader` does not do. It does
163 163 not implement complex logic for finding config files. It does not handle
164 164 default values or merge multiple configs. These things need to be
165 165 handled elsewhere.
166 166 """
167 167
168 168 def __init__(self):
169 169 """A base class for config loaders.
170 170
171 171 Examples
172 172 --------
173 173
174 174 >>> cl = ConfigLoader()
175 175 >>> config = cl.load_config()
176 176 >>> config
177 177 {}
178 178 """
179 179 self.clear()
180 180
181 181 def clear(self):
182 182 self.config = Config()
183 183
184 184 def load_config(self):
185 185 """Load a config from somewhere, return a Struct.
186 186
187 187 Usually, this will cause self.config to be set and then returned.
188 188 """
189 189 return self.config
190 190
191 191
192 192 class FileConfigLoader(ConfigLoader):
193 193 """A base class for file based configurations.
194 194
195 195 As we add more file based config loaders, the common logic should go
196 196 here.
197 197 """
198 198 pass
199 199
200 200
201 201 class PyFileConfigLoader(FileConfigLoader):
202 202 """A config loader for pure python files.
203 203
204 204 This calls execfile on a plain python file and looks for attributes
205 205 that are all caps. These attributes are added to the config Struct.
206 206 """
207 207
208 208 def __init__(self, filename, path=None):
209 209 """Build a config loader for a filename and path.
210 210
211 211 Parameters
212 212 ----------
213 213 filename : str
214 214 The file name of the config file.
215 215 path : str, list, tuple
216 216 The path to search for the config file on, or a sequence of
217 217 paths to try in order.
218 218 """
219 219 super(PyFileConfigLoader, self).__init__()
220 220 self.filename = filename
221 221 self.path = path
222 222 self.full_filename = ''
223 223 self.data = None
224 224
225 225 def load_config(self):
226 226 """Load the config from a file and return it as a Struct."""
227 227 self._find_file()
228 228 self._read_file_as_dict()
229 229 self._convert_to_config()
230 230 return self.config
231 231
232 232 def _find_file(self):
233 233 """Try to find the file by searching the paths."""
234 234 self.full_filename = filefind(self.filename, self.path)
235 235
236 236 def _read_file_as_dict(self):
237 237 """Load the config file into self.config, with recursive loading."""
238 238 # This closure is made available in the namespace that is used
239 239 # to exec the config file. This allows users to call
240 240 # load_subconfig('myconfig.py') to load config files recursively.
241 241 # It needs to be a closure because it has references to self.path
242 242 # and self.config. The sub-config is loaded with the same path
243 243 # as the parent, but it uses an empty config which is then merged
244 244 # with the parents.
245 245 def load_subconfig(fname):
246 246 loader = PyFileConfigLoader(fname, self.path)
247 247 sub_config = loader.load_config()
248 248 self.config._merge(sub_config)
249 249
250 250 # Again, this needs to be a closure and should be used in config
251 251 # files to get the config being loaded.
252 252 def get_config():
253 253 return self.config
254 254
255 255 namespace = dict(load_subconfig=load_subconfig, get_config=get_config)
256 256 execfile(self.full_filename, namespace)
257 257
258 258 def _convert_to_config(self):
259 259 if self.data is None:
260 260 ConfigLoaderError('self.data does not exist')
261 261
262 262
263 263 class CommandLineConfigLoader(ConfigLoader):
264 264 """A config loader for command line arguments.
265 265
266 266 As we add more command line based loaders, the common logic should go
267 267 here.
268 268 """
269 269
270 270
271 271 class NoConfigDefault(object): pass
272 272 NoConfigDefault = NoConfigDefault()
273 273
274
274 275 class ArgParseConfigLoader(CommandLineConfigLoader):
275 276
276 277 # arguments = [(('-f','--file'),dict(type=str,dest='file'))]
277 278 arguments = ()
278 279
279 280 def __init__(self, *args, **kw):
280 281 """Create a config loader for use with argparse.
281 282
282 283 The args and kwargs arguments here are passed onto the constructor
283 284 of :class:`argparse.ArgumentParser`.
284 285 """
285 286 super(CommandLineConfigLoader, self).__init__()
286 287 self.args = args
287 288 self.kw = kw
288 289
289 290 def load_config(self, args=None):
290 291 """Parse command line arguments and return as a Struct."""
291 292 self._create_parser()
292 293 self._parse_args(args)
293 294 self._convert_to_config()
294 295 return self.config
295 296
296 297 def get_extra_args(self):
297 298 if hasattr(self, 'extra_args'):
298 299 return self.extra_args
299 300 else:
300 301 return []
301 302
302 303 def _create_parser(self):
303 304 self.parser = argparse.ArgumentParser(*self.args, **self.kw)
304 305 self._add_arguments()
305 306 self._add_other_arguments()
306 307
307 308 def _add_other_arguments(self):
308 309 pass
309 310
310 311 def _add_arguments(self):
311 312 for argument in self.arguments:
312 313 if not argument[1].has_key('default'):
313 314 argument[1]['default'] = NoConfigDefault
314 315 self.parser.add_argument(*argument[0],**argument[1])
315 316
316 317 def _parse_args(self, args=None):
317 318 """self.parser->self.parsed_data"""
318 319 if args is None:
319 320 self.parsed_data, self.extra_args = self.parser.parse_known_args()
320 321 else:
321 322 self.parsed_data, self.extra_args = self.parser.parse_known_args(args)
322 323
323 324 def _convert_to_config(self):
324 325 """self.parsed_data->self.config"""
325 326 for k, v in vars(self.parsed_data).items():
326 327 if v is not NoConfigDefault:
327 328 exec_str = 'self.config.' + k + '= v'
328 329 exec exec_str in locals(), globals()
329 330
@@ -1,251 +1,263 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import shutil
20 20
21 21 from IPython.core import release
22 22 from IPython.config.loader import PyFileConfigLoader
23 23 from IPython.core.application import Application
24 24 from IPython.core.component import Component
25 25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 26 from IPython.utils.traitlets import Unicode
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Imports
30 30 #-----------------------------------------------------------------------------
31 31
32 32
33 33 class ClusterDir(Component):
34 34 """An object to manage the cluster directory and its resources.
35 35
36 36 The cluster directory is used by :command:`ipcontroller`,
37 37 :command:`ipengine` and :command:`ipcluster` to manage the
38 38 configuration, logging and security of these applications.
39 39
40 40 This object knows how to find, create and manage these directories. This
41 41 should be used by any code that wants to handle cluster directories.
42 42 """
43 43
44 44 security_dir_name = Unicode('security')
45 45 log_dir_name = Unicode('log')
46 46 security_dir = Unicode()
47 47 log_dir = Unicode('')
48 48 location = Unicode('')
49 49
50 50 def __init__(self, location):
51 51 super(ClusterDir, self).__init__(None)
52 52 self.location = location
53 53
54 54 def _location_changed(self, name, old, new):
55 55 if not os.path.isdir(new):
56 56 os.makedirs(new, mode=0777)
57 57 else:
58 58 os.chmod(new, 0777)
59 59 self.security_dir = os.path.join(new, self.security_dir_name)
60 60 self.log_dir = os.path.join(new, self.log_dir_name)
61 self.check_dirs()
61 62
62 63 def _log_dir_changed(self, name, old, new):
63 if not os.path.isdir(new):
64 os.mkdir(new, 0777)
64 self.check_log_dir()
65
66 def check_log_dir(self):
67 if not os.path.isdir(self.log_dir):
68 os.mkdir(self.log_dir, 0777)
65 69 else:
66 os.chmod(new, 0777)
70 os.chmod(self.log_dir, 0777)
67 71
68 72 def _security_dir_changed(self, name, old, new):
69 if not os.path.isdir(new):
70 os.mkdir(new, 0700)
73 self.check_security_dir()
74
75 def check_security_dir(self):
76 if not os.path.isdir(self.security_dir):
77 os.mkdir(self.security_dir, 0700)
71 78 else:
72 os.chmod(new, 0700)
79 os.chmod(self.security_dir, 0700)
80
81 def check_dirs(self):
82 self.check_security_dir()
83 self.check_log_dir()
73 84
74 85 def load_config_file(self, filename):
75 86 """Load a config file from the top level of the cluster dir.
76 87
77 88 Parameters
78 89 ----------
79 90 filename : unicode or str
80 91 The filename only of the config file that must be located in
81 92 the top-level of the cluster directory.
82 93 """
83 94 loader = PyFileConfigLoader(filename, self.location)
84 95 return loader.load_config()
85 96
86 def copy_config_file(self, config_file, path=None):
97 def copy_config_file(self, config_file, path=None, overwrite=False):
87 98 """Copy a default config file into the active cluster directory.
88 99
89 100 Default configuration files are kept in :mod:`IPython.config.default`.
90 101 This function moves these from that location to the working cluster
91 102 directory.
92 103 """
93 104 if path is None:
94 105 import IPython.config.default
95 106 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
96 107 path = os.path.sep.join(path)
97 108 src = os.path.join(path, config_file)
98 109 dst = os.path.join(self.location, config_file)
110 if not os.path.isfile(dst) or overwrite:
99 111 shutil.copy(src, dst)
100 112
101 def copy_all_config_files(self):
113 def copy_all_config_files(self, path=None, overwrite=False):
102 114 """Copy all config files into the active cluster directory."""
103 for f in ['ipcontroller_config.py', 'ipengine_config.py']:
104 self.copy_config_file(f)
115 for f in ['ipcontroller_config.py', 'ipengine_config.py',
116 'ipcluster_config.py']:
117 self.copy_config_file(f, path=path, overwrite=overwrite)
105 118
106 119 @classmethod
107 120 def find_cluster_dir_by_profile(cls, path, profile='default'):
108 121 """Find/create a cluster dir by profile name and return its ClusterDir.
109 122
110 123 This will create the cluster directory if it doesn't exist.
111 124
112 125 Parameters
113 126 ----------
114 127 path : unicode or str
115 128 The directory path to look for the cluster directory in.
116 129 profile : unicode or str
117 130 The name of the profile. The name of the cluster directory
118 131 will be "cluster_<profile>".
119 132 """
120 133 dirname = 'cluster_' + profile
121 134 cluster_dir = os.path.join(os.getcwd(), dirname)
122 135 if os.path.isdir(cluster_dir):
123 136 return ClusterDir(cluster_dir)
124 137 else:
125 138 if not os.path.isdir(path):
126 139 raise IOError("Directory doesn't exist: %s" % path)
127 140 cluster_dir = os.path.join(path, dirname)
128 141 return ClusterDir(cluster_dir)
129 142
130 143 @classmethod
131 144 def find_cluster_dir(cls, cluster_dir):
132 145 """Find/create a cluster dir and return its ClusterDir.
133 146
134 147 This will create the cluster directory if it doesn't exist.
135 148
136 149 Parameters
137 150 ----------
138 151 cluster_dir : unicode or str
139 152 The path of the cluster directory. This is expanded using
140 153 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
141 154 """
142 155 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
143 156 return ClusterDir(cluster_dir)
144 157
145 158
146 159 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
147 160 """Default command line options for IPython cluster applications."""
148 161
149 162 def _add_other_arguments(self):
150 163 self.parser.add_argument('-ipythondir', '--ipython-dir',
151 164 dest='Global.ipythondir',type=str,
152 165 help='Set to override default location of Global.ipythondir.',
153 166 default=NoConfigDefault,
154 167 metavar='Global.ipythondir')
155 168 self.parser.add_argument('-p','-profile', '--profile',
156 169 dest='Global.profile',type=str,
157 170 help='The string name of the profile to be used. This determines '
158 171 'the name of the cluster dir as: cluster_<profile>. The default profile '
159 172 'is named "default". The cluster directory is resolve this way '
160 173 'if the --cluster-dir option is not used.',
161 174 default=NoConfigDefault,
162 175 metavar='Global.profile')
163 176 self.parser.add_argument('-log_level', '--log-level',
164 177 dest="Global.log_level",type=int,
165 178 help='Set the log level (0,10,20,30,40,50). Default is 30.',
166 179 default=NoConfigDefault)
167 180 self.parser.add_argument('-cluster_dir', '--cluster-dir',
168 181 dest='Global.cluster_dir',type=str,
169 182 help='Set the cluster dir. This overrides the logic used by the '
170 183 '--profile option.',
171 184 default=NoConfigDefault,
172 185 metavar='Global.cluster_dir')
173 186
174 187
175 188 class ApplicationWithClusterDir(Application):
176 189 """An application that puts everything into a cluster directory.
177 190
178 191 Instead of looking for things in the ipythondir, this type of application
179 192 will use its own private directory called the "cluster directory"
180 193 for things like config files, log files, etc.
181 194
182 195 The cluster directory is resolved as follows:
183 196
184 197 * If the ``--cluster-dir`` option is given, it is used.
185 198 * If ``--cluster-dir`` is not given, the application directory is
186 199 resolve using the profile name as ``cluster_<profile>``. The search
187 200 path for this directory is then i) cwd if it is found there
188 201 and ii) in ipythondir otherwise.
189 202
190 203 The config file for the application is to be put in the cluster
191 204 dir and named the value of the ``config_file_name`` class attribute.
192 205 """
193 206
194 207 def create_default_config(self):
195 208 super(ApplicationWithClusterDir, self).create_default_config()
196 209 self.default_config.Global.profile = 'default'
197 210 self.default_config.Global.cluster_dir = ''
198 211
199 212 def create_command_line_config(self):
200 213 """Create and return a command line config loader."""
201 214 return AppWithClusterDirArgParseConfigLoader(
202 215 description=self.description,
203 216 version=release.version
204 217 )
205 218
206 219 def find_config_file_name(self):
207 220 """Find the config file name for this application."""
208 221 # For this type of Application it should be set as a class attribute.
209 222 if not hasattr(self, 'config_file_name'):
210 223 self.log.critical("No config filename found")
211 224
212 225 def find_config_file_paths(self):
213 226 """This resolves the cluster directory and sets ``config_file_paths``.
214 227
215 228 This does the following:
216 229 * Create the :class:`ClusterDir` object for the application.
217 230 * Set the ``cluster_dir`` attribute of the application and config
218 231 objects.
219 232 * Set ``config_file_paths`` to point to the cluster directory.
220 233 """
221 234
222 235 # Create the ClusterDir object for managing everything
223 236 try:
224 237 cluster_dir = self.command_line_config.Global.cluster_dir
225 238 except AttributeError:
226 239 cluster_dir = self.default_config.Global.cluster_dir
227 240 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
228 241 if cluster_dir:
229 242 # Just use cluster_dir
230 243 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
231 244 else:
232 245 # Then look for a profile
233 246 try:
234 247 self.profile = self.command_line_config.Global.profile
235 248 except AttributeError:
236 249 self.profile = self.default_config.Global.profile
237 250 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
238 251 self.ipythondir, self.profile)
239 252
240 253 # Set the cluster directory
241 254 self.cluster_dir = self.cluster_dir_obj.location
242 255
243 256 # These have to be set because they could be different from the one
244 257 # that we just computed. Because command line has the highest
245 258 # priority, this will always end up in the master_config.
246 259 self.default_config.Global.cluster_dir = self.cluster_dir
247 260 self.command_line_config.Global.cluster_dir = self.cluster_dir
248 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
249 261
250 262 # Set the search path to the cluster directory
251 263 self.config_file_paths = (self.cluster_dir,)
@@ -1,92 +1,131 b''
1 1 # encoding: utf-8
2 2
3 3 """A class that manages the engines connection to the controller."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import os
19 19 import cPickle as pickle
20 20
21 21 from twisted.python import log, failure
22 22 from twisted.internet import defer
23 from twisted.internet.defer import inlineCallbacks, returnValue
23 24
24 25 from IPython.kernel.fcutil import find_furl
25 26 from IPython.kernel.enginefc import IFCEngine
27 from IPython.kernel.twistedutil import sleep_deferred
26 28
27 29 #-------------------------------------------------------------------------------
28 30 # The ClientConnector class
29 31 #-------------------------------------------------------------------------------
30 32
33
34 class EngineConnectorError(Exception):
35 pass
36
37
31 38 class EngineConnector(object):
32 39 """Manage an engines connection to a controller.
33 40
34 41 This class takes a foolscap `Tub` and provides a `connect_to_controller`
35 42 method that will use the `Tub` to connect to a controller and register
36 43 the engine with the controller.
37 44 """
38 45
39 46 def __init__(self, tub):
40 47 self.tub = tub
41 48
42 def connect_to_controller(self, engine_service, furl_or_file):
49 def connect_to_controller(self, engine_service, furl_or_file,
50 delay=0.1, max_tries=10):
43 51 """
44 52 Make a connection to a controller specified by a furl.
45 53
46 54 This method takes an `IEngineBase` instance and a foolcap URL and uses
47 55 the `tub` attribute to make a connection to the controller. The
48 56 foolscap URL contains all the information needed to connect to the
49 57 controller, including the ip and port as well as any encryption and
50 58 authentication information needed for the connection.
51 59
52 60 After getting a reference to the controller, this method calls the
53 61 `register_engine` method of the controller to actually register the
54 62 engine.
55 63
56 :Parameters:
64 This method will try to connect to the controller multiple times with
65 a delay in between. Each time the FURL file is read anew.
66
67 Parameters
68 ----------
57 69 engine_service : IEngineBase
58 70 An instance of an `IEngineBase` implementer
59 71 furl_or_file : str
60 72 A furl or a filename containing a furl
73 delay : float
74 The initial time to wait between connection attempts. Subsequent
75 attempts have increasing delays.
76 max_tries : int
77 The maximum number of connection attempts.
61 78 """
62 79 if not self.tub.running:
63 80 self.tub.startService()
64 81 self.engine_service = engine_service
65 82 self.engine_reference = IFCEngine(self.engine_service)
83
84 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
85 return d
86
87 @inlineCallbacks
88 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
89 """Try to connect to the controller with retry logic."""
90 if attempt < max_tries:
91 log.msg("Attempting to connect to controller [%r]: %s" % \
92 (attempt, furl_or_file))
66 93 try:
67 94 self.furl = find_furl(furl_or_file)
68 except ValueError:
69 return defer.fail(failure.Failure())
95 # Uncomment this to see the FURL being tried.
96 # log.msg("FURL: %s" % self.furl)
97 rr = yield self.tub.getReference(self.furl)
98 except:
99 if attempt==max_tries-1:
100 # This will propagate the exception all the way to the top
101 # where it can be handled.
102 raise
70 103 else:
71 d = self.tub.getReference(self.furl)
72 d.addCallbacks(self._register, self._log_failure)
73 return d
74
75 def _log_failure(self, reason):
76 log.err('EngineConnector: engine registration failed:')
77 log.err(reason)
78 return reason
104 yield sleep_deferred(delay)
105 yield self._try_to_connect(
106 furl_or_file, 1.5*delay, max_tries, attempt+1
107 )
108 else:
109 result = yield self._register(rr)
110 returnValue(result)
111 else:
112 raise EngineConnectorError(
113 'Could not connect to controller, max_tries (%r) exceeded. '
114 'This usually means that i) the controller was not started, '
115 'or ii) a firewall was blocking the engine from connecting '
116 'to the controller.' % max_tries
117 )
79 118
80 119 def _register(self, rr):
81 120 self.remote_ref = rr
82 121 # Now register myself with the controller
83 122 desired_id = self.engine_service.id
84 123 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
85 124 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
86 return d.addCallbacks(self._reference_sent, self._log_failure)
125 return d.addCallback(self._reference_sent)
87 126
88 127 def _reference_sent(self, registration_dict):
89 128 self.engine_service.id = registration_dict['id']
90 129 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
91 130 return self.engine_service.id
92 131
@@ -1,236 +1,239 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Foolscap related utilities.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 from __future__ import with_statement
19
18 20 import os
19 21 import tempfile
20 22
21 23 from twisted.internet import reactor, defer
22 24 from twisted.python import log
23 25
24 26 from foolscap import Tub, UnauthenticatedTub
25 27
26 28 from IPython.config.loader import Config
27 29
28 30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
29 31
30 32 from IPython.kernel.error import SecurityError
31 33
32 34 from IPython.utils.traitlets import Int, Str, Bool, Instance
33 35 from IPython.utils.importstring import import_item
34 36
35 37 #-----------------------------------------------------------------------------
36 38 # Code
37 39 #-----------------------------------------------------------------------------
38 40
39 41
40 42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
41 43 # an UnauthenticatedTub. But, they will still run into problems if they
42 44 # try to use encrypted furls.
43 45 try:
44 46 import OpenSSL
45 47 except:
46 48 Tub = UnauthenticatedTub
47 49 have_crypto = False
48 50 else:
49 51 have_crypto = True
50 52
51 53
52 54 def check_furl_file_security(furl_file, secure):
53 55 """Remove the old furl_file if changing security modes."""
54 56 if os.path.isfile(furl_file):
55 57 f = open(furl_file, 'r')
56 58 oldfurl = f.read().strip()
57 59 f.close()
58 60 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
59 61 os.remove(furl_file)
60 62
61 63
62 64 def is_secure(furl):
63 65 """Is the given FURL secure or not."""
64 66 if is_valid(furl):
65 67 if furl.startswith("pb://"):
66 68 return True
67 69 elif furl.startswith("pbu://"):
68 70 return False
69 71 else:
70 72 raise ValueError("invalid FURL: %s" % furl)
71 73
72 74
73 75 def is_valid(furl):
74 76 """Is the str a valid FURL or not."""
75 77 if isinstance(furl, str):
76 78 if furl.startswith("pb://") or furl.startswith("pbu://"):
77 79 return True
78 80 else:
79 81 return False
80 82
81 83
82 84 def find_furl(furl_or_file):
83 85 """Find, validate and return a FURL in a string or file."""
84 86 if isinstance(furl_or_file, str):
85 87 if is_valid(furl_or_file):
86 88 return furl_or_file
87 89 if os.path.isfile(furl_or_file):
88 furl = open(furl_or_file, 'r').read().strip()
90 with open(furl_or_file, 'r') as f:
91 furl = f.read().strip()
89 92 if is_valid(furl):
90 93 return furl
91 raise ValueError("not a FURL or a file containing a FURL: %s" % furl_or_file)
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
92 95
93 96
94 97 def get_temp_furlfile(filename):
95 98 """Return a temporary FURL file."""
96 99 return tempfile.mktemp(dir=os.path.dirname(filename),
97 100 prefix=os.path.basename(filename))
98 101
99 102
100 103 def make_tub(ip, port, secure, cert_file):
101 104 """Create a listening tub given an ip, port, and cert_file location.
102 105
103 106 Parameters
104 107 ----------
105 108 ip : str
106 109 The ip address or hostname that the tub should listen on.
107 110 Empty means all interfaces.
108 111 port : int
109 112 The port that the tub should listen on. A value of 0 means
110 113 pick a random port
111 114 secure: bool
112 115 Will the connection be secure (in the Foolscap sense).
113 116 cert_file: str
114 117 A filename of a file to be used for the SSL certificate.
115 118
116 119 Returns
117 120 -------
118 121 A tub, listener tuple.
119 122 """
120 123 if secure:
121 124 if have_crypto:
122 125 tub = Tub(certFile=cert_file)
123 126 else:
124 127 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
125 128 "can't run in secure mode. Try running without "
126 129 "security using 'ipcontroller -xy'.")
127 130 else:
128 131 tub = UnauthenticatedTub()
129 132
130 133 # Set the strport based on the ip and port and start listening
131 134 if ip == '':
132 135 strport = "tcp:%i" % port
133 136 else:
134 137 strport = "tcp:%i:interface=%s" % (port, ip)
135 138 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
136 139 listener = tub.listenOn(strport)
137 140
138 141 return tub, listener
139 142
140 143
141 144 class FCServiceFactory(AdaptedConfiguredObjectFactory):
142 145 """This class creates a tub with various services running in it.
143 146
144 147 The basic idea is that :meth:`create` returns a running :class:`Tub`
145 148 instance that has a number of Foolscap references registered in it.
146 149 This class is a subclass of :class:`IPython.core.component.Component`
147 150 so the IPython configuration and component system are used.
148 151
149 152 Attributes
150 153 ----------
151 154 interfaces : Config
152 155 A Config instance whose values are sub-Config objects having two
153 156 keys: furl_file and interface_chain.
154 157
155 158 The other attributes are the standard ones for Foolscap.
156 159 """
157 160
158 161 ip = Str('', config=True)
159 162 port = Int(0, config=True)
160 163 secure = Bool(True, config=True)
161 164 cert_file = Str('', config=True)
162 165 location = Str('', config=True)
163 166 reuse_furls = Bool(False, config=True)
164 167 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
165 168
166 169 def __init__(self, config, adaptee):
167 170 super(FCServiceFactory, self).__init__(config, adaptee)
168 171 self._check_reuse_furls()
169 172
170 173 def _ip_changed(self, name, old, new):
171 174 if new == 'localhost' or new == '127.0.0.1':
172 175 self.location = '127.0.0.1'
173 176
174 177 def _check_reuse_furls(self):
175 178 furl_files = [i.furl_file for i in self.interfaces.values()]
176 179 for ff in furl_files:
177 180 fullfile = self._get_security_file(ff)
178 181 if self.reuse_furls:
179 182 log.msg("Reusing FURL file: %s" % fullfile)
180 183 else:
181 184 if os.path.isfile(fullfile):
182 185 log.msg("Removing old FURL file: %s" % fullfile)
183 186 os.remove(fullfile)
184 187
185 188 def _get_security_file(self, filename):
186 189 return os.path.join(self.config.Global.security_dir, filename)
187 190
188 191 def create(self):
189 192 """Create and return the Foolscap tub with everything running."""
190 193
191 194 self.tub, self.listener = make_tub(
192 195 self.ip, self.port, self.secure,
193 196 self._get_security_file(self.cert_file)
194 197 )
195 198 # log.msg("Interfaces to register [%r]: %r" % \
196 199 # (self.__class__, self.interfaces))
197 200 if not self.secure:
198 201 log.msg("WARNING: running with no security: %s" % \
199 202 self.__class__.__name__)
200 203 reactor.callWhenRunning(self.set_location_and_register)
201 204 return self.tub
202 205
203 206 def set_location_and_register(self):
204 207 """Set the location for the tub and return a deferred."""
205 208
206 209 if self.location == '':
207 210 d = self.tub.setLocationAutomatically()
208 211 else:
209 212 d = defer.maybeDeferred(self.tub.setLocation,
210 213 "%s:%i" % (self.location, self.listener.getPortnum()))
211 214 self.adapt_to_interfaces(d)
212 215
213 216 def adapt_to_interfaces(self, d):
214 217 """Run through the interfaces, adapt and register."""
215 218
216 219 for ifname, ifconfig in self.interfaces.iteritems():
217 220 ff = self._get_security_file(ifconfig.furl_file)
218 221 log.msg("Adapting [%s] to interface: %s" % \
219 222 (self.adaptee.__class__.__name__, ifname))
220 223 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
221 224 check_furl_file_security(ff, self.secure)
222 225 adaptee = self.adaptee
223 226 for i in ifconfig.interface_chain:
224 227 adaptee = import_item(i)(adaptee)
225 228 d.addCallback(self.register, adaptee, furl_file=ff)
226 229
227 230 def register(self, empty, ref, furl_file):
228 231 """Register the reference with the FURL file.
229 232
230 233 The FURL file is created and then moved to make sure that when the
231 234 file appears, the buffer has been flushed and the file closed.
232 235 """
233 236 temp_furl_file = get_temp_furlfile(furl_file)
234 237 self.tub.registerReference(ref, furlFile=temp_furl_file)
235 238 os.rename(temp_furl_file, furl_file)
236 239
@@ -1,268 +1,274 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import os
20 20 import sys
21 21
22 22 from twisted.application import service
23 23 from twisted.internet import reactor
24 24 from twisted.python import log
25 25
26 26 from IPython.config.loader import Config, NoConfigDefault
27 27
28 28 from IPython.kernel.clusterdir import (
29 29 ApplicationWithClusterDir,
30 30 AppWithClusterDirArgParseConfigLoader
31 31 )
32 32
33 33 from IPython.core import release
34 34
35 35 from IPython.utils.traitlets import Str, Instance
36 36
37 37 from IPython.kernel import controllerservice
38 38
39 39 from IPython.kernel.fcutil import FCServiceFactory
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Default interfaces
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 # The default client interfaces for FCClientServiceFactory.interfaces
47 47 default_client_interfaces = Config()
48 48 default_client_interfaces.Task.interface_chain = [
49 49 'IPython.kernel.task.ITaskController',
50 50 'IPython.kernel.taskfc.IFCTaskController'
51 51 ]
52 52
53 53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
54 54
55 55 default_client_interfaces.MultiEngine.interface_chain = [
56 56 'IPython.kernel.multiengine.IMultiEngine',
57 57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
58 58 ]
59 59
60 60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
61 61
62 62 # Make this a dict we can pass to Config.__init__ for the default
63 63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
64 64
65 65
66 66
67 67 # The default engine interfaces for FCEngineServiceFactory.interfaces
68 68 default_engine_interfaces = Config()
69 69 default_engine_interfaces.Default.interface_chain = [
70 70 'IPython.kernel.enginefc.IFCControllerBase'
71 71 ]
72 72
73 73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
74 74
75 75 # Make this a dict we can pass to Config.__init__ for the default
76 76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
77 77
78 78
79 79 #-----------------------------------------------------------------------------
80 80 # Service factories
81 81 #-----------------------------------------------------------------------------
82 82
83 83
84 84 class FCClientServiceFactory(FCServiceFactory):
85 85 """A Foolscap implementation of the client services."""
86 86
87 87 cert_file = Str('ipcontroller-client.pem', config=True)
88 88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
89 89 allow_none=False, config=True)
90 90
91 91
92 92 class FCEngineServiceFactory(FCServiceFactory):
93 93 """A Foolscap implementation of the engine services."""
94 94
95 95 cert_file = Str('ipcontroller-engine.pem', config=True)
96 96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
97 97 allow_none=False, config=True)
98 98
99 99
100 100 #-----------------------------------------------------------------------------
101 101 # The main application
102 102 #-----------------------------------------------------------------------------
103 103
104 104
105 105 cl_args = (
106 106 # Client config
107 107 (('--client-ip',), dict(
108 108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
109 109 help='The IP address or hostname the controller will listen on for '
110 110 'client connections.',
111 111 metavar='FCClientServiceFactory.ip')
112 112 ),
113 113 (('--client-port',), dict(
114 114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
115 115 help='The port the controller will listen on for client connections. '
116 116 'The default is to use 0, which will autoselect an open port.',
117 117 metavar='FCClientServiceFactory.port')
118 118 ),
119 119 (('--client-location',), dict(
120 120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
121 121 help='The hostname or IP that clients should connect to. This does '
122 122 'not control which interface the controller listens on. Instead, this '
123 123 'determines the hostname/IP that is listed in the FURL, which is how '
124 124 'clients know where to connect. Useful if the controller is listening '
125 125 'on multiple interfaces.',
126 126 metavar='FCClientServiceFactory.location')
127 127 ),
128 128 # Engine config
129 129 (('--engine-ip',), dict(
130 130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
131 131 help='The IP address or hostname the controller will listen on for '
132 132 'engine connections.',
133 133 metavar='FCEngineServiceFactory.ip')
134 134 ),
135 135 (('--engine-port',), dict(
136 136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
137 137 help='The port the controller will listen on for engine connections. '
138 138 'The default is to use 0, which will autoselect an open port.',
139 139 metavar='FCEngineServiceFactory.port')
140 140 ),
141 141 (('--engine-location',), dict(
142 142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
143 143 help='The hostname or IP that engines should connect to. This does '
144 144 'not control which interface the controller listens on. Instead, this '
145 145 'determines the hostname/IP that is listed in the FURL, which is how '
146 146 'engines know where to connect. Useful if the controller is listening '
147 147 'on multiple interfaces.',
148 148 metavar='FCEngineServiceFactory.location')
149 149 ),
150 150 # Global config
151 151 (('--log-to-file',), dict(
152 152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
153 153 help='Log to a file in the log directory (default is stdout)')
154 154 ),
155 155 (('-r','--reuse-furls'), dict(
156 156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
157 157 help='Try to reuse all FURL files. If this is not set all FURL files '
158 158 'are deleted before the controller starts. This must be set if '
159 159 'specific ports are specified by --engine-port or --client-port.')
160 160 ),
161 161 (('-ns','--no-security'), dict(
162 162 action='store_false', dest='Global.secure', default=NoConfigDefault,
163 163 help='Turn off SSL encryption for all connections.')
164 164 )
165 165 )
166 166
167 167
168 168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
169 169
170 170 arguments = cl_args
171 171
172 172
173 173 default_config_file_name = 'ipcontroller_config.py'
174 174
175 175
176 176 class IPControllerApp(ApplicationWithClusterDir):
177 177
178 178 name = 'ipcontroller'
179 179 description = 'Start the IPython controller for parallel computing.'
180 180 config_file_name = default_config_file_name
181 181
182 182 def create_default_config(self):
183 183 super(IPControllerApp, self).create_default_config()
184 184 self.default_config.Global.reuse_furls = False
185 185 self.default_config.Global.secure = True
186 186 self.default_config.Global.import_statements = []
187 187 self.default_config.Global.log_to_file = False
188 188
189 189 def create_command_line_config(self):
190 190 """Create and return a command line config loader."""
191 191 return IPControllerAppCLConfigLoader(
192 192 description=self.description,
193 193 version=release.version
194 194 )
195 195
196 196 def post_load_command_line_config(self):
197 197 # Now setup reuse_furls
198 198 c = self.command_line_config
199 199 if hasattr(c.Global, 'reuse_furls'):
200 200 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
201 201 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
202 202 del c.Global.reuse_furls
203 203 if hasattr(c.Global, 'secure'):
204 204 c.FCClientServiceFactory.secure = c.Global.secure
205 205 c.FCEngineServiceFactory.secure = c.Global.secure
206 206 del c.Global.secure
207 207
208 208 def pre_construct(self):
209 209 # The log and security dirs were set earlier, but here we put them
210 210 # into the config and log them.
211 211 config = self.master_config
212 212 sdir = self.cluster_dir_obj.security_dir
213 213 self.security_dir = config.Global.security_dir = sdir
214 214 ldir = self.cluster_dir_obj.log_dir
215 215 self.log_dir = config.Global.log_dir = ldir
216 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
216 217 self.log.info("Log directory set to: %s" % self.log_dir)
217 218 self.log.info("Security directory set to: %s" % self.security_dir)
218 219
219 220 def construct(self):
220 221 # I am a little hesitant to put these into InteractiveShell itself.
221 222 # But that might be the place for them
222 223 sys.path.insert(0, '')
223 224
224 225 self.start_logging()
225 226 self.import_statements()
226 227
227 228 # Create the service hierarchy
228 229 self.main_service = service.MultiService()
229 230 # The controller service
230 231 controller_service = controllerservice.ControllerService()
231 232 controller_service.setServiceParent(self.main_service)
232 233 # The client tub and all its refereceables
233 234 csfactory = FCClientServiceFactory(self.master_config, controller_service)
234 235 client_service = csfactory.create()
235 236 client_service.setServiceParent(self.main_service)
236 237 # The engine tub
237 238 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
238 239 engine_service = esfactory.create()
239 240 engine_service.setServiceParent(self.main_service)
240 241
241 242 def start_logging(self):
242 243 if self.master_config.Global.log_to_file:
243 244 log_filename = self.name + '-' + str(os.getpid()) + '.log'
244 245 logfile = os.path.join(self.log_dir, log_filename)
245 246 open_log_file = open(logfile, 'w')
246 247 else:
247 248 open_log_file = sys.stdout
248 249 log.startLogging(open_log_file)
249 250
250 251 def import_statements(self):
251 252 statements = self.master_config.Global.import_statements
252 253 for s in statements:
253 254 try:
254 255 log.msg("Executing statement: '%s'" % s)
255 256 exec s in globals(), locals()
256 257 except:
257 258 log.msg("Error running statement: %s" % s)
258 259
259 260 def start_app(self):
260 261 # Start the controller service and set things running
261 262 self.main_service.startService()
262 263 reactor.run()
263 264
264 265
265 266 def launch_new_instance():
266 267 """Create and run the IPython controller"""
267 268 app = IPControllerApp()
268 269 app.start()
270
271
272 if __name__ == '__main__':
273 launch_new_instance()
274
@@ -1,245 +1,248 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 from twisted.application import service
22 22 from twisted.internet import reactor
23 23 from twisted.python import log
24 24
25 25 from IPython.config.loader import NoConfigDefault
26 26
27 27 from IPython.kernel.clusterdir import (
28 28 ApplicationWithClusterDir,
29 29 AppWithClusterDirArgParseConfigLoader
30 30 )
31 31 from IPython.core import release
32 32
33 33 from IPython.utils.importstring import import_item
34 34
35 35 from IPython.kernel.engineservice import EngineService
36 36 from IPython.kernel.fcutil import Tub
37 37 from IPython.kernel.engineconnector import EngineConnector
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The main application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 cl_args = (
45 45 # Controller config
46 46 (('--furl-file',), dict(
47 47 type=str, dest='Global.furl_file', default=NoConfigDefault,
48 48 help='The full location of the file containing the FURL of the '
49 49 'controller. If this is not given, the FURL file must be in the '
50 50 'security directory of the cluster directory. This location is '
51 51 'resolved using the --profile and --app-dir options.',
52 52 metavar='Global.furl_file')
53 53 ),
54 54 # MPI
55 55 (('--mpi',), dict(
56 56 type=str, dest='MPI.use', default=NoConfigDefault,
57 57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
58 58 metavar='MPI.use')
59 59 ),
60 60 # Global config
61 61 (('--log-to-file',), dict(
62 62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
63 63 help='Log to a file in the log directory (default is stdout)')
64 64 )
65 65 )
66 66
67 67
68 68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
69 69
70 70 arguments = cl_args
71 71
72 72
73 73 mpi4py_init = """from mpi4py import MPI as mpi
74 74 mpi.size = mpi.COMM_WORLD.Get_size()
75 75 mpi.rank = mpi.COMM_WORLD.Get_rank()
76 76 """
77 77
78 78 pytrilinos_init = """from PyTrilinos import Epetra
79 79 class SimpleStruct:
80 80 pass
81 81 mpi = SimpleStruct()
82 82 mpi.rank = 0
83 83 mpi.size = 0
84 84 """
85 85
86 86
87 87 default_config_file_name = 'ipengine_config.py'
88 88
89 89
90 90 class IPEngineApp(ApplicationWithClusterDir):
91 91
92 92 name = 'ipengine'
93 93 description = 'Start the IPython engine for parallel computing.'
94 94 config_file_name = default_config_file_name
95 95
96 96 def create_default_config(self):
97 97 super(IPEngineApp, self).create_default_config()
98 98
99 99 # Global config attributes
100 100 self.default_config.Global.log_to_file = False
101 101 self.default_config.Global.exec_lines = []
102 102 # The log and security dir names must match that of the controller
103 103 self.default_config.Global.log_dir_name = 'log'
104 104 self.default_config.Global.security_dir_name = 'security'
105 105 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
106 106
107 107 # Configuration related to the controller
108 108 # This must match the filename (path not included) that the controller
109 109 # used for the FURL file.
110 110 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
111 111 # If given, this is the actual location of the controller's FURL file.
112 112 # If not, this is computed using the profile, app_dir and furl_file_name
113 113 self.default_config.Global.furl_file = ''
114 114
115 115 # MPI related config attributes
116 116 self.default_config.MPI.use = ''
117 117 self.default_config.MPI.mpi4py = mpi4py_init
118 118 self.default_config.MPI.pytrilinos = pytrilinos_init
119 119
120 120 def create_command_line_config(self):
121 121 """Create and return a command line config loader."""
122 122 return IPEngineAppCLConfigLoader(
123 123 description=self.description,
124 124 version=release.version
125 125 )
126 126
127 127 def post_load_command_line_config(self):
128 128 pass
129 129
130 130 def pre_construct(self):
131 131 config = self.master_config
132 132 sdir = self.cluster_dir_obj.security_dir
133 133 self.security_dir = config.Global.security_dir = sdir
134 134 ldir = self.cluster_dir_obj.log_dir
135 135 self.log_dir = config.Global.log_dir = ldir
136 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
136 137 self.log.info("Log directory set to: %s" % self.log_dir)
137 138 self.log.info("Security directory set to: %s" % self.security_dir)
138 139
139 140 self.find_cont_furl_file()
140 141
141 142 def find_cont_furl_file(self):
143 """Set the furl file.
144
145 Here we don't try to actually see if it exists for is valid as that
146 is hadled by the connection logic.
147 """
142 148 config = self.master_config
143 149 # Find the actual controller FURL file
144 if os.path.isfile(config.Global.furl_file):
145 return
146 else:
147 # We should know what the app dir is
150 if not config.Global.furl_file:
148 151 try_this = os.path.join(
149 152 config.Global.cluster_dir,
150 153 config.Global.security_dir,
151 154 config.Global.furl_file_name
152 155 )
153 if os.path.isfile(try_this):
154 156 config.Global.furl_file = try_this
155 return
156 else:
157 self.log.critical("Could not find a valid controller FURL file.")
158 self.abort()
159 157
160 158 def construct(self):
161 159 # I am a little hesitant to put these into InteractiveShell itself.
162 160 # But that might be the place for them
163 161 sys.path.insert(0, '')
164 162
165 163 self.start_mpi()
166 164 self.start_logging()
167 165
168 166 # Create the underlying shell class and EngineService
169 167 shell_class = import_item(self.master_config.Global.shell_class)
170 168 self.engine_service = EngineService(shell_class, mpi=mpi)
171 169
172 170 self.exec_lines()
173 171
174 172 # Create the service hierarchy
175 173 self.main_service = service.MultiService()
176 174 self.engine_service.setServiceParent(self.main_service)
177 175 self.tub_service = Tub()
178 176 self.tub_service.setServiceParent(self.main_service)
179 177 # This needs to be called before the connection is initiated
180 178 self.main_service.startService()
181 179
182 180 # This initiates the connection to the controller and calls
183 181 # register_engine to tell the controller we are ready to do work
184 182 self.engine_connector = EngineConnector(self.tub_service)
185 183
186 184 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
187 185
188 186 reactor.callWhenRunning(self.call_connect)
189 187
190 188 def call_connect(self):
191 189 d = self.engine_connector.connect_to_controller(
192 190 self.engine_service,
193 191 self.master_config.Global.furl_file
194 192 )
195 193
196 194 def handle_error(f):
197 # If this print statement is replaced by a log.err(f) I get
198 # an unhandled error, which makes no sense. I shouldn't have
199 # to use a print statement here. My only thought is that
200 # at the beginning of the process the logging is still starting up
201 print "Error connecting to controller:", f.getErrorMessage()
195 log.msg('Error connecting to controller. This usually means that '
196 'i) the controller was not started, ii) a firewall was blocking '
197 'the engine from connecting to the controller or iii) the engine '
198 ' was not pointed at the right FURL file:')
199 log.msg(f.getErrorMessage())
202 200 reactor.callLater(0.1, reactor.stop)
203 201
204 202 d.addErrback(handle_error)
205 203
206 204 def start_mpi(self):
207 205 global mpi
208 206 mpikey = self.master_config.MPI.use
209 207 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
210 208 if mpi_import_statement is not None:
211 209 try:
212 210 self.log.info("Initializing MPI:")
213 211 self.log.info(mpi_import_statement)
214 212 exec mpi_import_statement in globals()
215 213 except:
216 214 mpi = None
217 215 else:
218 216 mpi = None
219 217
220 218 def start_logging(self):
221 219 if self.master_config.Global.log_to_file:
222 220 log_filename = self.name + '-' + str(os.getpid()) + '.log'
223 221 logfile = os.path.join(self.log_dir, log_filename)
224 222 open_log_file = open(logfile, 'w')
225 223 else:
226 224 open_log_file = sys.stdout
227 225 log.startLogging(open_log_file)
228 226
229 227 def exec_lines(self):
230 228 for line in self.master_config.Global.exec_lines:
231 229 try:
232 230 log.msg("Executing statement: '%s'" % line)
233 231 self.engine_service.execute(line)
234 232 except:
235 233 log.msg("Error executing statement: %s" % line)
236 234
237 235 def start_app(self):
238 236 # Start the controller service and set things running
239 237 reactor.run()
240 238
241 239
242 240 def launch_new_instance():
243 241 """Create and run the IPython controller"""
244 242 app = IPEngineApp()
245 243 app.start()
244
245
246 if __name__ == '__main__':
247 launch_new_instance()
248
@@ -1,22 +1,18 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 """ipcluster script"""
5
6 __docformat__ = "restructuredtext en"
7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2008-2009 The IPython Development Team
10 6 #
11 7 # Distributed under the terms of the BSD License. The full license is in
12 8 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
14 10
15 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
16 12 # Imports
17 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
18 15
19 if __name__ == '__main__':
20 from IPython.kernel.scripts import ipcluster
21 ipcluster.main()
16 from IPython.kernel.ipclusterapp import launch_new_instance
22 17
18 launch_new_instance()
@@ -1,813 +1,811 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.utils.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.core.oldusersetup import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
54 52 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
53
56 54
57 55 #-----------------------------------------------------------------------------
58 56 # General process handling code
59 57 #-----------------------------------------------------------------------------
60 58
61 59
62 60 class ProcessStateError(Exception):
63 61 pass
64 62
65 63 class UnknownStatus(Exception):
66 64 pass
67 65
68 66 class LauncherProcessProtocol(ProcessProtocol):
69 67 """
70 68 A ProcessProtocol to go with the ProcessLauncher.
71 69 """
72 70 def __init__(self, process_launcher):
73 71 self.process_launcher = process_launcher
74 72
75 73 def connectionMade(self):
76 74 self.process_launcher.fire_start_deferred(self.transport.pid)
77 75
78 76 def processEnded(self, status):
79 77 value = status.value
80 78 if isinstance(value, ProcessDone):
81 79 self.process_launcher.fire_stop_deferred(0)
82 80 elif isinstance(value, ProcessTerminated):
83 81 self.process_launcher.fire_stop_deferred(
84 82 {'exit_code':value.exitCode,
85 83 'signal':value.signal,
86 84 'status':value.status
87 85 }
88 86 )
89 87 else:
90 88 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
91 89
92 90 def outReceived(self, data):
93 91 log.msg(data)
94 92
95 93 def errReceived(self, data):
96 94 log.err(data)
97 95
98 96 class ProcessLauncher(object):
99 97 """
100 98 Start and stop an external process in an asynchronous manner.
101 99
102 100 Currently this uses deferreds to notify other parties of process state
103 101 changes. This is an awkward design and should be moved to using
104 102 a formal NotificationCenter.
105 103 """
106 104 def __init__(self, cmd_and_args):
107 105 self.cmd = cmd_and_args[0]
108 106 self.args = cmd_and_args
109 107 self._reset()
110 108
111 109 def _reset(self):
112 110 self.process_protocol = None
113 111 self.pid = None
114 112 self.start_deferred = None
115 113 self.stop_deferreds = []
116 114 self.state = 'before' # before, running, or after
117 115
118 116 @property
119 117 def running(self):
120 118 if self.state == 'running':
121 119 return True
122 120 else:
123 121 return False
124 122
125 123 def fire_start_deferred(self, pid):
126 124 self.pid = pid
127 125 self.state = 'running'
128 126 log.msg('Process %r has started with pid=%i' % (self.args, pid))
129 127 self.start_deferred.callback(pid)
130 128
131 129 def start(self):
132 130 if self.state == 'before':
133 131 self.process_protocol = LauncherProcessProtocol(self)
134 132 self.start_deferred = defer.Deferred()
135 133 self.process_transport = reactor.spawnProcess(
136 134 self.process_protocol,
137 135 self.cmd,
138 136 self.args,
139 137 env=os.environ
140 138 )
141 139 return self.start_deferred
142 140 else:
143 s = 'the process has already been started and has state: %r' % \
141 s = 'The process has already been started and has state: %r' % \
144 142 self.state
145 143 return defer.fail(ProcessStateError(s))
146 144
147 145 def get_stop_deferred(self):
148 146 if self.state == 'running' or self.state == 'before':
149 147 d = defer.Deferred()
150 148 self.stop_deferreds.append(d)
151 149 return d
152 150 else:
153 151 s = 'this process is already complete'
154 152 return defer.fail(ProcessStateError(s))
155 153
156 154 def fire_stop_deferred(self, exit_code):
157 155 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
158 156 self.state = 'after'
159 157 for d in self.stop_deferreds:
160 158 d.callback(exit_code)
161 159
162 160 def signal(self, sig):
163 161 """
164 162 Send a signal to the process.
165 163
166 164 The argument sig can be ('KILL','INT', etc.) or any signal number.
167 165 """
168 166 if self.state == 'running':
169 167 self.process_transport.signalProcess(sig)
170 168
171 169 # def __del__(self):
172 170 # self.signal('KILL')
173 171
174 172 def interrupt_then_kill(self, delay=1.0):
175 173 self.signal('INT')
176 174 reactor.callLater(delay, self.signal, 'KILL')
177 175
178 176
179 177 #-----------------------------------------------------------------------------
180 178 # Code for launching controller and engines
181 179 #-----------------------------------------------------------------------------
182 180
183 181
184 182 class ControllerLauncher(ProcessLauncher):
185 183
186 184 def __init__(self, extra_args=None):
187 185 if sys.platform == 'win32':
188 186 # This logic is needed because the ipcontroller script doesn't
189 187 # always get installed in the same way or in the same location.
190 188 from IPython.kernel.scripts import ipcontroller
191 189 script_location = ipcontroller.__file__.replace('.pyc', '.py')
192 190 # The -u option here turns on unbuffered output, which is required
193 191 # on Win32 to prevent wierd conflict and problems with Twisted.
194 192 # Also, use sys.executable to make sure we are picking up the
195 193 # right python exe.
196 194 args = [sys.executable, '-u', script_location]
197 195 else:
198 196 args = ['ipcontroller']
199 197 self.extra_args = extra_args
200 198 if extra_args is not None:
201 199 args.extend(extra_args)
202 200
203 201 ProcessLauncher.__init__(self, args)
204 202
205 203
206 204 class EngineLauncher(ProcessLauncher):
207 205
208 206 def __init__(self, extra_args=None):
209 207 if sys.platform == 'win32':
210 208 # This logic is needed because the ipcontroller script doesn't
211 209 # always get installed in the same way or in the same location.
212 210 from IPython.kernel.scripts import ipengine
213 211 script_location = ipengine.__file__.replace('.pyc', '.py')
214 212 # The -u option here turns on unbuffered output, which is required
215 213 # on Win32 to prevent wierd conflict and problems with Twisted.
216 214 # Also, use sys.executable to make sure we are picking up the
217 215 # right python exe.
218 216 args = [sys.executable, '-u', script_location]
219 217 else:
220 218 args = ['ipengine']
221 219 self.extra_args = extra_args
222 220 if extra_args is not None:
223 221 args.extend(extra_args)
224 222
225 223 ProcessLauncher.__init__(self, args)
226 224
227 225
228 226 class LocalEngineSet(object):
229 227
230 228 def __init__(self, extra_args=None):
231 229 self.extra_args = extra_args
232 230 self.launchers = []
233 231
234 232 def start(self, n):
235 233 dlist = []
236 234 for i in range(n):
237 235 el = EngineLauncher(extra_args=self.extra_args)
238 236 d = el.start()
239 237 self.launchers.append(el)
240 238 dlist.append(d)
241 239 dfinal = gatherBoth(dlist, consumeErrors=True)
242 240 dfinal.addCallback(self._handle_start)
243 241 return dfinal
244 242
245 243 def _handle_start(self, r):
246 244 log.msg('Engines started with pids: %r' % r)
247 245 return r
248 246
249 247 def _handle_stop(self, r):
250 248 log.msg('Engines received signal: %r' % r)
251 249 return r
252 250
253 251 def signal(self, sig):
254 252 dlist = []
255 253 for el in self.launchers:
256 254 d = el.get_stop_deferred()
257 255 dlist.append(d)
258 256 el.signal(sig)
259 257 dfinal = gatherBoth(dlist, consumeErrors=True)
260 258 dfinal.addCallback(self._handle_stop)
261 259 return dfinal
262 260
263 261 def interrupt_then_kill(self, delay=1.0):
264 262 dlist = []
265 263 for el in self.launchers:
266 264 d = el.get_stop_deferred()
267 265 dlist.append(d)
268 266 el.interrupt_then_kill(delay)
269 267 dfinal = gatherBoth(dlist, consumeErrors=True)
270 268 dfinal.addCallback(self._handle_stop)
271 269 return dfinal
272 270
273 271
274 272 class BatchEngineSet(object):
275 273
276 274 # Subclasses must fill these in. See PBSEngineSet
277 275 submit_command = ''
278 276 delete_command = ''
279 277 job_id_regexp = ''
280 278
281 279 def __init__(self, template_file, **kwargs):
282 280 self.template_file = template_file
283 281 self.context = {}
284 282 self.context.update(kwargs)
285 283 self.batch_file = self.template_file+'-run'
286 284
287 285 def parse_job_id(self, output):
288 286 m = re.match(self.job_id_regexp, output)
289 287 if m is not None:
290 288 job_id = m.group()
291 289 else:
292 290 raise Exception("job id couldn't be determined: %s" % output)
293 291 self.job_id = job_id
294 292 log.msg('Job started with job id: %r' % job_id)
295 293 return job_id
296 294
297 295 def write_batch_script(self, n):
298 296 self.context['n'] = n
299 297 template = open(self.template_file, 'r').read()
300 298 log.msg('Using template for batch script: %s' % self.template_file)
301 299 script_as_string = Itpl.itplns(template, self.context)
302 300 log.msg('Writing instantiated batch script: %s' % self.batch_file)
303 301 f = open(self.batch_file,'w')
304 302 f.write(script_as_string)
305 303 f.close()
306 304
307 305 def handle_error(self, f):
308 306 f.printTraceback()
309 307 f.raiseException()
310 308
311 309 def start(self, n):
312 310 self.write_batch_script(n)
313 311 d = getProcessOutput(self.submit_command,
314 312 [self.batch_file],env=os.environ)
315 313 d.addCallback(self.parse_job_id)
316 314 d.addErrback(self.handle_error)
317 315 return d
318 316
319 317 def kill(self):
320 318 d = getProcessOutput(self.delete_command,
321 319 [self.job_id],env=os.environ)
322 320 return d
323 321
324 322 class PBSEngineSet(BatchEngineSet):
325 323
326 324 submit_command = 'qsub'
327 325 delete_command = 'qdel'
328 326 job_id_regexp = '\d+'
329 327
330 328 def __init__(self, template_file, **kwargs):
331 329 BatchEngineSet.__init__(self, template_file, **kwargs)
332 330
333 331
334 332 sshx_template="""#!/bin/sh
335 333 "$@" &> /dev/null &
336 334 echo $!
337 335 """
338 336
339 337 engine_killer_template="""#!/bin/sh
340 338 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
341 339 """
342 340
343 341 class SSHEngineSet(object):
344 342 sshx_template=sshx_template
345 343 engine_killer_template=engine_killer_template
346 344
347 345 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
348 346 """Start a controller on localhost and engines using ssh.
349 347
350 348 The engine_hosts argument is a dict with hostnames as keys and
351 349 the number of engine (int) as values. sshx is the name of a local
352 350 file that will be used to run remote commands. This file is used
353 351 to setup the environment properly.
354 352 """
355 353
356 354 self.temp_dir = tempfile.gettempdir()
357 355 if sshx is not None:
358 356 self.sshx = sshx
359 357 else:
360 358 # Write the sshx.sh file locally from our template.
361 359 self.sshx = os.path.join(
362 360 self.temp_dir,
363 361 '%s-main-sshx.sh' % os.environ['USER']
364 362 )
365 363 f = open(self.sshx, 'w')
366 364 f.writelines(self.sshx_template)
367 365 f.close()
368 366 self.engine_command = ipengine
369 367 self.engine_hosts = engine_hosts
370 368 # Write the engine killer script file locally from our template.
371 369 self.engine_killer = os.path.join(
372 370 self.temp_dir,
373 371 '%s-local-engine_killer.sh' % os.environ['USER']
374 372 )
375 373 f = open(self.engine_killer, 'w')
376 374 f.writelines(self.engine_killer_template)
377 375 f.close()
378 376
379 377 def start(self, send_furl=False):
380 378 dlist = []
381 379 for host in self.engine_hosts.keys():
382 380 count = self.engine_hosts[host]
383 381 d = self._start(host, count, send_furl)
384 382 dlist.append(d)
385 383 return gatherBoth(dlist, consumeErrors=True)
386 384
387 385 def _start(self, hostname, count=1, send_furl=False):
388 386 if send_furl:
389 387 d = self._scp_furl(hostname)
390 388 else:
391 389 d = defer.succeed(None)
392 390 d.addCallback(lambda r: self._scp_sshx(hostname))
393 391 d.addCallback(lambda r: self._ssh_engine(hostname, count))
394 392 return d
395 393
396 394 def _scp_furl(self, hostname):
397 395 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
398 396 cmd_list = scp_cmd.split()
399 397 cmd_list[1] = os.path.expanduser(cmd_list[1])
400 398 log.msg('Copying furl file: %s' % scp_cmd)
401 399 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
402 400 return d
403 401
404 402 def _scp_sshx(self, hostname):
405 403 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
406 404 self.sshx, hostname,
407 405 self.temp_dir, os.environ['USER']
408 406 )
409 407 print
410 408 log.msg("Copying sshx: %s" % scp_cmd)
411 409 sshx_scp = scp_cmd.split()
412 410 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
413 411 return d
414 412
415 413 def _ssh_engine(self, hostname, count):
416 414 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
417 415 hostname, self.temp_dir,
418 416 os.environ['USER'], self.engine_command
419 417 )
420 418 cmds = exec_engine.split()
421 419 dlist = []
422 420 log.msg("about to start engines...")
423 421 for i in range(count):
424 422 log.msg('Starting engines: %s' % exec_engine)
425 423 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
426 424 dlist.append(d)
427 425 return gatherBoth(dlist, consumeErrors=True)
428 426
429 427 def kill(self):
430 428 dlist = []
431 429 for host in self.engine_hosts.keys():
432 430 d = self._killall(host)
433 431 dlist.append(d)
434 432 return gatherBoth(dlist, consumeErrors=True)
435 433
436 434 def _killall(self, hostname):
437 435 d = self._scp_engine_killer(hostname)
438 436 d.addCallback(lambda r: self._ssh_kill(hostname))
439 437 # d.addErrback(self._exec_err)
440 438 return d
441 439
442 440 def _scp_engine_killer(self, hostname):
443 441 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
444 442 self.engine_killer,
445 443 hostname,
446 444 self.temp_dir,
447 445 os.environ['USER']
448 446 )
449 447 cmds = scp_cmd.split()
450 448 log.msg('Copying engine_killer: %s' % scp_cmd)
451 449 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
452 450 return d
453 451
454 452 def _ssh_kill(self, hostname):
455 453 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
456 454 hostname,
457 455 self.temp_dir,
458 456 os.environ['USER']
459 457 )
460 458 log.msg('Killing engine: %s' % kill_cmd)
461 459 kill_cmd = kill_cmd.split()
462 460 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
463 461 return d
464 462
465 463 def _exec_err(self, r):
466 464 log.msg(r)
467 465
468 466 #-----------------------------------------------------------------------------
469 467 # Main functions for the different types of clusters
470 468 #-----------------------------------------------------------------------------
471 469
472 470 # TODO:
473 471 # The logic in these codes should be moved into classes like LocalCluster
474 472 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
475 473 # The main functions should then just parse the command line arguments, create
476 474 # the appropriate class and call a 'start' method.
477 475
478 476
479 477 def check_security(args, cont_args):
480 478 """Check to see if we should run with SSL support."""
481 479 if (not args.x or not args.y) and not have_crypto:
482 480 log.err("""
483 481 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
484 482 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
485 483 reactor.stop()
486 484 return False
487 485 if args.x:
488 486 cont_args.append('-x')
489 487 if args.y:
490 488 cont_args.append('-y')
491 489 return True
492 490
493 491
494 492 def check_reuse(args, cont_args):
495 493 """Check to see if we should try to resuse FURL files."""
496 494 if args.r:
497 495 cont_args.append('-r')
498 496 if args.client_port == 0 or args.engine_port == 0:
499 497 log.err("""
500 498 To reuse FURL files, you must also set the client and engine ports using
501 499 the --client-port and --engine-port options.""")
502 500 reactor.stop()
503 501 return False
504 502 cont_args.append('--client-port=%i' % args.client_port)
505 503 cont_args.append('--engine-port=%i' % args.engine_port)
506 504 return True
507 505
508 506
509 507 def _err_and_stop(f):
510 508 """Errback to log a failure and halt the reactor on a fatal error."""
511 509 log.err(f)
512 510 reactor.stop()
513 511
514 512
515 513 def _delay_start(cont_pid, start_engines, furl_file, reuse):
516 514 """Wait for controller to create FURL files and the start the engines."""
517 515 if not reuse:
518 516 if os.path.isfile(furl_file):
519 517 os.unlink(furl_file)
520 518 log.msg('Waiting for controller to finish starting...')
521 519 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
522 520 d.addCallback(lambda _: log.msg('Controller started'))
523 521 d.addCallback(lambda _: start_engines(cont_pid))
524 522 return d
525 523
526 524
527 525 def main_local(args):
528 526 cont_args = []
529 527 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
530 528
531 529 # Check security settings before proceeding
532 530 if not check_security(args, cont_args):
533 531 return
534 532
535 533 # See if we are reusing FURL files
536 534 if not check_reuse(args, cont_args):
537 535 return
538 536
539 537 cl = ControllerLauncher(extra_args=cont_args)
540 538 dstart = cl.start()
541 539 def start_engines(cont_pid):
542 540 engine_args = []
543 541 engine_args.append('--logfile=%s' % \
544 542 pjoin(args.logdir,'ipengine%s-' % cont_pid))
545 543 eset = LocalEngineSet(extra_args=engine_args)
546 544 def shutdown(signum, frame):
547 545 log.msg('Stopping local cluster')
548 546 # We are still playing with the times here, but these seem
549 547 # to be reliable in allowing everything to exit cleanly.
550 548 eset.interrupt_then_kill(0.5)
551 549 cl.interrupt_then_kill(0.5)
552 550 reactor.callLater(1.0, reactor.stop)
553 551 signal.signal(signal.SIGINT,shutdown)
554 552 d = eset.start(args.n)
555 553 return d
556 554 config = kernel_config_manager.get_config_obj()
557 555 furl_file = config['controller']['engine_furl_file']
558 556 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
559 557 dstart.addErrback(_err_and_stop)
560 558
561 559
562 560 def main_mpi(args):
563 561 cont_args = []
564 562 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
565 563
566 564 # Check security settings before proceeding
567 565 if not check_security(args, cont_args):
568 566 return
569 567
570 568 # See if we are reusing FURL files
571 569 if not check_reuse(args, cont_args):
572 570 return
573 571
574 572 cl = ControllerLauncher(extra_args=cont_args)
575 573 dstart = cl.start()
576 574 def start_engines(cont_pid):
577 575 raw_args = [args.cmd]
578 576 raw_args.extend(['-n',str(args.n)])
579 577 raw_args.append('ipengine')
580 578 raw_args.append('-l')
581 579 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
582 580 if args.mpi:
583 581 raw_args.append('--mpi=%s' % args.mpi)
584 582 eset = ProcessLauncher(raw_args)
585 583 def shutdown(signum, frame):
586 584 log.msg('Stopping local cluster')
587 585 # We are still playing with the times here, but these seem
588 586 # to be reliable in allowing everything to exit cleanly.
589 587 eset.interrupt_then_kill(1.0)
590 588 cl.interrupt_then_kill(1.0)
591 589 reactor.callLater(2.0, reactor.stop)
592 590 signal.signal(signal.SIGINT,shutdown)
593 591 d = eset.start()
594 592 return d
595 593 config = kernel_config_manager.get_config_obj()
596 594 furl_file = config['controller']['engine_furl_file']
597 595 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
598 596 dstart.addErrback(_err_and_stop)
599 597
600 598
601 599 def main_pbs(args):
602 600 cont_args = []
603 601 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
604 602
605 603 # Check security settings before proceeding
606 604 if not check_security(args, cont_args):
607 605 return
608 606
609 607 # See if we are reusing FURL files
610 608 if not check_reuse(args, cont_args):
611 609 return
612 610
613 611 cl = ControllerLauncher(extra_args=cont_args)
614 612 dstart = cl.start()
615 613 def start_engines(r):
616 614 pbs_set = PBSEngineSet(args.pbsscript)
617 615 def shutdown(signum, frame):
618 616 log.msg('Stopping pbs cluster')
619 617 d = pbs_set.kill()
620 618 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
621 619 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
622 620 signal.signal(signal.SIGINT,shutdown)
623 621 d = pbs_set.start(args.n)
624 622 return d
625 623 config = kernel_config_manager.get_config_obj()
626 624 furl_file = config['controller']['engine_furl_file']
627 625 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
628 626 dstart.addErrback(_err_and_stop)
629 627
630 628
631 629 def main_ssh(args):
632 630 """Start a controller on localhost and engines using ssh.
633 631
634 632 Your clusterfile should look like::
635 633
636 634 send_furl = False # True, if you want
637 635 engines = {
638 636 'engine_host1' : engine_count,
639 637 'engine_host2' : engine_count2
640 638 }
641 639 """
642 640 clusterfile = {}
643 641 execfile(args.clusterfile, clusterfile)
644 642 if not clusterfile.has_key('send_furl'):
645 643 clusterfile['send_furl'] = False
646 644
647 645 cont_args = []
648 646 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
649 647
650 648 # Check security settings before proceeding
651 649 if not check_security(args, cont_args):
652 650 return
653 651
654 652 # See if we are reusing FURL files
655 653 if not check_reuse(args, cont_args):
656 654 return
657 655
658 656 cl = ControllerLauncher(extra_args=cont_args)
659 657 dstart = cl.start()
660 658 def start_engines(cont_pid):
661 659 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
662 660 def shutdown(signum, frame):
663 661 d = ssh_set.kill()
664 662 cl.interrupt_then_kill(1.0)
665 663 reactor.callLater(2.0, reactor.stop)
666 664 signal.signal(signal.SIGINT,shutdown)
667 665 d = ssh_set.start(clusterfile['send_furl'])
668 666 return d
669 667 config = kernel_config_manager.get_config_obj()
670 668 furl_file = config['controller']['engine_furl_file']
671 669 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
672 670 dstart.addErrback(_err_and_stop)
673 671
674 672
675 673 def get_args():
676 674 base_parser = argparse.ArgumentParser(add_help=False)
677 675 base_parser.add_argument(
678 676 '-r',
679 677 action='store_true',
680 678 dest='r',
681 679 help='try to reuse FURL files. Use with --client-port and --engine-port'
682 680 )
683 681 base_parser.add_argument(
684 682 '--client-port',
685 683 type=int,
686 684 dest='client_port',
687 685 help='the port the controller will listen on for client connections',
688 686 default=0
689 687 )
690 688 base_parser.add_argument(
691 689 '--engine-port',
692 690 type=int,
693 691 dest='engine_port',
694 692 help='the port the controller will listen on for engine connections',
695 693 default=0
696 694 )
697 695 base_parser.add_argument(
698 696 '-x',
699 697 action='store_true',
700 698 dest='x',
701 699 help='turn off client security'
702 700 )
703 701 base_parser.add_argument(
704 702 '-y',
705 703 action='store_true',
706 704 dest='y',
707 705 help='turn off engine security'
708 706 )
709 707 base_parser.add_argument(
710 708 "--logdir",
711 709 type=str,
712 710 dest="logdir",
713 711 help="directory to put log files (default=$IPYTHONDIR/log)",
714 712 default=pjoin(get_ipython_dir(),'log')
715 713 )
716 714 base_parser.add_argument(
717 715 "-n",
718 716 "--num",
719 717 type=int,
720 718 dest="n",
721 719 default=2,
722 720 help="the number of engines to start"
723 721 )
724 722
725 723 parser = argparse.ArgumentParser(
726 724 description='IPython cluster startup. This starts a controller and\
727 725 engines using various approaches. Use the IPYTHONDIR environment\
728 726 variable to change your IPython directory from the default of\
729 727 .ipython or _ipython. The log and security subdirectories of your\
730 728 IPython directory will be used by this script for log files and\
731 729 security files.'
732 730 )
733 731 subparsers = parser.add_subparsers(
734 732 help='available cluster types. For help, do "ipcluster TYPE --help"')
735 733
736 734 parser_local = subparsers.add_parser(
737 735 'local',
738 736 help='run a local cluster',
739 737 parents=[base_parser]
740 738 )
741 739 parser_local.set_defaults(func=main_local)
742 740
743 741 parser_mpirun = subparsers.add_parser(
744 742 'mpirun',
745 743 help='run a cluster using mpirun (mpiexec also works)',
746 744 parents=[base_parser]
747 745 )
748 746 parser_mpirun.add_argument(
749 747 "--mpi",
750 748 type=str,
751 749 dest="mpi", # Don't put a default here to allow no MPI support
752 750 help="how to call MPI_Init (default=mpi4py)"
753 751 )
754 752 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
755 753
756 754 parser_mpiexec = subparsers.add_parser(
757 755 'mpiexec',
758 756 help='run a cluster using mpiexec (mpirun also works)',
759 757 parents=[base_parser]
760 758 )
761 759 parser_mpiexec.add_argument(
762 760 "--mpi",
763 761 type=str,
764 762 dest="mpi", # Don't put a default here to allow no MPI support
765 763 help="how to call MPI_Init (default=mpi4py)"
766 764 )
767 765 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
768 766
769 767 parser_pbs = subparsers.add_parser(
770 768 'pbs',
771 769 help='run a pbs cluster',
772 770 parents=[base_parser]
773 771 )
774 772 parser_pbs.add_argument(
775 773 '--pbs-script',
776 774 type=str,
777 775 dest='pbsscript',
778 776 help='PBS script template',
779 777 default='pbs.template'
780 778 )
781 779 parser_pbs.set_defaults(func=main_pbs)
782 780
783 781 parser_ssh = subparsers.add_parser(
784 782 'ssh',
785 783 help='run a cluster using ssh, should have ssh-keys setup',
786 784 parents=[base_parser]
787 785 )
788 786 parser_ssh.add_argument(
789 787 '--clusterfile',
790 788 type=str,
791 789 dest='clusterfile',
792 790 help='python file describing the cluster',
793 791 default='clusterfile.py',
794 792 )
795 793 parser_ssh.add_argument(
796 794 '--sshx',
797 795 type=str,
798 796 dest='sshx',
799 797 help='sshx launcher helper'
800 798 )
801 799 parser_ssh.set_defaults(func=main_ssh)
802 800
803 801 args = parser.parse_args()
804 802 return args
805 803
806 804 def main():
807 805 args = get_args()
808 806 reactor.callWhenRunning(args.func, args)
809 807 log.startLogging(sys.stdout)
810 808 reactor.run()
811 809
812 810 if __name__ == '__main__':
813 811 main()
@@ -1,249 +1,256 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Things directly related to all of twisted."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import os, sys
20 20 import threading, Queue, atexit
21 21
22 22 import twisted
23 23 from twisted.internet import defer, reactor
24 24 from twisted.python import log, failure
25 25
26 26 from IPython.kernel.error import FileTimeoutError
27 27
28 28 #-------------------------------------------------------------------------------
29 29 # Classes related to twisted and threads
30 30 #-------------------------------------------------------------------------------
31 31
32 32
33 33 class ReactorInThread(threading.Thread):
34 34 """Run the twisted reactor in a different thread.
35 35
36 36 For the process to be able to exit cleanly, do the following:
37 37
38 38 rit = ReactorInThread()
39 39 rit.setDaemon(True)
40 40 rit.start()
41 41
42 42 """
43 43
44 44 def run(self):
45 45 reactor.run(installSignalHandlers=0)
46 46 # self.join()
47 47
48 48 def stop(self):
49 49 # I don't think this does anything useful.
50 50 blockingCallFromThread(reactor.stop)
51 51 self.join()
52 52
53 53 if(twisted.version.major >= 8):
54 54 import twisted.internet.threads
55 55 def blockingCallFromThread(f, *a, **kw):
56 56 """
57 57 Run a function in the reactor from a thread, and wait for the result
58 58 synchronously, i.e. until the callback chain returned by the function gets a
59 59 result.
60 60
61 61 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
62 62 passing twisted.internet.reactor for the first argument.
63 63
64 64 @param f: the callable to run in the reactor thread
65 65 @type f: any callable.
66 66 @param a: the arguments to pass to C{f}.
67 67 @param kw: the keyword arguments to pass to C{f}.
68 68
69 69 @return: the result of the callback chain.
70 70 @raise: any error raised during the callback chain.
71 71 """
72 72 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
73 73
74 74 else:
75 75 def blockingCallFromThread(f, *a, **kw):
76 76 """
77 77 Run a function in the reactor from a thread, and wait for the result
78 78 synchronously, i.e. until the callback chain returned by the function gets a
79 79 result.
80 80
81 81 @param f: the callable to run in the reactor thread
82 82 @type f: any callable.
83 83 @param a: the arguments to pass to C{f}.
84 84 @param kw: the keyword arguments to pass to C{f}.
85 85
86 86 @return: the result of the callback chain.
87 87 @raise: any error raised during the callback chain.
88 88 """
89 89 from twisted.internet import reactor
90 90 queue = Queue.Queue()
91 91 def _callFromThread():
92 92 result = defer.maybeDeferred(f, *a, **kw)
93 93 result.addBoth(queue.put)
94 94
95 95 reactor.callFromThread(_callFromThread)
96 96 result = queue.get()
97 97 if isinstance(result, failure.Failure):
98 98 # This makes it easier for the debugger to get access to the instance
99 99 try:
100 100 result.raiseException()
101 101 except Exception, e:
102 102 raise e
103 103 return result
104 104
105 105
106 106
107 107 #-------------------------------------------------------------------------------
108 108 # Things for managing deferreds
109 109 #-------------------------------------------------------------------------------
110 110
111 111
112 112 def parseResults(results):
113 113 """Pull out results/Failures from a DeferredList."""
114 114 return [x[1] for x in results]
115 115
116 116 def gatherBoth(dlist, fireOnOneCallback=0,
117 117 fireOnOneErrback=0,
118 118 consumeErrors=0,
119 119 logErrors=0):
120 120 """This is like gatherBoth, but sets consumeErrors=1."""
121 121 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
122 122 consumeErrors, logErrors)
123 123 if not fireOnOneCallback:
124 124 d.addCallback(parseResults)
125 125 return d
126 126
127 127 SUCCESS = True
128 128 FAILURE = False
129 129
130 130 class DeferredList(defer.Deferred):
131 131 """I combine a group of deferreds into one callback.
132 132
133 133 I track a list of L{Deferred}s for their callbacks, and make a single
134 134 callback when they have all completed, a list of (success, result)
135 135 tuples, 'success' being a boolean.
136 136
137 137 Note that you can still use a L{Deferred} after putting it in a
138 138 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
139 139 messages by adding errbacks to the Deferreds *after* putting them in the
140 140 DeferredList, as a DeferredList won't swallow the errors. (Although a more
141 141 convenient way to do this is simply to set the consumeErrors flag)
142 142
143 143 Note: This is a modified version of the twisted.internet.defer.DeferredList
144 144 """
145 145
146 146 fireOnOneCallback = 0
147 147 fireOnOneErrback = 0
148 148
149 149 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
150 150 consumeErrors=0, logErrors=0):
151 151 """Initialize a DeferredList.
152 152
153 153 @type deferredList: C{list} of L{Deferred}s
154 154 @param deferredList: The list of deferreds to track.
155 155 @param fireOnOneCallback: (keyword param) a flag indicating that
156 156 only one callback needs to be fired for me to call
157 157 my callback
158 158 @param fireOnOneErrback: (keyword param) a flag indicating that
159 159 only one errback needs to be fired for me to call
160 160 my errback
161 161 @param consumeErrors: (keyword param) a flag indicating that any errors
162 162 raised in the original deferreds should be
163 163 consumed by this DeferredList. This is useful to
164 164 prevent spurious warnings being logged.
165 165 """
166 166 self.resultList = [None] * len(deferredList)
167 167 defer.Deferred.__init__(self)
168 168 if len(deferredList) == 0 and not fireOnOneCallback:
169 169 self.callback(self.resultList)
170 170
171 171 # These flags need to be set *before* attaching callbacks to the
172 172 # deferreds, because the callbacks use these flags, and will run
173 173 # synchronously if any of the deferreds are already fired.
174 174 self.fireOnOneCallback = fireOnOneCallback
175 175 self.fireOnOneErrback = fireOnOneErrback
176 176 self.consumeErrors = consumeErrors
177 177 self.logErrors = logErrors
178 178 self.finishedCount = 0
179 179
180 180 index = 0
181 181 for deferred in deferredList:
182 182 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
183 183 callbackArgs=(index,SUCCESS),
184 184 errbackArgs=(index,FAILURE))
185 185 index = index + 1
186 186
187 187 def _cbDeferred(self, result, index, succeeded):
188 188 """(internal) Callback for when one of my deferreds fires.
189 189 """
190 190 self.resultList[index] = (succeeded, result)
191 191
192 192 self.finishedCount += 1
193 193 if not self.called:
194 194 if succeeded == SUCCESS and self.fireOnOneCallback:
195 195 self.callback((result, index))
196 196 elif succeeded == FAILURE and self.fireOnOneErrback:
197 197 # We have modified this to fire the errback chain with the actual
197 197 # Failure instance that originally occurred rather than twisted's
199 199 # FirstError which wraps the failure
200 200 self.errback(result)
201 201 elif self.finishedCount == len(self.resultList):
202 202 self.callback(self.resultList)
203 203
204 204 if succeeded == FAILURE and self.logErrors:
205 205 log.err(result)
206 206 if succeeded == FAILURE and self.consumeErrors:
207 207 result = None
208 208
209 209 return result
210 210
211 211
212 212 def wait_for_file(filename, delay=0.1, max_tries=10):
213 213 """Wait (poll) for a file to be created.
214 214
215 215 This method returns a Deferred that will fire when a file exists. It
216 216 works by polling os.path.isfile in time intervals specified by the
217 217 delay argument. If `max_tries` is reached, it will errback with a
218 218 `FileTimeoutError`.
219 219
220 220 Parameters
221 221 ----------
222 222 filename : str
223 223 The name of the file to wait for.
224 224 delay : float
225 225 The time to wait between polls.
226 226 max_tries : int
227 227 The max number of attempts before raising `FileTimeoutError`
228 228
229 229 Returns
230 230 -------
231 231 d : Deferred
232 232 A Deferred instance that will fire when the file exists.
233 233 """
234 234
235 235 d = defer.Deferred()
236 236
237 237 def _test_for_file(filename, attempt=0):
238 238 if attempt >= max_tries:
239 239 d.errback(FileTimeoutError(
240 240 'timeout waiting for file to be created: %s' % filename
241 241 ))
242 242 else:
243 243 if os.path.isfile(filename):
244 244 d.callback(True)
245 245 else:
246 246 reactor.callLater(delay, _test_for_file, filename, attempt+1)
247 247
248 248 _test_for_file(filename)
249 249 return d
250
251
252 def sleep_deferred(seconds):
253 """Sleep without blocking the event loop."""
254 d = defer.Deferred()
255 reactor.callLater(seconds, d.callback, seconds)
256 return d
@@ -1,125 +1,143 b''
1 #!/usr/bin/env python
1 2 # encoding: utf-8
2
3 """The IPython Core Notification Center.
3 """
4 The IPython Core Notification Center.
4 5
5 6 See docs/source/development/notification_blueprint.txt for an overview of the
6 7 notification module.
7 """
8 8
9 __docformat__ = "restructuredtext en"
9 Authors:
10
11 * Barry Wark
12 * Brian Granger
13 """
10 14
11 15 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
16 # Copyright (C) 2008-2009 The IPython Development Team
13 17 #
14 18 # Distributed under the terms of the BSD License. The full license is in
15 19 # the file COPYING, distributed as part of this software.
16 20 #-----------------------------------------------------------------------------
17 21
18 # Tell nose to skip the testing of this module
19 __test__ = {}
22 #-----------------------------------------------------------------------------
23 # Code
24 #-----------------------------------------------------------------------------
25
26
27 class NotificationError(Exception):
28 pass
29
20 30
21 31 class NotificationCenter(object):
22 """Synchronous notification center
32 """Synchronous notification center.
23 33
24 34 Examples
25 35 --------
26 >>> import IPython.kernel.core.notification as notification
27 >>> def callback(theType, theSender, args={}):
28 ... print theType,theSender,args
29 ...
30 >>> notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
31 >>> notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
32 NOTIFICATION_TYPE ...
36 Here is a simple example of how to use this::
33 37
38 import IPython.kernel.core.notification as notification
39 def callback(ntype, theSender, args={}):
40 print ntype,theSender,args
41
42 notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
43 notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
44 NOTIFICATION_TYPE ...
34 45 """
35 46 def __init__(self):
36 47 super(NotificationCenter, self).__init__()
37 48 self._init_observers()
38 49
39
40 50 def _init_observers(self):
41 51 """Initialize observer storage"""
42 52
43 53 self.registered_types = set() #set of types that are observed
44 54 self.registered_senders = set() #set of senders that are observed
45 55 self.observers = {} #map (type,sender) => callback (callable)
46 56
57 def post_notification(self, ntype, sender, *args, **kwargs):
58 """Post notification to all registered observers.
47 59
48 def post_notification(self, theType, sender, **kwargs):
49 """Post notification (type,sender,**kwargs) to all registered
50 observers.
60 The registered callback will be called as::
51 61
52 Implementation notes:
62 callback(ntype, sender, *args, **kwargs)
53 63
64 Parameters
65 ----------
66 ntype : hashable
67 The notification type.
68 sender : hashable
69 The object sending the notification.
70 *args : tuple
71 The positional arguments to be passed to the callback.
72 **kwargs : dict
73 The keyword argument to be passed to the callback.
74
75 Notes
76 -----
54 77 * If no registered observers, performance is O(1).
55 78 * Notification order is undefined.
56 79 * Notifications are posted synchronously.
57 80 """
58 81
59 if(theType==None or sender==None):
60 raise Exception("NotificationCenter.post_notification requires \
61 type and sender.")
82 if(ntype==None or sender==None):
83 raise NotificationError(
84 "Notification type and sender are required.")
62 85
63 86 # If there are no registered observers for the type/sender pair
64 if((theType not in self.registered_types and
87 if((ntype not in self.registered_types and
65 88 None not in self.registered_types) or
66 89 (sender not in self.registered_senders and
67 90 None not in self.registered_senders)):
68 91 return
69 92
70 for o in self._observers_for_notification(theType, sender):
71 o(theType, sender, args=kwargs)
93 for o in self._observers_for_notification(ntype, sender):
94 o(ntype, sender, *args, **kwargs)
72 95
73
74 def _observers_for_notification(self, theType, sender):
96 def _observers_for_notification(self, ntype, sender):
75 97 """Find all registered observers that should receive notification"""
76 98
77 99 keys = (
78 (theType,sender),
79 (theType, None),
100 (ntype,sender),
101 (ntype, None),
80 102 (None, sender),
81 103 (None,None)
82 104 )
83 105
84
85 106 obs = set()
86 107 for k in keys:
87 108 obs.update(self.observers.get(k, set()))
88 109
89 110 return obs
90 111
91
92 def add_observer(self, callback, theType, sender):
112 def add_observer(self, callback, ntype, sender):
93 113 """Add an observer callback to this notification center.
94 114
95 115 The given callback will be called upon posting of notifications of
96 the given type/sender and will receive any additional kwargs passed
116 the given type/sender and will receive any additional arguments passed
97 117 to post_notification.
98 118
99 119 Parameters
100 120 ----------
101 observerCallback : callable
102 Callable. Must take at least two arguments::
103 observerCallback(type, sender, args={})
104
105 theType : hashable
121 callback : callable
122 The callable that will be called by :meth:`post_notification`
103 123 as ``callback(ntype, sender, *args, **kwargs)``
124 ntype : hashable
106 125 The notification type. If None, all notifications from sender
107 126 will be posted.
108
109 127 sender : hashable
110 The notification sender. If None, all notifications of theType
128 The notification sender. If None, all notifications of ntype
111 129 will be posted.
112 130 """
113 131 assert(callback != None)
114 self.registered_types.add(theType)
132 self.registered_types.add(ntype)
115 133 self.registered_senders.add(sender)
116 self.observers.setdefault((theType,sender), set()).add(callback)
134 self.observers.setdefault((ntype,sender), set()).add(callback)
117 135
118 136 def remove_all_observers(self):
119 137 """Removes all observers from this notification center"""
120 138
121 139 self._init_observers()
122 140
123 141
124 142
125 sharedCenter = NotificationCenter()
143 shared_center = NotificationCenter()
@@ -1,161 +1,154 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the notification.py module."""
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2008-2009 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is
9 9 # in the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 # Tell nose to skip this module
17 __test__ = {}
16 import unittest
18 17
19 from twisted.trial import unittest
20 import IPython.kernel.core.notification as notification
18 from IPython.utils.notification import (
19 NotificationCenter,
20 NotificationError,
21 shared_center
22 )
21 23
22 24 #-----------------------------------------------------------------------------
23 25 # Support Classes
24 26 #-----------------------------------------------------------------------------
25 27
28
26 29 class Observer(object):
27 """docstring for Observer"""
28 def __init__(self, expectedType, expectedSender,
29 center=notification.sharedCenter, **kwargs):
30
31 def __init__(self, expected_ntype, expected_sender,
32 center=shared_center, *args, **kwargs):
30 33 super(Observer, self).__init__()
31 self.expectedType = expectedType
32 self.expectedSender = expectedSender
33 self.expectedKwArgs = kwargs
34 self.expected_ntype = expected_ntype
35 self.expected_sender = expected_sender
36 self.expected_args = args
37 self.expected_kwargs = kwargs
34 38 self.recieved = False
35 39 center.add_observer(self.callback,
36 self.expectedType,
37 self.expectedSender)
38
39 def callback(self, theType, sender, args={}):
40 """callback"""
41
42 assert(theType == self.expectedType or
43 self.expectedType == None)
44 assert(sender == self.expectedSender or
45 self.expectedSender == None)
46 assert(args == self.expectedKwArgs)
40 self.expected_ntype,
41 self.expected_sender)
42
43 def callback(self, ntype, sender, *args, **kwargs):
44 assert(ntype == self.expected_ntype or
45 self.expected_ntype == None)
46 assert(sender == self.expected_sender or
47 self.expected_sender == None)
48 assert(args == self.expected_args)
49 assert(kwargs == self.expected_kwargs)
47 50 self.recieved = True
48 51
49 52 def verify(self):
50 """verify"""
51
52 53 assert(self.recieved)
53 54
54 55 def reset(self):
55 """reset"""
56
57 56 self.recieved = False
58 57
59 58
60 59 class Notifier(object):
61 """docstring for Notifier"""
62 def __init__(self, theType, **kwargs):
60
61 def __init__(self, ntype, **kwargs):
63 62 super(Notifier, self).__init__()
64 self.theType = theType
63 self.ntype = ntype
65 64 self.kwargs = kwargs
66 65
67 def post(self, center=notification.sharedCenter):
68 """fire"""
66 def post(self, center=shared_center):
69 67
70 center.post_notification(self.theType, self,
68 center.post_notification(self.ntype, self,
71 69 **self.kwargs)
72 70
71
73 72 #-----------------------------------------------------------------------------
74 73 # Tests
75 74 #-----------------------------------------------------------------------------
76 75
76
77 77 class NotificationTests(unittest.TestCase):
78 """docstring for NotificationTests"""
79 78
80 79 def tearDown(self):
81 notification.sharedCenter.remove_all_observers()
80 shared_center.remove_all_observers()
82 81
83 82 def test_notification_delivered(self):
84 83 """Test that notifications are delivered"""
85 expectedType = 'EXPECTED_TYPE'
86 sender = Notifier(expectedType)
87 observer = Observer(expectedType, sender)
88 84
89 sender.post()
85 expected_ntype = 'EXPECTED_TYPE'
86 sender = Notifier(expected_ntype)
87 observer = Observer(expected_ntype, sender)
90 88
89 sender.post()
91 90 observer.verify()
92 91
93 92 def test_type_specificity(self):
94 93 """Test that observers are registered by type"""
95 94
96 expectedType = 1
97 unexpectedType = "UNEXPECTED_TYPE"
98 sender = Notifier(expectedType)
99 unexpectedSender = Notifier(unexpectedType)
100 observer = Observer(expectedType, sender)
95 expected_ntype = 1
96 unexpected_ntype = "UNEXPECTED_TYPE"
97 sender = Notifier(expected_ntype)
98 unexpected_sender = Notifier(unexpected_ntype)
99 observer = Observer(expected_ntype, sender)
101 100
102 101 sender.post()
103 unexpectedSender.post()
104
102 unexpected_sender.post()
105 103 observer.verify()
106 104
107 105 def test_sender_specificity(self):
108 106 """Test that observers are registered by sender"""
109 107
110 expectedType = "EXPECTED_TYPE"
111 sender1 = Notifier(expectedType)
112 sender2 = Notifier(expectedType)
113 observer = Observer(expectedType, sender1)
108 expected_ntype = "EXPECTED_TYPE"
109 sender1 = Notifier(expected_ntype)
110 sender2 = Notifier(expected_ntype)
111 observer = Observer(expected_ntype, sender1)
114 112
115 113 sender1.post()
116 114 sender2.post()
117 115
118 116 observer.verify()
119 117
120 118 def test_remove_all_observers(self):
121 119 """White-box test for remove_all_observers"""
122 120
123 121 for i in xrange(10):
124 Observer('TYPE', None, center=notification.sharedCenter)
122 Observer('TYPE', None, center=shared_center)
125 123
126 self.assert_(len(notification.sharedCenter.observers[('TYPE',None)]) >= 10,
124 self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
127 125 "observers registered")
128 126
129 notification.sharedCenter.remove_all_observers()
130
131 self.assert_(len(notification.sharedCenter.observers) == 0, "observers removed")
127 shared_center.remove_all_observers()
128 self.assert_(len(shared_center.observers) == 0, "observers removed")
132 129
133 130 def test_any_sender(self):
134 """test_any_sender"""
135
136 expectedType = "EXPECTED_TYPE"
137 sender1 = Notifier(expectedType)
138 sender2 = Notifier(expectedType)
139 observer = Observer(expectedType, None)
140
131 expected_ntype = "EXPECTED_TYPE"
132 sender1 = Notifier(expected_ntype)
133 sender2 = Notifier(expected_ntype)
134 observer = Observer(expected_ntype, None)
141 135
142 136 sender1.post()
143 137 observer.verify()
144 138
145 139 observer.reset()
146 140 sender2.post()
147 141 observer.verify()
148 142
149 143 def test_post_performance(self):
150 144 """Test that post_notification, even with many registered irrelevant
151 145 observers is fast"""
152 146
153 147 for i in xrange(10):
154 148 Observer("UNRELATED_TYPE", None)
155 149
156 150 o = Observer('EXPECTED_TYPE', None)
157
158 notification.sharedCenter.post_notification('EXPECTED_TYPE', self)
159
151 shared_center.post_notification('EXPECTED_TYPE', self)
160 152 o.verify()
161 153
154
@@ -1,210 +1,210 b''
1 1 #!/usr/bin/env python
2 2 # -*- coding: utf-8 -*-
3 3 """Setup script for IPython.
4 4
5 5 Under Posix environments it works like a typical setup.py script.
6 6 Under Windows, the command sdist is not supported, since IPython
7 7 requires utilities which are not available under Windows."""
8 8
9 9 #-------------------------------------------------------------------------------
10 10 # Copyright (C) 2008 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-------------------------------------------------------------------------------
15 15
16 16 #-------------------------------------------------------------------------------
17 17 # Imports
18 18 #-------------------------------------------------------------------------------
19 19
20 20 # Stdlib imports
21 21 import os
22 22 import sys
23 23
24 24 from glob import glob
25 25
26 26 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
27 27 # update it when the contents of directories change.
28 28 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
29 29
30 30 from distutils.core import setup
31 31
32 32 from IPython.utils.genutils import target_update
33 33
34 34 from setupbase import (
35 35 setup_args,
36 36 find_packages,
37 37 find_package_data,
38 38 find_scripts,
39 39 find_data_files,
40 40 check_for_dependencies
41 41 )
42 42
43 43 isfile = os.path.isfile
44 44 pjoin = os.path.join
45 45
46 46 #-------------------------------------------------------------------------------
47 47 # Handle OS specific things
48 48 #-------------------------------------------------------------------------------
49 49
50 50 if os.name == 'posix':
51 51 os_name = 'posix'
52 52 elif os.name in ['nt','dos']:
53 53 os_name = 'windows'
54 54 else:
55 55 print 'Unsupported operating system:',os.name
56 56 sys.exit(1)
57 57
58 58 # Under Windows, 'sdist' has not been supported. Now that the docs build with
59 59 # Sphinx it might work, but let's not turn it on until someone confirms that it
60 60 # actually works.
61 61 if os_name == 'windows' and 'sdist' in sys.argv:
62 62 print 'The sdist command is not available under Windows. Exiting.'
63 63 sys.exit(1)
64 64
65 65 #-------------------------------------------------------------------------------
66 66 # Things related to the IPython documentation
67 67 #-------------------------------------------------------------------------------
68 68
69 69 # update the manuals when building a source dist
70 70 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
71 71 import textwrap
72 72
73 73 # List of things to be updated. Each entry is a triplet of args for
74 74 # target_update()
75 75 to_update = [
76 76 # FIXME - Disabled for now: we need to redo an automatic way
77 77 # of generating the magic info inside the rst.
78 78 #('docs/magic.tex',
79 79 #['IPython/Magic.py'],
80 80 #"cd doc && ./update_magic.sh" ),
81 81
82 82 ('docs/man/ipcluster.1.gz',
83 83 ['docs/man/ipcluster.1'],
84 84 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
85 85
86 86 ('docs/man/ipcontroller.1.gz',
87 87 ['docs/man/ipcontroller.1'],
88 88 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
89 89
90 90 ('docs/man/ipengine.1.gz',
91 91 ['docs/man/ipengine.1'],
92 92 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
93 93
94 94 ('docs/man/ipython.1.gz',
95 95 ['docs/man/ipython.1'],
96 96 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
97 97
98 98 ('docs/man/ipython-wx.1.gz',
99 99 ['docs/man/ipython-wx.1'],
100 100 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
101 101
102 102 ('docs/man/ipythonx.1.gz',
103 103 ['docs/man/ipythonx.1'],
104 104 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
105 105
106 106 ('docs/man/irunner.1.gz',
107 107 ['docs/man/irunner.1'],
108 108 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
109 109
110 110 ('docs/man/pycolor.1.gz',
111 111 ['docs/man/pycolor.1'],
112 112 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
113 113 ]
114 114
115 115 # Only build the docs if sphinx is present
116 116 try:
117 117 import sphinx
118 118 except ImportError:
119 119 pass
120 120 else:
121 121 # The Makefile calls the do_sphinx scripts to build html and pdf, so
122 122 # just one target is enough to cover all manual generation
123 123
124 124 # First, compute all the dependencies that can force us to rebuild the
125 125 # docs. Start with the main release file that contains metadata
126 126 docdeps = ['IPython/core/release.py']
127 127 # Include all the reST sources
128 128 pjoin = os.path.join
129 129 for dirpath,dirnames,filenames in os.walk('docs/source'):
130 130 if dirpath in ['_static','_templates']:
131 131 continue
132 132 docdeps += [ pjoin(dirpath,f) for f in filenames
133 133 if f.endswith('.txt') ]
134 134 # and the examples
135 135 for dirpath,dirnames,filenames in os.walk('docs/example'):
136 136 docdeps += [ pjoin(dirpath,f) for f in filenames
137 137 if not f.endswith('~') ]
138 138 # then, make them all dependencies for the main PDF (the html will get
139 139 # auto-generated as well).
140 140 to_update.append(
141 141 ('docs/dist/ipython.pdf',
142 142 docdeps,
143 143 "cd docs && make dist")
144 144 )
145 145
146 146 [ target_update(*t) for t in to_update ]
147 147
148 148
149 149 #---------------------------------------------------------------------------
150 150 # Find all the packages, package data, scripts and data_files
151 151 #---------------------------------------------------------------------------
152 152
153 153 packages = find_packages()
154 154 package_data = find_package_data()
155 155 scripts = find_scripts()
156 156 data_files = find_data_files()
157 157
158 158 #---------------------------------------------------------------------------
159 159 # Handle dependencies and setuptools specific things
160 160 #---------------------------------------------------------------------------
161 161
162 162 # This dict is used for passing extra arguments that are setuptools
163 163 # specific to setup
164 164 setuptools_extra_args = {}
165 165
166 166 if 'setuptools' in sys.modules:
167 167 setuptools_extra_args['zip_safe'] = False
168 168 setuptools_extra_args['entry_points'] = {
169 169 'console_scripts': [
170 170 'ipython = IPython.core.ipapp:launch_new_instance',
171 171 'pycolor = IPython.utils.PyColorize:main',
172 172 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
173 173 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
174 'ipcluster = IPython.kernel.scripts.ipcluster:main',
174 'ipcluster = IPython.kernel.ipclusterapp:launch_new_instance',
175 175 'ipythonx = IPython.frontend.wx.ipythonx:main',
176 176 'iptest = IPython.testing.iptest:main',
177 177 'irunner = IPython.lib.irunner:main'
178 178 ]
179 179 }
180 180 setup_args['extras_require'] = dict(
181 181 kernel = [
182 182 'zope.interface>=3.4.1',
183 183 'Twisted>=8.0.1',
184 184 'foolscap>=0.2.6'
185 185 ],
186 186 doc='Sphinx>=0.3',
187 187 test='nose>=0.10.1',
188 188 security='pyOpenSSL>=0.6'
189 189 )
190 190 # Allow setuptools to handle the scripts
191 191 scripts = []
192 192 else:
193 193 # If we are running without setuptools, call this function which will
194 194 # check for dependencies and inform the user what is needed. This is
195 195 # just to make life easy for users.
196 196 check_for_dependencies()
197 197
198 198
199 199 #---------------------------------------------------------------------------
200 200 # Do the actual setup now
201 201 #---------------------------------------------------------------------------
202 202
203 203 setup_args['packages'] = packages
204 204 setup_args['package_data'] = package_data
205 205 setup_args['scripts'] = scripts
206 206 setup_args['data_files'] = data_files
207 207 setup_args.update(setuptools_extra_args)
208 208
209 209 if __name__ == '__main__':
210 210 setup(**setup_args)
General Comments 0
You need to be logged in to leave comments. Login now